Merge branch 'feature/merge/mgx_intent' into 'mgx_ops'

feat: 合并所所有依赖的pr

See merge request pub/MetaGPT!11
This commit is contained in:
林义章 2024-04-03 12:43:03 +00:00
commit 928747dfd0
36 changed files with 1646 additions and 79 deletions

View file

@ -0,0 +1,39 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import fire
from metagpt.roles.di.data_interpreter import DataInterpreter
async def main():
prompt = """
This is a software requirement:
```text
write a snake game
```
---
1. Writes a PRD based on software requirements.
2. Writes a design to the project repository, based on the PRD of the project.
3. Writes a project plan to the project repository, based on the design of the project.
4. Writes codes to the project repository, based on the project plan of the project.
5. Run QA test on the project repository.
6. Stage and commit changes for the project repository using Git.
Note: All required dependencies and environments have been fully installed and configured.
"""
di = DataInterpreter(
tools=[
"write_prd",
"write_design",
"write_project_plan",
"write_codes",
"run_qa_test",
"fix_bug",
"git_archive",
]
)
await di.run(prompt)
if __name__ == "__main__":
fire.Fire(main)

3
examples/mgx/__init__.py Normal file
View file

@ -0,0 +1,3 @@
# -*- coding: utf-8 -*-
# @Author : stellahong (stellahong@fuzhi.ai)
# @Desc :

19
examples/mgx/run_mgx.py Normal file
View file

@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-
# @Author : stellahong (stellahong@fuzhi.ai)
# @Desc :
import asyncio
from metagpt.roles.di.mgx import MGX
requirement = (
"design a game using Gym (an open source Python library), including a graphical interface and interactive gameplay"
)
async def main(requirement: str = ""):
mgx = MGX(use_intent=True)
await mgx.run(requirement)
if __name__ == "__main__":
asyncio.run(main(requirement))

View file

@ -104,3 +104,8 @@ class Action(SerializationMixin, ContextMixin, BaseModel):
if self.node:
return await self._run_action_node(*args, **kwargs)
raise NotImplementedError("The run method should be implemented in a subclass.")
def override_context(self):
"""Set `private_context` and `context` to the same `Context` object."""
if not self.private_context:
self.private_context = self.context

View file

@ -0,0 +1,123 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Module Description: This script defines the LearnReadMe class, which is an action to learn from the contents of
a README.md file.
Author: mashenquan
Date: 2024-3-20
"""
from pathlib import Path
from typing import Optional
from pydantic import Field
from metagpt.actions import Action
from metagpt.const import GRAPH_REPO_FILE_REPO
from metagpt.schema import Message
from metagpt.utils.common import aread
from metagpt.utils.di_graph_repository import DiGraphRepository
from metagpt.utils.graph_repository import GraphKeyword, GraphRepository
class ExtractReadMe(Action):
"""
An action to extract summary, installation, configuration, usages from the contents of a README.md file.
Attributes:
graph_db (Optional[GraphRepository]): A graph database repository.
install_to_path (Optional[str]): The path where the repository to install to.
"""
graph_db: Optional[GraphRepository] = None
install_to_path: Optional[str] = Field(default="/TO/PATH")
_readme: Optional[str] = None
_filename: Optional[str] = None
async def run(self, with_messages=None, **kwargs):
"""
Implementation of `Action`'s `run` method.
Args:
with_messages (Optional[Type]): An optional argument specifying messages to react to.
"""
graph_repo_pathname = self.context.git_repo.workdir / GRAPH_REPO_FILE_REPO / self.context.git_repo.workdir.name
self.graph_db = await DiGraphRepository.load_from(str(graph_repo_pathname.with_suffix(".json")))
summary = await self._summarize()
await self.graph_db.insert(subject=self._filename, predicate=GraphKeyword.HAS_SUMMARY, object_=summary)
install = await self._extract_install()
await self.graph_db.insert(subject=self._filename, predicate=GraphKeyword.HAS_INSTALL, object_=install)
conf = await self._extract_configuration()
await self.graph_db.insert(subject=self._filename, predicate=GraphKeyword.HAS_CONFIG, object_=conf)
usage = await self._extract_usage()
await self.graph_db.insert(subject=self._filename, predicate=GraphKeyword.HAS_USAGE, object_=usage)
await self.graph_db.save()
return Message(content="", cause_by=self)
async def _summarize(self) -> str:
readme = await self._get()
summary = await self.llm.aask(
readme,
system_msgs=[
"You are a tool can summarize git repository README.md file.",
"Return the summary about what is the repository.",
],
stream=False,
)
return summary
async def _extract_install(self) -> str:
await self._get()
install = await self.llm.aask(
self._readme,
system_msgs=[
"You are a tool can install git repository according to README.md file.",
"Return a bash code block of markdown including:\n"
f"1. git clone the repository to the directory `{self.install_to_path}`;\n"
f"2. cd `{self.install_to_path}`;\n"
f"3. install the repository.",
],
stream=False,
)
return install
async def _extract_configuration(self) -> str:
await self._get()
configuration = await self.llm.aask(
self._readme,
system_msgs=[
"You are a tool can configure git repository according to README.md file.",
"Return a bash code block of markdown object to configure the repository if necessary, otherwise return"
" a empty bash code block of markdown object",
],
stream=False,
)
return configuration
async def _extract_usage(self) -> str:
await self._get()
usage = await self.llm.aask(
self._readme,
system_msgs=[
"You are a tool can summarize all usages of git repository according to README.md file.",
"Return a list of code block of markdown objects to demonstrates the usage of the repository.",
],
stream=False,
)
return usage
async def _get(self) -> str:
if self._readme is not None:
return self._readme
root = Path(self.i_context).resolve()
filename = None
for file_path in root.iterdir():
if file_path.is_file() and file_path.stem == "README":
filename = file_path
break
if not filename:
return ""
self._readme = await aread(filename=filename, encoding="utf-8")
self._filename = str(filename)
return self._readme

View file

@ -0,0 +1,226 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
This script defines an action to import a Git repository into the MetaGPT project format, enabling incremental
appending of requirements.
The MetaGPT project format encompasses a structured representation of project data compatible with MetaGPT's
capabilities, facilitating the integration of Git repositories into MetaGPT workflows while allowing for the gradual
addition of requirements.
"""
import json
import re
from pathlib import Path
from typing import List, Optional
from pydantic import BaseModel
from metagpt.actions import Action
from metagpt.actions.extract_readme import ExtractReadMe
from metagpt.actions.rebuild_class_view import RebuildClassView
from metagpt.actions.rebuild_sequence_view import RebuildSequenceView
from metagpt.const import GRAPH_REPO_FILE_REPO
from metagpt.logs import logger
from metagpt.schema import Message
from metagpt.tools.libs.git import git_clone
from metagpt.utils.common import (
aread,
awrite,
list_files,
parse_json_code_block,
split_namespace,
)
from metagpt.utils.di_graph_repository import DiGraphRepository
from metagpt.utils.file_repository import FileRepository
from metagpt.utils.git_repository import GitRepository
from metagpt.utils.graph_repository import GraphKeyword, GraphRepository
from metagpt.utils.project_repo import ProjectRepo
class ImportRepo(Action):
"""
An action to import a Git repository into a graph database and create related artifacts.
Attributes:
repo_path (str): The URL of the Git repository to import.
graph_db (Optional[GraphRepository]): The output graph database of the Git repository.
rid (str): The output requirement ID.
"""
repo_path: str # input, git repo url.
graph_db: Optional[GraphRepository] = None # output. graph db of the git repository
rid: str = "" # output, requirement ID.
async def run(self, with_messages: List[Message] = None, **kwargs) -> Message:
"""
Runs the import process for the Git repository.
Args:
with_messages (List[Message], optional): Additional messages to include.
**kwargs: Additional keyword arguments.
Returns:
Message: A message indicating the completion of the import process.
"""
await self._create_repo()
await self._create_prd()
await self._create_system_design()
self.context.git_repo.archive(comments="Import")
async def _create_repo(self):
path = await git_clone(url=self.repo_path, output_dir=self.config.workspace.path)
self.repo_path = str(path)
self.config.project_path = path
self.context.git_repo = GitRepository(local_path=path, auto_init=True)
self.context.repo = ProjectRepo(self.context.git_repo)
self.context.src_workspace = await self._guess_src_workspace()
await awrite(
filename=self.context.repo.workdir / ".src_workspace",
data=str(self.context.src_workspace.relative_to(self.context.repo.workdir)),
)
async def _create_prd(self):
action = ExtractReadMe(i_context=str(self.context.repo.workdir), context=self.context)
await action.run()
graph_repo_pathname = self.context.git_repo.workdir / GRAPH_REPO_FILE_REPO / self.context.git_repo.workdir.name
self.graph_db = await DiGraphRepository.load_from(str(graph_repo_pathname.with_suffix(".json")))
rows = await self.graph_db.select(predicate=GraphKeyword.HAS_SUMMARY)
prd = {"Project Name": self.context.repo.workdir.name}
for r in rows:
if Path(r.subject).stem == "README":
prd["Original Requirements"] = r.object_
break
self.rid = FileRepository.new_filename()
await self.repo.docs.prd.save(filename=self.rid + ".json", content=json.dumps(prd))
async def _create_system_design(self):
action = RebuildClassView(
name="ReverseEngineering", i_context=str(self.context.src_workspace), context=self.context
)
await action.run()
rows = await action.graph_db.select(predicate="hasMermaidClassDiagramFile")
class_view_filename = rows[0].object_
logger.info(f"class view:{class_view_filename}")
rows = await action.graph_db.select(predicate=GraphKeyword.HAS_PAGE_INFO)
tag = "__name__:__main__"
entries = []
src_workspace = self.context.src_workspace.relative_to(self.context.repo.workdir)
for r in rows:
if tag in r.subject:
path = split_namespace(r.subject)[0]
elif tag in r.object_:
path = split_namespace(r.object_)[0]
else:
continue
if Path(path).is_relative_to(src_workspace):
entries.append(Path(path))
main_entry = await self._guess_main_entry(entries)
full_path = RebuildSequenceView.get_full_filename(self.context.repo.workdir, main_entry)
action = RebuildSequenceView(context=self.context, i_context=str(full_path))
try:
await action.run()
except Exception as e:
logger.warning(f"{e}, use the last successful version.")
files = list_files(self.context.repo.resources.data_api_design.workdir)
pattern = re.compile(r"[^a-zA-Z0-9]")
name = re.sub(pattern, "_", str(main_entry))
filename = Path(name).with_suffix(".sequence_diagram.mmd")
postfix = str(filename)
sequence_files = [i for i in files if postfix in str(i)]
content = await aread(filename=sequence_files[0])
await self.context.repo.resources.data_api_design.save(
filename=self.repo.workdir.stem + ".sequence_diagram.mmd", content=content
)
await self._save_system_design()
async def _save_system_design(self):
class_view = await self.context.repo.resources.data_api_design.get(
filename=self.repo.workdir.stem + ".class_diagram.mmd"
)
sequence_view = await self.context.repo.resources.data_api_design.get(
filename=self.repo.workdir.stem + ".sequence_diagram.mmd"
)
file_list = self.context.git_repo.get_files(relative_path=".", root_relative_path=self.context.src_workspace)
data = {
"Data structures and interfaces": class_view.content,
"Program call flow": sequence_view.content,
"File list": [str(i) for i in file_list],
}
await self.context.repo.docs.system_design.save(filename=self.rid + ".json", content=json.dumps(data))
async def _guess_src_workspace(self) -> Path:
files = list_files(self.context.repo.workdir)
dirs = [i.parent for i in files if i.name == "__init__.py"]
distinct = set()
for i in dirs:
done = False
for j in distinct:
if i.is_relative_to(j):
done = True
break
if j.is_relative_to(i):
break
if not done:
distinct = {j for j in distinct if not j.is_relative_to(i)}
distinct.add(i)
if len(distinct) == 1:
return list(distinct)[0]
prompt = "\n".join([f"- {str(i)}" for i in distinct])
rsp = await self.llm.aask(
prompt,
system_msgs=[
"You are a tool to choose the source code path from a list of paths based on the directory name.",
"You should identify the source code path among paths such as unit test path, examples path, etc.",
"Return a markdown JSON object containing:\n"
'- a "src" field containing the source code path;\n'
'- a "reason" field containing explaining why other paths is not the source code path\n',
],
)
logger.debug(rsp)
json_blocks = parse_json_code_block(rsp)
class Data(BaseModel):
src: str
reason: str
data = Data.model_validate_json(json_blocks[0])
logger.info(f"src_workspace: {data.src}")
return Path(data.src)
async def _guess_main_entry(self, entries: List[Path]) -> Path:
if len(entries) == 1:
return entries[0]
file_list = "## File List\n"
file_list += "\n".join([f"- {i}" for i in entries])
rows = await self.graph_db.select(predicate=GraphKeyword.HAS_USAGE)
usage = "## Usage\n"
for r in rows:
if Path(r.subject).stem == "README":
usage += r.object_
prompt = file_list + "\n---\n" + usage
rsp = await self.llm.aask(
prompt,
system_msgs=[
'You are a tool to choose the source file path from "File List" which is used in "Usage".',
'You choose the source file path based on the name of file and the class name and package name used in "Usage".',
"Return a markdown JSON object containing:\n"
'- a "filename" field containing the chosen source file path from "File List" which is used in "Usage";\n'
'- a "reason" field explaining why.',
],
stream=False,
)
logger.debug(rsp)
json_blocks = parse_json_code_block(rsp)
class Data(BaseModel):
filename: str
reason: str
data = Data.model_validate_json(json_blocks[0])
logger.info(f"main: {data.filename}")
return Path(data.filename)

View file

@ -14,8 +14,6 @@ from typing import Optional
from metagpt.actions import Action, ActionOutput
from metagpt.const import REQUIREMENT_FILENAME
from metagpt.utils.file_repository import FileRepository
from metagpt.utils.git_repository import GitRepository
from metagpt.utils.project_repo import ProjectRepo
class PrepareDocuments(Action):
@ -38,8 +36,7 @@ class PrepareDocuments(Action):
if path.exists() and not self.config.inc:
shutil.rmtree(path)
self.config.project_path = path
self.context.git_repo = GitRepository(local_path=path, auto_init=True)
self.context.repo = ProjectRepo(self.context.git_repo)
self.context.set_repo_dir(path)
async def run(self, with_messages, **kwargs):
"""Create and initialize the workspace folder, initialize the Git environment."""

View file

@ -244,15 +244,6 @@ class RebuildSequenceView(Action):
class_view = await self._get_uml_class_view(ns_class_name)
source_code = await self._get_source_code(ns_class_name)
# prompt_blocks = [
# "## Instruction\n"
# "You are a python code to UML 2.0 Use Case translator.\n"
# 'The generated UML 2.0 Use Case must include the roles or entities listed in "Participants".\n'
# "The functional descriptions of Actors and Use Cases in the generated UML 2.0 Use Case must not "
# 'conflict with the information in "Mermaid Class Views".\n'
# 'The section under `if __name__ == "__main__":` of "Source Code" contains information about external '
# "system interactions with the internal system.\n"
# ]
prompt_blocks = []
block = "## Participants\n"
for p in participants:
@ -340,6 +331,7 @@ class RebuildSequenceView(Action):
system_msgs=[
"You are a Mermaid Sequence Diagram translator in function detail.",
"Translate the markdown text to a Mermaid Sequence Diagram.",
"Response must be concise.",
"Return a markdown mermaid code block.",
],
stream=False,
@ -440,7 +432,7 @@ class RebuildSequenceView(Action):
rows = await self.graph_db.select(subject=ns_class_name, predicate=GraphKeyword.HAS_PAGE_INFO)
filename = split_namespace(ns_class_name=ns_class_name)[0]
if not rows:
src_filename = RebuildSequenceView._get_full_filename(root=self.i_context, pathname=filename)
src_filename = RebuildSequenceView.get_full_filename(root=self.i_context, pathname=filename)
if not src_filename:
return ""
return await aread(filename=src_filename, encoding="utf-8")
@ -450,7 +442,7 @@ class RebuildSequenceView(Action):
)
@staticmethod
def _get_full_filename(root: str | Path, pathname: str | Path) -> Path | None:
def get_full_filename(root: str | Path, pathname: str | Path) -> Path | None:
"""
Convert package name to the full path of the module.
@ -466,7 +458,7 @@ class RebuildSequenceView(Action):
"metagpt/management/skill_manager.py", then the returned value will be
"/User/xxx/github/MetaGPT/metagpt/management/skill_manager.py"
"""
if re.match(r"^/.+", pathname):
if re.match(r"^/.+", str(pathname)):
return pathname
files = list_files(root=root)
postfix = "/" + str(pathname)

View file

@ -47,7 +47,7 @@ class Config(CLIParams, YamlModel):
# Key Parameters
llm: LLMConfig
# Global Proxy. Will be used if llm.proxy is not set
# Global Proxy. Not used by LLM, but by other tools such as browsers.
proxy: str = ""
# Tool Parameters

View file

@ -5,6 +5,8 @@
@Author : alexanderwu
@File : context.py
"""
from __future__ import annotations
import os
from pathlib import Path
from typing import Any, Dict, Optional
@ -78,6 +80,11 @@ class Context(BaseModel):
# env.update({k: v for k, v in i.items() if isinstance(v, str)})
return env
def set_repo_dir(self, path: str | Path):
repo_path = Path(path)
self.git_repo = GitRepository(local_path=repo_path, auto_init=True)
self.repo = ProjectRepo(self.git_repo)
def _select_costmanager(self, llm_config: LLMConfig) -> CostManager:
"""Return a CostManager instance"""
if llm_config.api_type == LLMType.FIREWORKS:

View file

@ -14,7 +14,7 @@ from metagpt.roles import Role
from metagpt.schema import Message, Task, TaskResult
from metagpt.strategy.task_type import TaskType
from metagpt.tools.tool_recommend import BM25ToolRecommender, ToolRecommender
from metagpt.utils.common import CodeParser
from metagpt.utils.common import CodeParser, role_raise_decorator
REACT_THINK_PROMPT = """
# User Requirement
@ -153,7 +153,7 @@ class DataInterpreter(Role):
logger.info(f"ready to {todo.name}")
use_reflection = counter > 0 and self.use_reflection # only use reflection after the first trial
user_requirement = self.get_memories()[0].content
user_requirement = self.get_memories()[0].content # issue: 1多次用户交互时永远只读用户的第1次request2prerequisite没处理
code = await todo.run(
user_requirement=user_requirement,
@ -186,3 +186,11 @@ class DataInterpreter(Role):
print(result)
data_info = DATA_INFO.format(info=result)
self.working_memory.add(Message(content=data_info, role="user", cause_by=CheckData))
@role_raise_decorator
async def run(self, with_message=None) -> Message | None:
if not self.rc.todo:
self.set_actions([WriteAnalysisCode])
self._set_state(0)
return await super().run(with_message)

62
metagpt/roles/di/mgx.py Normal file
View file

@ -0,0 +1,62 @@
# -*- coding: utf-8 -*-
# @Author : stellahong (stellahong@fuzhi.ai)
# @Desc :
import asyncio
from typing import Dict, List
from metagpt.actions.di.detect_intent import DetectIntent
from metagpt.logs import logger
from metagpt.roles.di.data_interpreter import DataInterpreter
from metagpt.schema import Message
class MGX(DataInterpreter):
use_intent: bool = True
intents: Dict = {}
async def _detect_intent(self, user_msgs: List[Message] = None, **kwargs):
todo = DetectIntent(context=self.context)
request_with_sop, sop_type = await todo.run(user_msgs)
logger.info(f"{sop_type} {request_with_sop}")
return request_with_sop
async def _plan_and_act(self) -> Message:
"""first plan, then execute an action sequence, i.e. _think (of a plan) -> _act -> _act -> ... Use llm to come up with the plan dynamically."""
# create initial plan and update it until confirmation
goal = self.rc.memory.get()[-1].content # retreive latest user requirement
if self.use_intent: # add mode
user_message = Message(content=goal, role="user")
goal = await self._detect_intent(user_msgs=[user_message])
logger.info(f"Goal is {goal}")
await self.planner.update_plan(goal=goal)
# take on tasks until all finished
while self.planner.current_task:
task = self.planner.current_task
logger.info(f"ready to take on task {task}")
# take on current task
task_result = await self._act_on_task(task)
# process the result, such as reviewing, confirming, plan updating
await self.planner.process_task_result(task_result)
rsp = self.planner.get_useful_memories()[0] # return the completed plan as a response
self.rc.memory.add(rsp) # add to persistent memory
return rsp
if __name__ == "__main__":
test_requirements = [
"design a game using Gym (an open source Python library), including a graphical interface and interactive gameplay"
# "Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy"
# "获取https://www.stats.gov.cn/sj/sjjd/202307/t20230718_1941322.html的内容并返回上半年cpi的增长或下降幅度"
]
for requirement in test_requirements:
mgx = MGX()
rsp = asyncio.run(mgx.run(requirement))

View file

@ -22,7 +22,7 @@ from __future__ import annotations
import json
from collections import defaultdict
from pathlib import Path
from typing import Optional, Set
from typing import List, Optional, Set
from metagpt.actions import Action, WriteCode, WriteCodeReview, WriteTasks
from metagpt.actions.fix_bug import FixBug
@ -46,7 +46,13 @@ from metagpt.schema import (
Documents,
Message,
)
from metagpt.utils.common import any_to_name, any_to_str, any_to_str_set
from metagpt.utils.common import (
any_to_name,
any_to_str,
any_to_str_set,
get_project_srcs_path,
init_python_folder,
)
IS_PASS_PROMPT = """
{context}
@ -240,7 +246,7 @@ class Engineer(Role):
async def _think(self) -> Action | None:
if not self.src_workspace:
self.src_workspace = self.git_repo.workdir / self.git_repo.workdir.name
self.src_workspace = get_project_srcs_path(self.project_repo.workdir)
write_plan_and_change_filters = any_to_str_set([WriteTasks, FixBug])
write_code_filters = any_to_str_set([WriteTasks, WriteCodePlanAndChange, SummarizeCode])
summarize_code_filters = any_to_str_set([WriteCode, WriteCodeReview])
@ -261,7 +267,7 @@ class Engineer(Role):
return self.rc.todo
return None
async def _new_coding_context(self, filename, dependency) -> CodingContext:
async def _new_coding_context(self, filename, dependency) -> Optional[CodingContext]:
old_code_doc = await self.project_repo.srcs.get(filename)
if not old_code_doc:
old_code_doc = Document(root_path=str(self.project_repo.src_relative_path), filename=filename, content="")
@ -277,6 +283,8 @@ class Engineer(Role):
elif str(i.parent) == CODE_PLAN_AND_CHANGE_FILE_REPO:
code_plan_and_change_doc = await self.project_repo.docs.code_plan_and_change.get(i.name)
if not task_doc or not design_doc:
if filename == "__init__.py": # `__init__.py` created by `init_python_folder`
return None
logger.error(f'Detected source code "{filename}" from an unknown origin.')
raise ValueError(f'Detected source code "{filename}" from an unknown origin.')
context = CodingContext(
@ -288,8 +296,10 @@ class Engineer(Role):
)
return context
async def _new_coding_doc(self, filename, dependency):
async def _new_coding_doc(self, filename, dependency) -> Optional[Document]:
context = await self._new_coding_context(filename, dependency)
if not context:
return None # `__init__.py` created by `init_python_folder`
coding_doc = Document(
root_path=str(self.project_repo.src_relative_path), filename=filename, content=context.model_dump_json()
)
@ -307,6 +317,7 @@ class Engineer(Role):
task_doc = await self.project_repo.docs.task.get(filename)
code_plan_and_change_doc = await self.project_repo.docs.code_plan_and_change.get(filename)
task_list = self._parse_tasks(task_doc)
await self._init_python_folder(task_list)
for task_filename in task_list:
old_code_doc = await self.project_repo.srcs.get(task_filename)
if not old_code_doc:
@ -345,6 +356,8 @@ class Engineer(Role):
if filename in changed_files.docs:
continue
coding_doc = await self._new_coding_doc(filename=filename, dependency=dependency)
if not coding_doc:
continue # `__init__.py` created by `init_python_folder`
changed_files.docs[filename] = coding_doc
self.code_todos.append(WriteCode(i_context=coding_doc, context=self.context, llm=self.llm))
@ -360,6 +373,8 @@ class Engineer(Role):
ctx = CodeSummarizeContext.loads(filenames=list(dependencies))
summarizations[ctx].append(filename)
for ctx, filenames in summarizations.items():
if not ctx.design_filename or not ctx.task_filename:
continue # cause by `__init__.py` which is created by `init_python_folder`
ctx.codes_filenames = filenames
new_summarize = SummarizeCode(i_context=ctx, context=self.context, llm=self.llm)
for i, act in enumerate(self.summarize_todos):
@ -391,6 +406,14 @@ class Engineer(Role):
"""AgentStore uses this attribute to display to the user what actions the current role should take."""
return self.next_todo_action
async def _init_python_folder(self, task_list: List[str]):
for i in task_list:
filename = Path(i)
if filename.suffix != ".py":
continue
workdir = self.src_workspace / filename.parent
await init_python_folder(workdir)
async def _is_fixbug(self) -> bool:
fixbug_doc = await self.project_repo.docs.get(BUGFIX_FILENAME)
return bool(fixbug_doc and fixbug_doc.content)

View file

@ -21,7 +21,7 @@ from metagpt.const import MESSAGE_ROUTE_TO_NONE
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import Document, Message, RunCodeContext, TestingContext
from metagpt.utils.common import any_to_str_set, parse_recipient
from metagpt.utils.common import any_to_str_set, init_python_folder, parse_recipient
class QaEngineer(Role):
@ -141,6 +141,7 @@ class QaEngineer(Role):
)
async def _act(self) -> Message:
await init_python_folder(self.project_repo.tests.workdir)
if self.test_round > self.test_round_allowed:
result_msg = Message(
content=f"Exceeding {self.test_round_allowed} rounds of tests, skip (writing code counts as a round, too)",

View file

@ -21,6 +21,7 @@ import os.path
import uuid
from abc import ABC
from asyncio import Queue, QueueEmpty, wait_for
from enum import Enum
from json import JSONDecodeError
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Type, TypeVar, Union
@ -792,3 +793,26 @@ class UMLClassView(UMLClassMeta):
method.return_type = i.return_args.type_
class_view.methods.append(method)
return class_view
class BaseEnum(Enum):
"""Base class for enums."""
def __new__(cls, value, desc=None):
"""
Construct an instance of the enum member.
Args:
cls: The class.
value: The value of the enum member.
desc: The description of the enum member. Defaults to None.
"""
if issubclass(cls, str):
obj = str.__new__(cls, value)
elif issubclass(cls, int):
obj = int.__new__(cls, value)
else:
obj = object.__new__(cls)
obj._value_ = value
obj.desc = desc
return obj

View file

@ -13,6 +13,15 @@ from metagpt.tools.libs import (
email_login,
terminal,
)
from metagpt.tools.libs.software_development import (
write_prd,
write_design,
write_project_plan,
write_codes,
run_qa_test,
fix_bug,
git_archive,
)
_ = (
data_preprocess,
@ -21,5 +30,12 @@ _ = (
gpt_v_generator,
web_scraping,
email_login,
write_prd,
write_design,
write_project_plan,
write_codes,
run_qa_test,
fix_bug,
git_archive,
terminal,
) # Avoid pre-commit error

65
metagpt/tools/libs/git.py Normal file
View file

@ -0,0 +1,65 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import annotations
from pathlib import Path
from metagpt.tools.tool_registry import register_tool
from metagpt.utils.git_repository import GitRepository
@register_tool(tags=["git"])
async def git_clone(url: str, output_dir: str | Path = None) -> Path:
"""
Clones a Git repository from the given URL.
Args:
url (str): The URL of the Git repository to clone.
output_dir (str or Path, optional): The directory where the repository will be cloned.
If not provided, the repository will be cloned into the current working directory.
Returns:
Path: The path to the cloned repository.
Raises:
ValueError: If the specified Git root is invalid.
Example:
>>> # git clone to /TO/PATH
>>> url = 'https://github.com/geekan/MetaGPT.git'
>>> output_dir = "/TO/PATH"
>>> repo_dir = await git_clone(url=url, output_dir=output_dir)
>>> print(repo_dir)
/TO/PATH/MetaGPT
>>> # git clone to default directory.
>>> url = 'https://github.com/geekan/MetaGPT.git'
>>> repo_dir = await git_clone(url)
>>> print(repo_dir)
/WORK_SPACE/downloads/MetaGPT
"""
repo = await GitRepository.clone_from(url, output_dir)
return repo.workdir
async def git_checkout(repo_dir: str | Path, commit_id: str):
"""
Checks out a specific commit in a Git repository.
Args:
repo_dir (str or Path): The directory containing the Git repository.
commit_id (str): The ID of the commit to check out.
Raises:
ValueError: If the specified Git root is invalid.
Example:
>>> repo_dir = '/TO/GIT/REPO'
>>> commit_id = 'main'
>>> await git_checkout(repo_dir=repo_dir, commit_id=commit_id)
git checkout main
"""
repo = GitRepository(local_path=repo_dir, auto_init=False)
if not repo.is_valid:
ValueError(f"Invalid git root: {repo_dir}")
await repo.checkout(commit_id)

View file

@ -0,0 +1,60 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import annotations
import subprocess
from pathlib import Path
from typing import Dict, List, Tuple, Union
async def shell_execute(
command: Union[List[str], str], cwd: str | Path = None, env: Dict = None, timeout: int = 600
) -> Tuple[str, str, int]:
"""
Execute a command asynchronously and return its standard output and standard error.
Args:
command (Union[List[str], str]): The command to execute and its arguments. It can be provided either as a list
of strings or as a single string.
cwd (str | Path, optional): The current working directory for the command. Defaults to None.
env (Dict, optional): Environment variables to set for the command. Defaults to None.
timeout (int, optional): Timeout for the command execution in seconds. Defaults to 600.
Returns:
Tuple[str, str, int]: A tuple containing the string type standard output and string type standard error of the executed command and int type return code.
Raises:
ValueError: If the command times out, this error is raised. The error message contains both standard output and
standard error of the timed-out process.
Example:
>>> # command is a list
>>> stdout, stderr, returncode = await shell_execute(command=["ls", "-l"], cwd="/home/user", env={"PATH": "/usr/bin"})
>>> print(stdout)
total 8
-rw-r--r-- 1 user user 0 Mar 22 10:00 file1.txt
-rw-r--r-- 1 user user 0 Mar 22 10:00 file2.txt
...
>>> # command is a string of shell script
>>> stdout, stderr, returncode = await shell_execute(command="ls -l", cwd="/home/user", env={"PATH": "/usr/bin"})
>>> print(stdout)
total 8
-rw-r--r-- 1 user user 0 Mar 22 10:00 file1.txt
-rw-r--r-- 1 user user 0 Mar 22 10:00 file2.txt
...
References:
This function uses `subprocess.Popen` for executing shell commands asynchronously.
"""
cwd = str(cwd) if cwd else None
shell = True if isinstance(command, str) else False
process = subprocess.Popen(command, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env, shell=shell)
try:
# Wait for the process to complete, with a timeout
stdout, stderr = process.communicate(timeout=timeout)
return stdout.decode("utf-8"), stderr.decode("utf-8"), process.returncode
except subprocess.TimeoutExpired:
process.kill() # Kill the process if it times out
stdout, stderr = process.communicate()
raise ValueError(f"{stdout.decode('utf-8')}\n{stderr.decode('utf-8')}")

View file

@ -0,0 +1,368 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import annotations
from pathlib import Path
from typing import Optional
from metagpt.const import BUGFIX_FILENAME, REQUIREMENT_FILENAME
from metagpt.logs import ToolLogItem, log_tool_output
from metagpt.schema import BugFixContext, Message
from metagpt.tools.tool_registry import register_tool
from metagpt.utils.common import any_to_str
@register_tool(tags=["software development", "ProductManager"])
async def write_prd(idea: str, project_path: Optional[str | Path] = None) -> Path:
"""Writes a PRD based on user requirements.
Args:
idea (str): The idea or concept for the PRD.
project_path (Optional[str|Path], optional): The path to an existing project directory.
If it's None, a new project path will be created. Defaults to None.
Returns:
Path: The path to the PRD files under the project directory
Example:
>>> # Create a new project:
>>> from metagpt.tools.libs.software_development import write_prd
>>> prd_path = await write_prd("Create a new feature for the application")
>>> print(prd_path)
'/path/to/project_path/docs/prd/'
>>> # Add user requirements to the exists project:
>>> from metagpt.tools.libs.software_development import write_prd
>>> project_path = '/path/to/exists_project_path'
>>> prd_path = await write_prd("Create a new feature for the application", project_path=project_path)
>>> print(prd_path = )
'/path/to/project_path/docs/prd/'
"""
from metagpt.actions import UserRequirement
from metagpt.context import Context
from metagpt.roles import ProductManager
ctx = Context()
if project_path:
ctx.config.project_path = Path(project_path)
ctx.config.inc = True
role = ProductManager(context=ctx)
msg = await role.run(with_message=Message(content=idea, cause_by=UserRequirement))
await role.run(with_message=msg)
outputs = [
ToolLogItem(name="PRD File", value=str(ctx.repo.docs.prd.workdir / i))
for i in ctx.repo.docs.prd.changed_files.keys()
]
for i in ctx.repo.resources.competitive_analysis.changed_files.keys():
outputs.append(
ToolLogItem(name="Competitive Analysis", value=str(ctx.repo.resources.competitive_analysis.workdir / i))
)
log_tool_output(output=outputs, tool_name=write_prd.__name__)
return ctx.repo.docs.prd.workdir
@register_tool(tags=["software development", "Architect"])
async def write_design(prd_path: str | Path) -> Path:
"""Writes a design to the project repository, based on the PRD of the project.
Args:
prd_path (str|Path): The path to the PRD files under the project directory.
Returns:
Path: The path to the system design files under the project directory.
Example:
>>> from metagpt.tools.libs.software_development import write_design
>>> prd_path = '/path/to/project_path/docs/prd' # Returned by `write_prd`
>>> system_design_path = await write_desgin(prd_path)
>>> print(system_design_path)
'/path/to/project_path/docs/system_design/'
"""
from metagpt.actions import WritePRD
from metagpt.context import Context
from metagpt.roles import Architect
ctx = Context()
prd_path = Path(prd_path)
project_path = (Path(prd_path) if not prd_path.is_file() else prd_path.parent) / "../.."
ctx.set_repo_dir(project_path)
role = Architect(context=ctx)
await role.run(with_message=Message(content="", cause_by=WritePRD))
outputs = [
ToolLogItem(name="Intermedia Design File", value=str(ctx.repo.docs.system_design.workdir / i))
for i in ctx.repo.docs.system_design.changed_files.keys()
]
for i in ctx.repo.resources.system_design.changed_files.keys():
outputs.append(ToolLogItem(name="Design File", value=str(ctx.repo.resources.system_design.workdir / i)))
for i in ctx.repo.resources.data_api_design.changed_files.keys():
outputs.append(
ToolLogItem(name="Class Diagram File", value=str(ctx.repo.resources.data_api_design.workdir / i))
)
for i in ctx.repo.resources.seq_flow.changed_files.keys():
outputs.append(ToolLogItem(name="Sequence Diagram File", value=str(ctx.repo.resources.seq_flow.workdir / i)))
log_tool_output(output=outputs, tool_name=write_design.__name__)
return ctx.repo.docs.system_design.workdir
@register_tool(tags=["software development", "Architect"])
async def write_project_plan(system_design_path: str | Path) -> Path:
"""Writes a project plan to the project repository, based on the design of the project.
Args:
system_design_path (str|Path): The path to the system design files under the project directory.
Returns:
Path: The path to task files under the project directory.
Example:
>>> from metagpt.tools.libs.software_development import write_project_plan
>>> system_design_path = '/path/to/project_path/docs/system_design/' # Returned by `write_design`
>>> task_path = await write_project_plan(system_design_path)
>>> print(task_path)
'/path/to/project_path/docs/task'
"""
from metagpt.actions import WriteDesign
from metagpt.context import Context
from metagpt.roles import ProjectManager
ctx = Context()
system_design_path = Path(system_design_path)
project_path = (system_design_path if not system_design_path.is_file() else system_design_path.parent) / "../.."
ctx.set_repo_dir(project_path)
role = ProjectManager(context=ctx)
await role.run(with_message=Message(content="", cause_by=WriteDesign))
outputs = [
ToolLogItem(name="Project Plan", value=str(ctx.repo.docs.task.workdir / i))
for i in ctx.repo.docs.task.changed_files.keys()
]
log_tool_output(output=outputs, tool_name=write_project_plan.__name__)
return ctx.repo.docs.task.workdir
@register_tool(tags=["software development", "Engineer"])
async def write_codes(task_path: str | Path, inc: bool = False) -> Path:
"""Writes code to implement designed features according to the project plan and adds them to the project repository.
Args:
task_path (str|Path): The path to task files under the project directory.
inc (bool, optional): Whether to write incremental codes. Defaults to False.
Returns:
Path: The path to the source code files under the project directory.
Example:
# Write codes to a new project
>>> from metagpt.tools.libs.software_development import write_codes
>>> task_path = '/path/to/project_path/docs/task' # Returned by `write_project_plan`
>>> src_path = await write_codes(task_path)
>>> print(src_path)
'/path/to/project_path/src/'
# Write increment codes to the exists project
>>> from metagpt.tools.libs.software_development import write_codes
>>> task_path = '/path/to/project_path/docs/task' # Returned by `write_prd`
>>> src_path = await write_codes(task_path, inc=True)
>>> print(src_path)
'/path/to/project_path/src/'
"""
from metagpt.actions import WriteTasks
from metagpt.context import Context
from metagpt.roles import Engineer
ctx = Context()
ctx.config.inc = inc
task_path = Path(task_path)
project_path = (task_path if not task_path.is_file() else task_path.parent) / "../.."
ctx.set_repo_dir(project_path)
role = Engineer(context=ctx)
msg = Message(content="", cause_by=WriteTasks, send_to=role)
me = {any_to_str(role), role.name}
while me.intersection(msg.send_to):
msg = await role.run(with_message=msg)
outputs = [
ToolLogItem(name="Source File", value=str(ctx.repo.srcs.workdir / i))
for i in ctx.repo.srcs.changed_files.keys()
]
log_tool_output(output=outputs, tool_name=write_codes.__name__)
return ctx.repo.srcs.workdir
@register_tool(tags=["software development", "QaEngineer"])
async def run_qa_test(src_path: str | Path) -> Path:
"""Run QA test on the project repository.
Args:
src_path (str|Path): The path to the source code files under the project directory.
Returns:
Path: The path to the unit tests under the project directory
Example:
>>> from metagpt.tools.libs.software_development import run_qa_test
>>> src_path = '/path/to/project_path/src/' # Returned by `write_codes`
>>> test_path = await run_qa_test(src_path)
>>> print(test_path)
'/path/to/project_path/tests'
"""
from metagpt.actions.summarize_code import SummarizeCode
from metagpt.context import Context
from metagpt.environment import Environment
from metagpt.roles import QaEngineer
ctx = Context()
src_path = Path(src_path)
project_path = (src_path if not src_path.is_file() else src_path.parent) / ".."
ctx.set_repo_dir(project_path)
ctx.src_workspace = ctx.git_repo.workdir / ctx.git_repo.workdir.name
env = Environment(context=ctx)
role = QaEngineer(context=ctx)
env.add_role(role)
msg = Message(content="", cause_by=SummarizeCode, send_to=role)
env.publish_message(msg)
while not env.is_idle:
await env.run()
outputs = [
ToolLogItem(name="Unit Test File", value=str(ctx.repo.tests.workdir / i))
for i in ctx.repo.tests.changed_files.keys()
]
log_tool_output(output=outputs, tool_name=run_qa_test.__name__)
return ctx.repo.tests.workdir
@register_tool(tags=["software development", "Engineer"])
async def fix_bug(project_path: str | Path, issue: str) -> Path:
"""Fix bugs in the project repository.
Args:
project_path (str|Path): The path to the project repository.
issue (str): Description of the bug or issue.
Returns:
Path: The path to the project directory
Example:
>>> from metagpt.tools.libs.software_development import fix_bug
>>> project_path = '/path/to/project_path' # Returned by `write_codes`
>>> issue = 'Exception: exception about ...; Bug: bug about ...; Issue: issue about ...'
>>> project_path = await fix_bug(project_path=project_path, issue=issue)
>>> print(project_path)
'/path/to/project_path'
"""
from metagpt.actions.fix_bug import FixBug
from metagpt.context import Context
from metagpt.roles import Engineer
ctx = Context()
ctx.set_repo_dir(project_path)
ctx.src_workspace = ctx.git_repo.workdir / ctx.git_repo.workdir.name
await ctx.repo.docs.save(filename=BUGFIX_FILENAME, content=issue)
await ctx.repo.docs.save(filename=REQUIREMENT_FILENAME, content="")
role = Engineer(context=ctx)
bug_fix = BugFixContext(filename=BUGFIX_FILENAME)
msg = Message(
content=bug_fix.model_dump_json(),
instruct_content=bug_fix,
role="",
cause_by=FixBug,
sent_from=role,
send_to=role,
)
me = {any_to_str(role), role.name}
while me.intersection(msg.send_to):
msg = await role.run(with_message=msg)
outputs = [
ToolLogItem(name="Changed File", value=str(ctx.repo.srcs.workdir / i))
for i in ctx.repo.srcs.changed_files.keys()
]
log_tool_output(output=outputs, tool_name=fix_bug.__name__)
return project_path
@register_tool(tags=["software development", "git"])
async def git_archive(project_path: str | Path) -> str:
"""Stage and commit changes for the project repository using Git.
Args:
project_path (str|Path): The path to the project repository.
Returns:
git log
Example:
>>> from metagpt.tools.libs.software_development import git_archive
>>> project_path = '/path/to/project_path' # Returned by `write_prd`
>>> git_log = await git_archive(project_path=project_path)
>>> print(git_log)
commit a221d1c418c07f2b4fc07001e486285ead1a520a (HEAD -> feature/toollib/software_company, geekan/main)
Merge: e01afd09 4a72f398
Author: Sirui Hong <x@xx.github.com>
Date: Tue Mar 19 15:16:03 2024 +0800
Merge pull request #1037 from iorisa/fixbug/issues/1018
fixbug: #1018
"""
from metagpt.context import Context
ctx = Context()
ctx.set_repo_dir(project_path)
ctx.git_repo.archive()
outputs = [ToolLogItem(name="Git Commit", value=str(ctx.repo.workdir))]
log_tool_output(output=outputs, tool_name=git_archive.__name__)
return ctx.git_repo.log()
@register_tool(tags=["software development", "import git repo"])
async def import_git_repo(url: str) -> Path:
"""
Imports a project from a Git website and formats it to MetaGPT project format to enable incremental appending requirements.
Args:
url (str): The Git project URL, such as "https://github.com/geekan/MetaGPT.git".
Returns:
Path: The path of the formatted project.
Example:
# The Git project URL to input
>>> git_url = "https://github.com/geekan/MetaGPT.git"
# Import the Git repository and get the formatted project path
>>> formatted_project_path = await import_git_repo(git_url)
>>> print("Formatted project path:", formatted_project_path)
/PATH/TO/THE/FORMMATTED/PROJECT
"""
from metagpt.actions.import_repo import ImportRepo
from metagpt.context import Context
ctx = Context()
action = ImportRepo(repo_path=url, context=ctx)
await action.run()
outputs = [ToolLogItem(name="MetaGPT Project", value=str(ctx.repo.workdir))]
log_tool_output(output=outputs, tool_name=import_git_repo.__name__)
return ctx.repo.workdir

View file

@ -131,7 +131,7 @@ class ToolRecommender(BaseModel):
available_tools=available_tools,
topk=topk,
)
rsp = await LLM().aask(prompt)
rsp = await LLM().aask(prompt, stream=False)
rsp = CodeParser.parse_code(block=None, text=rsp)
ranked_tools = json.loads(rsp)

View file

@ -26,7 +26,7 @@ import sys
import traceback
from io import BytesIO
from pathlib import Path
from typing import Any, Callable, List, Literal, Tuple, Union
from typing import Any, Callable, List, Literal, Optional, Tuple, Union
from urllib.parse import quote, unquote
import aiofiles
@ -271,7 +271,7 @@ class CodeParser:
return block_dict
@classmethod
def parse_code(cls, block: str, text: str, lang: str = "") -> str:
def parse_code(cls, block: Optional[str], text: str, lang: str = "") -> str:
if block:
text = cls.parse_block(block, text)
pattern = rf"```{lang}.*?\s+(.*?)```"
@ -822,19 +822,78 @@ See FAQ 5.8
raise retry_state.outcome.exception()
def get_markdown_codeblock_type(filename: str) -> str:
async def get_mime_type(filename: str | Path, force_read: bool = False) -> str:
guess_mime_type, _ = mimetypes.guess_type(filename.name)
if not guess_mime_type:
ext_mappings = {".yml": "text/yaml", ".yaml": "text/yaml"}
guess_mime_type = ext_mappings.get(filename.suffix)
if not force_read and guess_mime_type:
return guess_mime_type
from metagpt.tools.libs.shell import shell_execute # avoid circular import
text_set = {
"application/json",
"application/vnd.chipnuts.karaoke-mmd",
"application/javascript",
"application/xml",
"application/x-sh",
"application/sql",
"text/yaml",
}
try:
stdout, _, _ = await shell_execute(f"file --mime-type {str(filename)}")
ix = stdout.rfind(" ")
mime_type = stdout[ix:].strip()
if mime_type == "text/plain" and guess_mime_type in text_set:
return guess_mime_type
return mime_type
except Exception as e:
logger.debug(f"file:{filename}, error:{e}")
return "unknown"
def get_markdown_codeblock_type(filename: str = None, mime_type: str = None) -> str:
"""Return the markdown code-block type corresponding to the file extension."""
mime_type, _ = mimetypes.guess_type(filename)
if not filename and not mime_type:
raise ValueError("Either filename or mime_type must be valid.")
if not mime_type:
mime_type, _ = mimetypes.guess_type(filename)
mappings = {
"text/x-shellscript": "bash",
"text/x-c++src": "cpp",
"text/css": "css",
"text/html": "html",
"text/x-java": "java",
"application/javascript": "javascript",
"application/json": "json",
"text/x-python": "python",
"text/x-ruby": "ruby",
"text/x-c": "cpp",
"text/yaml": "yaml",
"application/javascript": "javascript",
"application/json": "json",
"application/sql": "sql",
"application/vnd.chipnuts.karaoke-mmd": "mermaid",
"application/x-sh": "bash",
"application/xml": "xml",
}
return mappings.get(mime_type, "text")
def get_project_srcs_path(workdir: str | Path) -> Path:
src_workdir_path = workdir / ".src_workspace"
if src_workdir_path.exists():
with open(src_workdir_path, "r") as file:
src_name = file.read()
else:
src_name = Path(workdir).name
return Path(workdir) / src_name
async def init_python_folder(workdir: str | Path):
init_filename = Path(workdir) / "__init__.py"
if init_filename.exists():
return
async with aiofiles.open(init_filename, "a"):
os.utime(init_filename, None)

View file

@ -9,6 +9,7 @@
from __future__ import annotations
import shutil
import uuid
from enum import Enum
from pathlib import Path
from typing import Dict, List
@ -16,8 +17,10 @@ from typing import Dict, List
from git.repo import Repo
from git.repo.fun import is_git_dir
from gitignore_parser import parse_gitignore
from tenacity import retry, stop_after_attempt, wait_random_exponential
from metagpt.logs import logger
from metagpt.tools.libs.shell import shell_execute
from metagpt.utils.dependency_file import DependencyFile
from metagpt.utils.file_repository import FileRepository
@ -52,7 +55,7 @@ class GitRepository:
self._dependency = None
self._gitignore_rules = None
if local_path:
self.open(local_path=local_path, auto_init=auto_init)
self.open(local_path=Path(local_path), auto_init=auto_init)
def open(self, local_path: Path, auto_init=False):
"""Open an existing Git repository or initialize a new one if auto_init is True.
@ -68,7 +71,7 @@ class GitRepository:
if not auto_init:
return
local_path.mkdir(parents=True, exist_ok=True)
return self._init(local_path)
self._init(local_path)
def _init(self, local_path: Path):
"""Initialize a new Git repository at the specified path.
@ -248,6 +251,8 @@ class GitRepository:
if not directory_path.exists():
return []
for file_path in directory_path.iterdir():
if not file_path.is_relative_to(root_relative_path):
continue
if file_path.is_file():
rpath = file_path.relative_to(root_relative_path)
files.append(str(rpath))
@ -283,3 +288,37 @@ class GitRepository:
continue
files.append(filename)
return files
@classmethod
@retry(wait=wait_random_exponential(min=1, max=15), stop=stop_after_attempt(3))
async def clone_from(cls, url: str | Path, output_dir: str | Path = None) -> "GitRepository":
from metagpt.context import Context
to_path = Path(output_dir or Path(__file__).parent / f"../../workspace/downloads/{uuid.uuid4().hex}").resolve()
to_path.mkdir(parents=True, exist_ok=True)
repo_dir = to_path / Path(url).stem
if repo_dir.exists():
shutil.rmtree(repo_dir, ignore_errors=True)
ctx = Context()
env = ctx.new_environ()
proxy = ["-c", f"http.proxy={ctx.config.proxy}"] if ctx.config.proxy else []
command = ["git", "clone"] + proxy + [str(url)]
logger.info(" ".join(command))
stdout, stderr, return_code = await shell_execute(command=command, cwd=str(to_path), env=env, timeout=600)
info = f"{stdout}\n{stderr}\nexit: {return_code}\n"
logger.info(info)
dir_name = Path(url).stem
to_path = to_path / dir_name
if not cls.is_git_dir(to_path):
raise ValueError(info)
logger.info(f"git clone to {to_path}")
return GitRepository(local_path=to_path, auto_init=False)
async def checkout(self, commit_id: str):
self._repository.git.checkout(commit_id)
logger.info(f"git checkout {commit_id}")
def log(self) -> str:
"""Return git log"""
return self._repository.git.log()

View file

@ -49,6 +49,10 @@ class GraphKeyword:
IS_COMPOSITE_OF = "is_composite_of"
IS_AGGREGATE_OF = "is_aggregate_of"
HAS_PARTICIPANT = "has_participant"
HAS_SUMMARY = "has_summary"
HAS_INSTALL = "has_install"
HAS_CONFIG = "has_config"
HAS_USAGE = "has_usage"
class SPO(BaseModel):

View file

@ -35,6 +35,7 @@ from metagpt.const import (
TEST_OUTPUTS_FILE_REPO,
VISUAL_GRAPH_REPO_FILE_REPO,
)
from metagpt.utils.common import get_project_srcs_path
from metagpt.utils.file_repository import FileRepository
from metagpt.utils.git_repository import GitRepository
@ -129,11 +130,10 @@ class ProjectRepo(FileRepository):
return self._git_repo.new_file_repository(self._srcs_path)
def code_files_exists(self) -> bool:
git_workdir = self.git_repo.workdir
src_workdir = git_workdir / git_workdir.name
src_workdir = get_project_srcs_path(self.git_repo.workdir)
if not src_workdir.exists():
return False
code_files = self.with_src_path(path=git_workdir / git_workdir.name).srcs.all_files
code_files = self.with_src_path(path=src_workdir).srcs.all_files
if not code_files:
return False
return bool(code_files)

View file

@ -5,17 +5,24 @@ This file provides functionality to convert a local repository into a markdown r
"""
from __future__ import annotations
import mimetypes
import re
from pathlib import Path
from typing import Tuple
from gitignore_parser import parse_gitignore
from metagpt.logs import logger
from metagpt.utils.common import aread, awrite, get_markdown_codeblock_type, list_files
from metagpt.utils.common import (
aread,
awrite,
get_markdown_codeblock_type,
get_mime_type,
list_files,
)
from metagpt.utils.tree import tree
async def repo_to_markdown(repo_path: str | Path, output: str | Path = None, gitignore: str | Path = None) -> str:
async def repo_to_markdown(repo_path: str | Path, output: str | Path = None) -> str:
"""
Convert a local repository into a markdown representation.
@ -25,56 +32,108 @@ async def repo_to_markdown(repo_path: str | Path, output: str | Path = None, git
Args:
repo_path (str | Path): The path to the local repository.
output (str | Path, optional): The path to save the generated markdown file. Defaults to None.
gitignore (str | Path, optional): The path to the .gitignore file. Defaults to None.
Returns:
str: The markdown representation of the repository.
"""
repo_path = Path(repo_path)
gitignore = Path(gitignore or Path(__file__).parent / "../../.gitignore").resolve()
repo_path = Path(repo_path).resolve()
gitignore_file = repo_path / ".gitignore"
markdown = await _write_dir_tree(repo_path=repo_path, gitignore=gitignore)
markdown = await _write_dir_tree(repo_path=repo_path, gitignore=gitignore_file)
gitignore_rules = parse_gitignore(full_path=str(gitignore))
gitignore_rules = parse_gitignore(full_path=str(gitignore_file)) if gitignore_file.exists() else None
markdown += await _write_files(repo_path=repo_path, gitignore_rules=gitignore_rules)
if output:
await awrite(filename=str(output), data=markdown, encoding="utf-8")
output_file = Path(output).resolve()
output_file.parent.mkdir(parents=True, exist_ok=True)
await awrite(filename=str(output_file), data=markdown, encoding="utf-8")
logger.info(f"save: {output_file}")
return markdown
async def _write_dir_tree(repo_path: Path, gitignore: Path) -> str:
try:
content = tree(repo_path, gitignore, run_command=True)
content = await tree(repo_path, gitignore, run_command=True)
except Exception as e:
logger.info(f"{e}, using safe mode.")
content = tree(repo_path, gitignore, run_command=False)
content = await tree(repo_path, gitignore, run_command=False)
doc = f"## Directory Tree\n```text\n{content}\n```\n---\n\n"
return doc
async def _write_files(repo_path, gitignore_rules) -> str:
async def _write_files(repo_path, gitignore_rules=None) -> str:
filenames = list_files(repo_path)
markdown = ""
pattern = r"^\..*" # Hidden folders/files
for filename in filenames:
if gitignore_rules(str(filename)):
if gitignore_rules and gitignore_rules(str(filename)):
continue
ignore = False
for i in filename.parts:
if re.match(pattern, i):
ignore = True
break
if ignore:
continue
markdown += await _write_file(filename=filename, repo_path=repo_path)
return markdown
async def _write_file(filename: Path, repo_path: Path) -> str:
relative_path = filename.relative_to(repo_path)
markdown = f"## {relative_path}\n"
mime_type, _ = mimetypes.guess_type(filename.name)
if "text/" not in mime_type:
is_text, mime_type = await _is_text_file(filename)
if not is_text:
logger.info(f"Ignore content: {filename}")
markdown += "<binary file>\n---\n\n"
return ""
try:
relative_path = filename.relative_to(repo_path)
markdown = f"## {relative_path}\n"
content = await aread(filename, encoding="utf-8")
content = content.replace("```", "\\`\\`\\`").replace("---", "\\-\\-\\-")
code_block_type = get_markdown_codeblock_type(filename.name)
markdown += f"```{code_block_type}\n{content}\n```\n---\n\n"
return markdown
content = await aread(filename, encoding="utf-8")
content = content.replace("```", "\\`\\`\\`").replace("---", "\\-\\-\\-")
code_block_type = get_markdown_codeblock_type(filename.name)
markdown += f"```{code_block_type}\n{content}\n```\n---\n\n"
return markdown
except Exception as e:
logger.error(e)
return ""
async def _is_text_file(filename: Path) -> Tuple[bool, str]:
pass_set = {
"application/json",
"application/vnd.chipnuts.karaoke-mmd",
"application/javascript",
"application/xml",
"application/x-sh",
"application/sql",
}
denied_set = {
"application/zlib",
"application/octet-stream",
"image/svg+xml",
"application/pdf",
"application/msword",
"application/vnd.ms-excel",
"audio/x-wav",
"application/x-git",
"application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"application/zip",
"image/jpeg",
"audio/mpeg",
"video/mp2t",
"inode/x-empty",
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
"image/png",
"image/vnd.microsoft.icon",
"video/mp4",
}
mime_type = await get_mime_type(filename, force_read=True)
v = "text/" in mime_type or mime_type in pass_set
if v:
return True, mime_type
if mime_type not in denied_set:
logger.info(mime_type)
return False, mime_type

View file

@ -27,14 +27,15 @@
"""
from __future__ import annotations
import subprocess
from pathlib import Path
from typing import Callable, Dict, List
from gitignore_parser import parse_gitignore
from metagpt.tools.libs.shell import shell_execute
def tree(root: str | Path, gitignore: str | Path = None, run_command: bool = False) -> str:
async def tree(root: str | Path, gitignore: str | Path = None, run_command: bool = False) -> str:
"""
Recursively traverses the directory structure and prints it out in a tree-like format.
@ -80,7 +81,7 @@ def tree(root: str | Path, gitignore: str | Path = None, run_command: bool = Fal
"""
root = Path(root).resolve()
if run_command:
return _execute_tree(root, gitignore)
return await _execute_tree(root, gitignore)
git_ignore_rules = parse_gitignore(gitignore) if gitignore else None
dir_ = {root.name: _list_children(root=root, git_ignore_rules=git_ignore_rules)}
@ -129,12 +130,7 @@ def _add_line(rows: List[str]) -> List[str]:
return rows
def _execute_tree(root: Path, gitignore: str | Path) -> str:
async def _execute_tree(root: Path, gitignore: str | Path) -> str:
args = ["--gitfile", str(gitignore)] if gitignore else []
try:
result = subprocess.run(["tree"] + args + [str(root)], capture_output=True, text=True, check=True)
if result.returncode != 0:
raise ValueError(f"tree exits with code {result.returncode}")
return result.stdout
except subprocess.CalledProcessError as e:
raise e
stdout, _, _ = await shell_execute(["tree"] + args + [str(root)])
return stdout

View file

@ -0,0 +1,26 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pathlib import Path
import pytest
from metagpt.actions.extract_readme import ExtractReadMe
from metagpt.llm import LLM
@pytest.mark.asyncio
async def test_learn_readme(context):
action = ExtractReadMe(
name="RedBean",
i_context=str(Path(__file__).parent.parent.parent.parent),
llm=LLM(),
context=context,
)
await action.run()
rows = await action.graph_db.select()
assert rows
assert context.repo.docs.graph_repo.changed_files
if __name__ == "__main__":
pytest.main([__file__, "-s"])

View file

@ -0,0 +1,33 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pytest
from metagpt.actions.import_repo import ImportRepo
from metagpt.context import Context
from metagpt.utils.common import list_files
@pytest.mark.asyncio
@pytest.mark.parametrize(
"repo_path",
[
"https://github.com/spec-first/connexion.git",
# "https://github.com/geekan/MetaGPT.git"
],
)
@pytest.mark.skip
async def test_import_repo(repo_path):
context = Context()
action = ImportRepo(repo_path=repo_path, context=context)
await action.run()
assert context.repo
prd = list_files(context.repo.docs.prd.workdir)
assert prd
design = list_files(context.repo.docs.system_design.workdir)
assert design
assert prd[0].stem == design[0].stem
if __name__ == "__main__":
pytest.main([__file__, "-s"])

View file

@ -61,7 +61,7 @@ async def test_rebuild(context, mocker):
],
)
def test_get_full_filename(root, pathname, want):
res = RebuildSequenceView._get_full_filename(root=root, pathname=pathname)
res = RebuildSequenceView.get_full_filename(root=root, pathname=pathname)
assert res == want

View file

@ -0,0 +1,179 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from typing import List
import pytest
from metagpt.roles.di.mgx import MGX
from metagpt.schema import Message
DEMO_CONTENT = [
{
"role": "user",
"content": "Can you build TextToSummarize which is a SMS number that I can text and it will scrape a website "
"and summarize it with ChatGPT",
},
{
"role": "assistant",
"content": "Absolutely, I can build a service like TextToSummarize for you. The process will involve setting up"
" an SMS service that can receive your texts, scraping the website content you send, and then using"
" an AI to summarize the content.\nTo get started, I'll need to set up a number for receiving SMS"
", then I'll work on the script to scrape the website content from the URLs you text, and finally."
" I'll integrate it with an AI service to Generalte the summaries.\n I'll keep you updated on my"
" progress!",
},
{"role": "user", "content": "What exactly do we need the web app for?"},
{
"role": "assistant",
"content": "The web app will service as the interface between the SMS service and the AI summarization service"
". When you send a text with a URL to the SMS number, teh web app will process that, scrape the"
" content from the URL, and then use the AI to summarize it. The summary will then be sent back to"
" you. It's the central hub that ties all the components of the service togather.",
},
]
DEMO1_CONTENT = [
{
"role": "user",
"content": "Extract all of the blog posts from `https://stripe.com/blog/page/1` and return a CSV with the"
" columns `date`, `article_text`, `author` and `summary`. Generate a summary for each article"
" yourself.",
}
]
_DEMO2_CONTENT_ASSISTANT = """
user:
## User Requirement
### User Requirement Detail
```text
2048 game
````
### Knowledge
To meet user requirements, the following standard operating procedure(SOP) must be used. SOP descriptions cannot be modified; user requirements can only be appended to the end of corresponding steps.
- Writes a PRD based on software requirements.
- Writes a design to the project repository, based on the PRD of the project.
- Writes a project plan to the project repository, based on the design of the project.
- Writes code to implement designed features according to the project plan and adds them to the project repository.
- Stage and commit changes for the project repository using Git.
## Context
## Current Plan
[
{
"task_id": "1",
"dependent_task_ids": [],
"instruction": "Write a PRD based on the software requirements.",
"task_type": "other",
"code": "from metagpt.tools.libs.software_development import write_prd\n\nasync def create_2048_game_prd():\n idea = \"Create a 2048 game\"\n prd_path = await write_prd(idea)\n return prd_path\n\nawait create_2048_game_prd()\n",
"result": ",,,,,[,CONTENT]\n{\n \"Language\": \"en_us\",\n \"Programming Language\": \"Python\",\n \"Original Requirements\": \"Create a, 2048 game\",\n \"Project Name\": \"game_2048\",\n \"Product Goals\": [\n \"Create an engaging user experience\",\n , \"Improve accessibility, be responsive\",\n \"More beautiful UI\"\n ],\n \"User Stories\": [\n \",As a player, I want to be able to choose difficulty levels\",\n \"As a player,, I want to see my score after each game\",\n \"As a player, I want, to get restart button when I lose\",\n \"As a player, I want to see beautiful UI that make me feel good\",\n \"As a player, I want to play game via mobile phone\"\n , ],\n \"Competitive Analysis\": [\n \"2048 Game, A: Simple interface, lacks responsive features\",\n \"play2048.co: Beautiful and responsive UI with my best score shown\",\n \"2048game.com: Responsive UI with my, best score shown, but many ads\"\n ],\n \"Competitive Quadrant, Chart\": \"quadrantChart\\n title, \\\"Reach and engagement of campaigns\\\"\\n x-axis \\\"Low Reach\\\" --> \\\"High Reach\\\"\\n y-axis \\\"Low Engagement\\\" --> \\\",High Engagement\\\"\\n quadrant-1 \\\"We should expand\\\"\\n quadrant-2 \\\"Need to promote\\\"\\n quadrant,-3 \\\"Re-evaluate\\\"\\n quadrant-4 \\\"May be, improved\\\"\\n \\\"Campaign A\\\": [0.3, 0.6]\\n \\\"Campaign B\\\": [0.45, 0.,23]\\n \\\"Campaign C\\\": [0.57, 0.69]\\n \\\"Campaign D\\\": [0.78, 0.34]\\n , \\\"Campaign E\\\": [0.40, 0.34]\\n \\\"Campaign, F\\\": [0.35, 0.78]\\n \\\"Our Target Product\\\": [0.5, 0.6,]\",\n \"Requirement Analysis\": \"\",\n \"Requirement Pool\": [\n [\n \"P0\",\n, \"The main code ...\"\n ],\n [\n \"P0\",\n \"The game algorithm ...\"\n ]\n, ],\n \"UI Design draft\": \"Basic function description with a simple style and layout.\",\n \"Anything UNCLEAR,\": \"\"\n}\n[/CONTENT]\n,,,,,,,,,,,,{'output': '[{\"type_\": \"str\", \"name\": \"PRD File\", \"value\": \"/Users/iorishinier/github/MetaGPT/workspace/game_2048/docs/prd/20240401194850.json\"}, {\"type_\": \"str\", \"name\": \"Competitive Analysis\", \"value\": \"/Users/iorishinier/github/MetaGPT/workspace/game_2048/resources/competitive_analysis/20240401194850.mmd\"}, {\"type_\": \"str\", \"name\": \"Competitive Analysis\", \"value\": \"/Users/iorishinier/github/MetaGPT/workspace/game_2048/resources/competitive_analysis/20240401194850.pdf\"}, {\"type_\": \"str\", \"name\": \"Competitive Analysis\", \"value\": \"/Users/iorishinier/github/MetaGPT/workspace/game_2048/resources/competitive_analysis/20240401194850.png\"}, {\"type_\": \"str\", \"name\": \"Competitive Analysis\", \"value\": \"/Users/iorishinier/github/MetaGPT/workspace/game_2048/resources/competitive_analysis/20240401194850.svg\"}]', 'tool_name': 'write_prd'},PosixPath('/Users/iorishinier/github/MetaGPT/workspace/game_2048/docs/prd')",
"is_success": true,
"is_finished": true
},
{
"task_id": "2",
"dependent_task_ids": [
"1"
],
"instruction": "Write a design to the project repository, based on the PRD of the project.",
"task_type": "other",
"code": "from metagpt.tools.libs.software_development import write_design\n\nasync def create_2048_game_design(prd_path):\n system_design_path = await write_design(prd_path)\n return system_design_path\n\nawait create_2048_game_design('/Users/iorishinier/github/MetaGPT/workspace/game_2048/docs/prd/20240401194850.json')\n",
"result": ",[CONTENT]\n{\n \"Implementation approach\":, \"We will use the Py,game library to create the game interface and handle user input. The game logic will be implemented using Python classes and data structures to manage the game, board and tiles.\",\n \"File list\": [\n \"main.py,\",\n \"game.py\"\n, ],\n \"Data structures and interfaces\": \"\\nclassDiagram\\n class Game2048 {\\n -int[][] board,\\n -int score\\n -bool game_over\\n +__init__()\\n +reset()\\n +,move(direction: str)\\n +get_board() int[][]\\n +get_score() int\\n +is_game_over() bool\\n }\\n , class UI {\\n +display_board(board: int[][], score: int)\\n, +show_game_over()\\n }\\n Game,2048 --> UI\\n\",\n \"Program call flow\": \"\\nsequenceDiagram,\\n participant M as Main\\n participant G as Game2048,\\n participant UI as UI\\n M->>G: reset,()\\n G-->>UI: display_board(board, score)\\,n M->>G: move(direction)\\n G-->>,G: update board and score\\n G-->>UI:, display_board(board, score)\\n G->>G: is_game_over,()\\n G-->>UI: show_game_over()\\n\",\n,,,,, \"Anything UNCLEAR\": \"\"\n}\n[/CONTENT]\n,,,,,,,,,,,,,,,{'output': '[{\"type_\": \"str\", \"name\": \"Intermedia Design File\", \"value\": \"/Users/iorishinier/github/MetaGPT/workspace/game_2048/docs/system_design/20240401194850.json\"}, {\"type_\": \"str\", \"name\": \"Design File\", \"value\": \"/Users/iorishinier/github/MetaGPT/workspace/game_2048/resources/system_design/20240401194850.md\"}, {\"type_\": \"str\", \"name\": \"Class Diagram File\", \"value\": \"/Users/iorishinier/github/MetaGPT/workspace/game_2048/resources/data_api_design/20240401194850.mmd\"}, {\"type_\": \"str\", \"name\": \"Class Diagram File\", \"value\": \"/Users/iorishinier/github/MetaGPT/workspace/game_2048/resources/data_api_design/20240401194850.pdf\"}, {\"type_\": \"str\", \"name\": \"Class Diagram File\", \"value\": \"/Users/iorishinier/github/MetaGPT/workspace/game_2048/resources/data_api_design/20240401194850.png\"}, {\"type_\": \"str\", \"name\": \"Class Diagram File\", \"value\": \"/Users/iorishinier/github/MetaGPT/workspace/game_2048/resources/data_api_design/20240401194850.svg\"}, {\"type_\": \"str\", \"name\": \"Sequence Diagram File\", \"value\": \"/Users/iorishinier/github/MetaGPT/workspace/game_2048/resources/seq_flow/20240401194850.mmd\"}, {\"type_\": \"str\", \"name\": \"Sequence Diagram File\", \"value\": \"/Users/iorishinier/github/MetaGPT/workspace/game_2048/resources/seq_flow/20240401194850.pdf\"}, {\"type_\": \"str\", \"name\": \"Sequence Diagram File\", \"value\": \"/Users/iorishinier/github/MetaGPT/workspace/game_2048/resources/seq_flow/20240401194850.png\"}, {\"type_\": \"str\", \"name\": \"Sequence Diagram File\", \"value\": \"/Users/iorishinier/github/MetaGPT/workspace/game_2048/resources/seq_flow/20240401194850.svg\"}]', 'tool_name': 'write_design'},PosixPath('/Users/iorishinier/github/MetaGPT/workspace/game_2048/docs/system_design')",
"is_success": true,
"is_finished": true
},
{
"task_id": "3",
"dependent_task_ids": [
"2"
],
"instruction": "Write a project plan to the project repository, based on the design of the project.",
"task_type": "other",
"code": "from metagpt.tools.libs.software_development import write_project_plan\n\nasync def create_2048_game_project_plan(system_design_path):\n task_path = await write_project_plan(system_design_path)\n return task_path\n\nawait create_2048_game_project_plan('/Users/iorishinier/github/MetaGPT/workspace/game_2048/docs/system_design/20240401194850.json')\n",
"result": ",\n[CONTENT,]\n{\n \"Required Python packages\": [\n \"pygame==2.0.1\"\n ],\n \"Required, Other language third-party packages\": [\n \"No third-party dependencies required,\"\n ],\n \"Logic Analysis\": [\n , [\n \"game.py\",\n \"Contains Game2048, class and functions to manage the game board and tiles\"\n, ],\n [\n \"main.py\",\n \",Contains the main function, and imports Game2048, from game.py\"\n ]\n ],\n \"Task list,\": [\n \"game.py\",\n \"main.py\"\n, ],\n \"Full API spec\":, \"\",\n \"Shared Knowledge\": \"The Pygame library will be used, to create the game interface and handle user input. The game, logic will be implemented using Python classes and data structures to manage the game, board and tiles.\",\n \"Anything UNCLEAR\": \"No unclear aspects,,,,, mentioned.\"\n}\n[/CONTENT]\n{'output': '[{\"type_\": \"str\", \"name\": \"Project Plan\", \"value\": \"/Users/iorishinier/github/MetaGPT/workspace/game_2048/docs/task/20240401194850.json\"}]', 'tool_name': 'write_project_plan'},PosixPath('/Users/iorishinier/github/MetaGPT/workspace/game_2048/docs/task')",
"is_success": true,
"is_finished": true
},
{
"task_id": "4",
"dependent_task_ids": [
"3"
],
"instruction": "Write code to implement designed features according to the project plan and add them to the project repository.",
"task_type": "other",
"code": "from metagpt.tools.libs.software_development import write_codes\n\nasync def implement_2048_game_features(task_path):\n src_path = await write_codes(task_path)\n return src_path\n\nawait implement_2048_game_features('/Users/iorishinier/github/MetaGPT/workspace/game_2048/docs/task/20240401194850.json')\n",
"result": ",```python\n,## game.py\n\nclass Game2048,:\n def __init__(self):\n, self.board = [[0 for _, in range(4)] for _ in, range(4)]\n self.score =, 0\n self.game_over =, False\n\n def reset(self):\n , self.board = [[0 for _ in, range(4)] for _, in range(4)]\n self.score = 0\n self,.game_over = False\n\n , def move(self, direction: str):\n, # Implement the logic to move the, tiles in the specified direction\n, pass\n\n def get_board(self) ->, list:\n return self.board\n\n , def get_score(self) -> int:\n, return self.score\n\n def is,_game_over(self) -> bool:\n ,,,,, return self.game_over\n```\n,```,python\n## main.py\n\nimport pygame\nfrom game import Game2048\n\ndef main():\n # Initialize the game\n game = Game2048()\n game.reset()\n\n # Initialize Pygame\n, pygame.init()\n screen = pygame.display.set_mode((400, 400))\n pygame.display.set_caption('2048 Game,')\n\n running = True\n while running:\n for event in pygame.event.get():\n if, event.type == pygame.QUIT:\n running = False\n elif event.type == pygame.KEYDOWN:\n if event.key ==, pygame.K_UP:\n game.move('up')\n elif event.key == pygame.K_DOWN:\n , game.move('down')\n elif event.key == pygame.K_LEFT:\n game.move('left')\n elif, event.key == pygame.K_RIGHT,:\n game.move('right')\n\n # Display the game board and score\n board = game.get_board()\n score = game.get_score()\n display_board(screen, board, score)\n\n # Check, if the game is over\n if game.is_game_over():\n show_game_over(screen)\n\n pygame.display.update()\n\n pygame.quit()\n\n,def display_board(screen, board, score):\n # Write,,,, code to display the game board and score on the Pygame screen\n pass\n\ndef show_game_over(screen):\n # Write code to display the game over message on the Pygame screen\n pass\n\nif __name__ == \"__main__\":\n main()\n```\n,,{'output': '[{\"type_\": \"str\", \"name\": \"Source File\", \"value\": \"/Users/iorishinier/github/MetaGPT/workspace/game_2048/game_2048/__init__.py\"}, {\"type_\": \"str\", \"name\": \"Source File\", \"value\": \"/Users/iorishinier/github/MetaGPT/workspace/game_2048/game_2048/game.py\"}, {\"type_\": \"str\", \"name\": \"Source File\", \"value\": \"/Users/iorishinier/github/MetaGPT/workspace/game_2048/game_2048/main.py\"}]', 'tool_name': 'write_codes'},PosixPath('/Users/iorishinier/github/MetaGPT/workspace/game_2048/game_2048')",
"is_success": true,
"is_finished": true
},
{
"task_id": "5",
"dependent_task_ids": [
"4"
],
"instruction": "Stage and commit changes for the project repository using Git.",
"task_type": "other",
"code": "from metagpt.tools.libs.software_development import git_archive\n\nproject_path = '/Users/iorishinier/github/MetaGPT/workspace/game_2048'\ngit_log = await git_archive(project_path)\nprint(git_log)\n",
"result": ",{'output': '[{\"type_\": \"str\", \"name\": \"Git Commit\", \"value\": \"/Users/iorishinier/github/MetaGPT/workspace/game_2048\"}]', 'tool_name': 'git_archive'}commit 41c731341d0da8b630709bca50d4841dcea98d43\nAuthor: 莘权 马 <mashenquan@fuzhi.ai>\nDate: Mon Apr 1 19:50:53 2024 +0800\n\n Archive\n\ncommit a61daa93f24e74bf934ea5a3ba217038f167ab1f\nAuthor: 莘权 马 <mashenquan@fuzhi.ai>\nDate: Mon Apr 1 19:48:40 2024 +0800\n\n Add .gitignore\n",
"is_success": true,
"is_finished": true
}
]
## Current Task
{}
"""
DEMO2_CONTENT = [
{"role": "user", "content": 'Create a "2048 game"'},
{"role": "assistant", "content": _DEMO2_CONTENT_ASSISTANT},
{"role": "user", "content": "TypeError: __init__() takes 1 positional argument but 2 were given"},
]
DEMO3_CONTENT = [
{"role": "user", "content": "git clone 'https://github.com/spec-first/connexion' and format to MetaGPT project"}
]
@pytest.mark.asyncio
@pytest.mark.parametrize(
"user_messages",
[
[Message.model_validate(i) for i in DEMO2_CONTENT if i["role"] == "user"],
[Message.model_validate(i) for i in DEMO_CONTENT if i["role"] == "user"],
[Message.model_validate(i) for i in DEMO1_CONTENT if i["role"] == "user"],
],
)
@pytest.mark.skip
async def test_mgx(user_messages: List[Message], context):
mgx = MGX(context=context, tools=["<all>"])
for i, msg in enumerate(user_messages):
await mgx.run(msg)
data = mgx.model_dump_json()
await context.repo.test_outputs.save(filename=f"{i}.json", content=data)
@pytest.mark.asyncio
@pytest.mark.parametrize(
("user_message", "history_messages"),
[(Message.model_validate(DEMO2_CONTENT[2]), [Message.model_validate(i) for i in DEMO2_CONTENT[0:2]])],
)
@pytest.mark.skip
async def test_mgx_fixbug(user_message: Message, history_messages: List[Message], context):
mgx = MGX(context=context, tools=["<all>"])
mgx.rc.memory.add_batch(history_messages)
await mgx.run(user_message)
data = mgx.model_dump_json()
await context.repo.test_outputs.save(filename="test_mgx_fixbug.json", content=data)
@pytest.mark.asyncio
@pytest.mark.parametrize("user_message", [Message.model_validate(i) for i in DEMO3_CONTENT if i["role"] == "user"])
@pytest.mark.skip
async def test_git_import(user_message, context):
mgx = MGX(context=context, tools=["<all>"])
await mgx.run(user_message)
if __name__ == "__main__":
pytest.main([__file__, "-s"])

View file

@ -0,0 +1,31 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pytest
from pydantic import BaseModel
from metagpt.tools.libs.git import git_checkout, git_clone
from metagpt.utils.git_repository import GitRepository
class SWEBenchItem(BaseModel):
base_commit: str
repo: str
@pytest.mark.asyncio
@pytest.mark.parametrize(
["url", "commit_id"], [("https://github.com/sqlfluff/sqlfluff.git", "d19de0ecd16d298f9e3bfb91da122734c40c01e5")]
)
async def test_git(url: str, commit_id: str):
repo_dir = await git_clone(url)
assert repo_dir
await git_checkout(repo_dir, commit_id)
repo = GitRepository(repo_dir, auto_init=False)
repo.delete_repository()
if __name__ == "__main__":
pytest.main([__file__, "-s"])

View file

@ -0,0 +1,23 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pytest
from metagpt.tools.libs.shell import shell_execute
@pytest.mark.asyncio
@pytest.mark.parametrize(
["command", "expect_stdout", "expect_stderr"],
[
(["file", f"{__file__}"], "Python script text executable, ASCII text", ""),
(f"file {__file__}", "Python script text executable, ASCII text", ""),
],
)
async def test_shell(command, expect_stdout, expect_stderr):
stdout, stderr = await shell_execute(command)
assert expect_stdout in stdout
assert stderr == expect_stderr
if __name__ == "__main__":
pytest.main([__file__, "-s"])

View file

@ -0,0 +1,65 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pytest
from metagpt.tools.libs import (
fix_bug,
git_archive,
run_qa_test,
write_codes,
write_design,
write_prd,
write_project_plan,
)
from metagpt.tools.libs.software_development import import_git_repo
@pytest.mark.asyncio
async def test_software_team():
path = await write_prd("snake game")
assert path
path = await write_design(path)
assert path
path = await write_project_plan(path)
assert path
path = await write_codes(path)
assert path
path = await run_qa_test(path)
assert path
issue = """
pygame 2.0.1 (SDL 2.0.14, Python 3.9.17)
Hello from the pygame community. https://www.pygame.org/contribute.html
Traceback (most recent call last):
File "/Users/ix/github/bak/MetaGPT/workspace/snake_game/snake_game/main.py", line 10, in <module>
main()
File "/Users/ix/github/bak/MetaGPT/workspace/snake_game/snake_game/main.py", line 7, in main
game.start_game()
File "/Users/ix/github/bak/MetaGPT/workspace/snake_game/snake_game/game.py", line 81, in start_game
x
NameError: name 'x' is not defined
"""
path = await fix_bug(path, issue)
assert path
new_path = await write_prd("snake game with moving enemy", path)
assert new_path == path
git_log = await git_archive(new_path)
assert git_log
@pytest.mark.asyncio
async def test_import_repo():
url = "https://github.com/spec-first/connexion.git"
path = await import_git_repo(url)
assert path
if __name__ == "__main__":
pytest.main([__file__, "-s"])

View file

@ -62,6 +62,7 @@ async def test_js_parser():
@pytest.mark.asyncio
@pytest.mark.skip
async def test_codes():
path = DEFAULT_WORKSPACE_ROOT / "snake_game"
repo_parser = RepoParser(base_directory=path)
@ -81,5 +82,13 @@ async def test_codes():
print(data)
@pytest.mark.asyncio
async def test_graph_select():
gdb_path = Path(__file__).parent / "../../data/graph_db/networkx.sequence_view.json"
gdb = await DiGraphRepository.load_from(gdb_path)
rows = await gdb.select()
assert rows
if __name__ == "__main__":
pytest.main([__file__, "-s"])

View file

@ -10,7 +10,12 @@ from metagpt.utils.repo_to_markdown import repo_to_markdown
@pytest.mark.parametrize(
["repo_path", "output"],
[(Path(__file__).parent.parent, Path(__file__).parent.parent.parent / f"workspace/unittest/{uuid.uuid4().hex}.md")],
[
(
Path(__file__).parent.parent.parent,
Path(__file__).parent / f"../../../workspace/unittest/{uuid.uuid4().hex}.md",
),
],
)
@pytest.mark.asyncio
async def test_repo_to_markdown(repo_path: Path, output: Path):

View file

@ -3,6 +3,7 @@ from typing import Optional, Union
from metagpt.config2 import config
from metagpt.configs.llm_config import LLMType
from metagpt.const import LLM_API_TIMEOUT
from metagpt.logs import logger
from metagpt.provider.azure_openai_api import AzureOpenAILLM
from metagpt.provider.constant import GENERAL_FUNCTION_SCHEMA
@ -22,7 +23,7 @@ class MockLLM(OriginalLLM):
self.rsp_cache: dict = {}
self.rsp_candidates: list[dict] = [] # a test can have multiple calls with the same llm, thus a list
async def acompletion_text(self, messages: list[dict], stream=False, timeout=3) -> str:
async def acompletion_text(self, messages: list[dict], stream=False, timeout=LLM_API_TIMEOUT) -> str:
"""Overwrite original acompletion_text to cancel retry"""
if stream:
resp = await self._achat_completion_stream(messages, timeout=timeout)
@ -37,7 +38,7 @@ class MockLLM(OriginalLLM):
system_msgs: Optional[list[str]] = None,
format_msgs: Optional[list[dict[str, str]]] = None,
images: Optional[Union[str, list[str]]] = None,
timeout=3,
timeout=LLM_API_TIMEOUT,
stream=True,
) -> str:
if system_msgs:
@ -56,7 +57,7 @@ class MockLLM(OriginalLLM):
rsp = await self.acompletion_text(message, stream=stream, timeout=timeout)
return rsp
async def original_aask_batch(self, msgs: list, timeout=3) -> str:
async def original_aask_batch(self, msgs: list, timeout=LLM_API_TIMEOUT) -> str:
"""A copy of metagpt.provider.base_llm.BaseLLM.aask_batch, we can't use super().aask because it will be mocked"""
context = []
for msg in msgs:
@ -83,7 +84,7 @@ class MockLLM(OriginalLLM):
system_msgs: Optional[list[str]] = None,
format_msgs: Optional[list[dict[str, str]]] = None,
images: Optional[Union[str, list[str]]] = None,
timeout=3,
timeout=LLM_API_TIMEOUT,
stream=True,
) -> str:
# used to identify it a message has been called before
@ -98,7 +99,7 @@ class MockLLM(OriginalLLM):
rsp = await self._mock_rsp(msg_key, self.original_aask, msg, system_msgs, format_msgs, images, timeout, stream)
return rsp
async def aask_batch(self, msgs: list, timeout=3) -> str:
async def aask_batch(self, msgs: list, timeout=LLM_API_TIMEOUT) -> str:
msg_key = "#MSG_SEP#".join([msg if isinstance(msg, str) else msg.content for msg in msgs])
rsp = await self._mock_rsp(msg_key, self.original_aask_batch, msgs, timeout)
return rsp