feat: merge geekan:main

This commit is contained in:
莘权 马 2023-12-15 19:34:44 +08:00
commit aa5c6f7a1a
49 changed files with 1974 additions and 893 deletions

View file

@ -5,7 +5,7 @@
@Author : alexanderwu
@File : action.py
"""
import re
from abc import ABC
from typing import Optional
@ -14,8 +14,9 @@ from tenacity import retry, stop_after_attempt, wait_random_exponential
from metagpt.actions.action_output import ActionOutput
from metagpt.llm import LLM
from metagpt.logs import logger
from metagpt.provider.postprecess.llm_output_postprecess import llm_output_postprecess
from metagpt.utils.common import OutputParser
from metagpt.utils.custom_decoder import CustomDecoder
from metagpt.utils.utils import general_after_log
class Action(ABC):
@ -25,20 +26,24 @@ class Action(ABC):
llm = LLM()
self.llm = llm
self.context = context
self.prefix = ""
self.profile = ""
self.desc = ""
self.content = ""
self.instruct_content = None
self.env = None
self.prefix = "" # aask*时会加上prefix作为system_message
self.profile = "" # FIXME: USELESS
self.desc = "" # for skill manager
self.nodes = ...
def set_env(self, env):
self.env = env
# Output, useless
# self.content = ""
# self.instruct_content = None
# self.env = None
# def set_env(self, env):
# self.env = env
def set_prefix(self, prefix, profile):
"""Set prefix for later usage"""
self.prefix = prefix
self.profile = profile
return self
def __str__(self):
return self.__class__.__name__
@ -53,7 +58,11 @@ class Action(ABC):
system_msgs.append(self.prefix)
return await self.llm.aask(prompt, system_msgs)
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
@retry(
wait=wait_random_exponential(min=1, max=60),
stop=stop_after_attempt(6),
after=general_after_log(logger),
)
async def _aask_v1(
self,
prompt: str,
@ -62,29 +71,17 @@ class Action(ABC):
system_msgs: Optional[list[str]] = None,
format="markdown", # compatible to original format
) -> ActionOutput:
"""Append default prefix"""
if not system_msgs:
system_msgs = []
system_msgs.append(self.prefix)
content = await self.llm.aask(prompt, system_msgs)
logger.debug(content)
logger.debug(f"llm raw output:\n{content}")
output_class = ActionOutput.create_model_class(output_class_name, output_data_mapping)
if format == "json":
pattern = r"\[CONTENT\](\s*\{.*?\}\s*)\[/CONTENT\]"
matches = re.findall(pattern, content, re.DOTALL)
for match in matches:
if match:
content = match
break
parsed_data = CustomDecoder(strict=False).decode(content)
parsed_data = llm_output_postprecess(output=content, schema=output_class.schema(), req_key="[/CONTENT]")
else: # using markdown parser
parsed_data = OutputParser.parse_data_with_mapping(content, output_data_mapping)
logger.debug(parsed_data)
logger.debug(f"parsed_data:\n{parsed_data}")
instruct_content = output_class(**parsed_data)
return ActionOutput(content, instruct_content)

View file

@ -0,0 +1,346 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/12/11 18:45
@Author : alexanderwu
@File : action_node.py
"""
import json
import re
from typing import Any, Dict, List, Optional, Type
from pydantic import BaseModel, create_model, root_validator, validator
from tenacity import retry, stop_after_attempt, wait_random_exponential
from metagpt.actions import ActionOutput
from metagpt.llm import BaseGPTAPI
from metagpt.logs import logger
from metagpt.utils.common import OutputParser
from metagpt.utils.custom_decoder import CustomDecoder
CONSTRAINT = """
- Language: Please use the same language as the user input.
- Format: output wrapped inside [CONTENT][/CONTENT] as format example, nothing else.
"""
SIMPLE_TEMPLATE = """
## context
{context}
-----
## format example
{example}
## nodes: "<node>: <type> # <comment>"
{instruction}
## constraint
{constraint}
## action
Fill in the above nodes based on the format example.
"""
def dict_to_markdown(d, prefix="-", postfix="\n"):
markdown_str = ""
for key, value in d.items():
markdown_str += f"{prefix} {key}: {value}{postfix}"
return markdown_str
class ActionNode:
"""ActionNode is a tree of nodes."""
mode: str
# Action Context
context: str # all the context, including all necessary info
llm: BaseGPTAPI # LLM with aask interface
children: dict[str, "ActionNode"]
# Action Input
key: str # Product Requirement / File list / Code
expected_type: Type # such as str / int / float etc.
# context: str # everything in the history.
instruction: str # the instructions should be followed.
example: Any # example for In Context-Learning.
# Action Output
content: str
instruct_content: BaseModel
def __init__(self, key: str, expected_type: Type, instruction: str, example: str, content: str = "",
children: dict[str, "ActionNode"] = None):
self.key = key
self.expected_type = expected_type
self.instruction = instruction
self.example = example
self.content = content
self.children = children if children is not None else {}
def __str__(self):
return (
f"{self.key}, {self.expected_type}, {self.instruction}, {self.example}" f", {self.content}, {self.children}"
)
def __repr__(self):
return self.__str__()
def add_child(self, node: "ActionNode"):
"""增加子ActionNode"""
self.children[node.key] = node
def add_children(self, nodes: List["ActionNode"]):
"""批量增加子ActionNode"""
for node in nodes:
self.add_child(node)
@classmethod
def from_children(cls, key, nodes: List["ActionNode"]):
"""直接从一系列的子nodes初始化"""
obj = cls(key, str, "", "")
obj.add_children(nodes)
return obj
def get_children_mapping(self) -> Dict[str, Type]:
"""获得子ActionNode的字典以key索引"""
return {k: (v.expected_type, ...) for k, v in self.children.items()}
def get_self_mapping(self) -> Dict[str, Type]:
"""get self key: type mapping"""
return {self.key: (self.expected_type, ...)}
def get_mapping(self, mode="children") -> Dict[str, Type]:
"""get key: type mapping under mode"""
if mode == "children" or (mode == "auto" and self.children):
return self.get_children_mapping()
return self.get_self_mapping()
@classmethod
def create_model_class(cls, class_name: str, mapping: Dict[str, Type]):
"""基于pydantic v1的模型动态生成用来检验结果类型正确性"""
new_class = create_model(class_name, **mapping)
@validator("*", allow_reuse=True)
def check_name(v, field):
if field.name not in mapping.keys():
raise ValueError(f"Unrecognized block: {field.name}")
return v
@root_validator(pre=True, allow_reuse=True)
def check_missing_fields(values):
required_fields = set(mapping.keys())
missing_fields = required_fields - set(values.keys())
if missing_fields:
raise ValueError(f"Missing fields: {missing_fields}")
return values
new_class.__validator_check_name = classmethod(check_name)
new_class.__root_validator_check_missing_fields = classmethod(check_missing_fields)
return new_class
@classmethod
def create_model_class_v2(cls, class_name: str, mapping: Dict[str, Type]):
"""基于pydantic v2的模型动态生成用来检验结果类型正确性待验证"""
new_class = create_model(class_name, **mapping)
@model_validator(mode="before")
def check_missing_fields(data):
required_fields = set(mapping.keys())
missing_fields = required_fields - set(data.keys())
if missing_fields:
raise ValueError(f"Missing fields: {missing_fields}")
return data
@field_validator("*")
def check_name(v: Any, field: str) -> Any:
if field not in mapping.keys():
raise ValueError(f"Unrecognized block: {field}")
return v
new_class.__model_validator_check_missing_fields = classmethod(check_missing_fields)
new_class.__field_validator_check_name = classmethod(check_name)
return new_class
def create_children_class(self):
"""使用object内有的字段直接生成model_class"""
class_name = f"{self.key}_AN"
mapping = self.get_children_mapping()
return self.create_model_class(class_name, mapping)
def to_dict(self, format_func=None, mode="auto") -> Dict:
"""将当前节点与子节点都按照node: format的格式组织成字典"""
# 如果没有提供格式化函数,使用默认的格式化方式
if format_func is None:
format_func = lambda node: f"{node.instruction}"
# 使用提供的格式化函数来格式化当前节点的值
formatted_value = format_func(self)
# 创建当前节点的键值对
if mode == "children" or (mode == "auto" and self.children):
node_dict = {}
else:
node_dict = {self.key: formatted_value}
if mode == "root":
return node_dict
# 遍历子节点并递归调用 to_dict 方法
for child_key, child_node in self.children.items():
node_dict.update(child_node.to_dict(format_func))
return node_dict
def compile_to(self, i: Dict, to) -> str:
if to == "json":
return json.dumps(i, indent=4)
elif to == "markdown":
return dict_to_markdown(i)
else:
return str(i)
def tagging(self, text, to, tag="") -> str:
if not tag:
return text
if to == "json":
return f"[{tag}]\n" + text + f"\n[/{tag}]"
else:
return f"[{tag}]\n" + text + f"\n[/{tag}]"
def _compile_f(self, to, mode, tag, format_func) -> str:
nodes = self.to_dict(format_func=format_func, mode=mode)
text = self.compile_to(nodes, to)
return self.tagging(text, to, tag)
def compile_instruction(self, to="raw", mode="children", tag="") -> str:
"""compile to raw/json/markdown template with all/root/children nodes"""
format_func = lambda i: f"{i.expected_type} # {i.instruction}"
return self._compile_f(to, mode, tag, format_func)
def compile_example(self, to="raw", mode="children", tag="") -> str:
"""compile to raw/json/markdown examples with all/root/children nodes"""
# 这里不能使用f-string因为转译为str后再json.dumps会额外加上引号无法作为有效的example
# 错误示例:"File list": "['main.py', 'const.py', 'game.py']", 注意这里值不是list而是str
format_func = lambda i: i.example
return self._compile_f(to, mode, tag, format_func)
def compile(self, context, to="json", mode="children", template=SIMPLE_TEMPLATE) -> str:
"""
mode: all/root/children
mode="children": 编译所有子节点为一个统一模板包括instruction与example
mode="all": NotImplemented
mode="root": NotImplemented
"""
# FIXME: json instruction会带来格式问题"Project name": "web_2048 # 项目名称使用下划线",
self.instruction = self.compile_instruction(to="markdown", mode=mode)
self.example = self.compile_example(to=to, tag="CONTENT", mode=mode)
prompt = template.format(
context=context, example=self.example, instruction=self.instruction, constraint=CONSTRAINT
)
return prompt
@retry(wait=wait_random_exponential(min=1, max=10), stop=stop_after_attempt(6))
async def _aask_v1(
self,
prompt: str,
output_class_name: str,
output_data_mapping: dict,
system_msgs: Optional[list[str]] = None,
format="markdown", # compatible to original format
) -> ActionOutput:
content = await self.llm.aask(prompt, system_msgs)
logger.debug(content)
output_class = ActionOutput.create_model_class(output_class_name, output_data_mapping)
if format == "json":
pattern = r"\[CONTENT\](\s*\{.*?\}\s*)\[/CONTENT\]"
matches = re.findall(pattern, content, re.DOTALL)
for match in matches:
if match:
content = match
break
parsed_data = CustomDecoder(strict=False).decode(content)
else: # using markdown parser
parsed_data = OutputParser.parse_data_with_mapping(content, output_data_mapping)
logger.debug(parsed_data)
instruct_content = output_class(**parsed_data)
return ActionOutput(content, instruct_content)
def get(self, key):
return self.instruct_content.dict()[key]
def set_recursive(self, name, value):
setattr(self, name, value)
for _, i in self.children.items():
i.set_recursive(name, value)
def set_llm(self, llm):
self.set_recursive("llm", llm)
def set_context(self, context):
self.set_recursive("context", context)
async def simple_fill(self, to, mode):
prompt = self.compile(context=self.context, to=to, mode=mode)
mapping = self.get_mapping(mode)
class_name = f"{self.key}_AN"
output = await self._aask_v1(prompt, class_name, mapping, format=to)
self.content = output.content
self.instruct_content = output.instruct_content
return self
async def fill(self, context, llm, to="json", mode="auto", strgy="simple"):
"""Fill the node(s) with mode.
:param context: Everything we should know when filling node.
:param llm: Large Language Model with pre-defined system message.
:param to: json/markdown, determine example and output format.
- json: it's easy to open source LLM with json format
- markdown: when generating code, markdown is always better
:param mode: auto/children/root
- auto: automated fill children's nodes and gather outputs, if no children, fill itself
- children: fill children's nodes and gather outputs
- root: fill root's node and gather output
:param strgy: simple/complex
- simple: run only once
- complex: run each node
:return: self
"""
self.set_llm(llm)
self.set_context(context)
if strgy == "simple":
return await self.simple_fill(to, mode)
elif strgy == "complex":
# 这里隐式假设了拥有children
tmp = {}
for _, i in self.children.items():
child = await i.simple_fill(to, mode)
tmp.update(child.instruct_content.dict())
cls = self.create_children_class()
self.instruct_content = cls(**tmp)
return self
def action_node_from_tuple_example():
# 示例:列表中包含元组
list_of_tuples = [("key1", str, "Instruction 1", "Example 1")]
# 从列表中创建 ActionNode 实例
nodes = [ActionNode(*data) for data in list_of_tuples]
for i in nodes:
logger.info(i)
if __name__ == "__main__":
action_node_from_tuple_example()

View file

@ -42,7 +42,7 @@ The message is as follows:
```
---
Now you should start rewriting the code:
## file name of the code to rewrite: Write code with triple quoto. Do your best to implement THIS IN ONLY ONE FILE.
## file name of the code to rewrite: Write code with triple quote. Do your best to implement THIS IN ONLY ONE FILE.
"""
@ -62,7 +62,7 @@ class DebugError(Action):
if matches:
return ""
logger.info(f"Debug and rewrite {self.context.code_filename}")
logger.info(f"Debug and rewrite {self.context.test_filename}")
code_doc = await FileRepository.get_file(
filename=self.context.code_filename, relative_path=CONFIG.src_workspace
)

View file

@ -11,9 +11,9 @@
"""
import json
from pathlib import Path
from typing import List
from metagpt.actions import Action, ActionOutput
from metagpt.actions.design_api_an import DESIGN_API_NODE
from metagpt.config import CONFIG
from metagpt.const import (
DATA_API_DESIGN_FILE_REPO,
@ -25,166 +25,19 @@ from metagpt.const import (
from metagpt.logs import logger
from metagpt.schema import Document, Documents
from metagpt.utils.file_repository import FileRepository
from metagpt.utils.get_template import get_template
# from metagpt.utils.get_template import get_template
from metagpt.utils.mermaid import mermaid_to_file
templates = {
"json": {
"PROMPT_TEMPLATE": """
# Context
{context}
# from typing import List
## Format example
{format_example}
-----
Role: You are an architect; the goal is to design a SOTA PEP8-compliant python system
Language: Please use the same language as the user requirement, but the title and code should be still in English. For example, if the user speaks Chinese, the specific text of your answer should also be in Chinese.
Requirement: Fill in the following missing information based on the context, each section name is a key in json
## Implementation approach: Provide as Plain text. Analyze the difficult points of the requirements, select appropriate open-source frameworks.
## Project name: Constant text.
## File list: Provided as Python list[str], the list of files needed (including HTML & CSS IF NEEDED) to write the program. Only need relative paths. ALWAYS write a main.py or app.py here
## Data structures and interfaces: Use mermaid classDiagram code syntax, including classes (INCLUDING __init__ method) and functions (with type annotations), CLEARLY MARK the RELATIONSHIPS between classes, and comply with PEP8 standards. The data structures SHOULD BE VERY DETAILED and the API should be comprehensive with a complete design.
## Program call flow: Use sequenceDiagram code syntax, COMPLETE and VERY DETAILED, using CLASSES AND API DEFINED ABOVE accurately, covering the CRUD AND INIT of each object, SYNTAX MUST BE CORRECT.
## Anything UNCLEAR: Provide as Plain text. Try to clarify it.
output a properly formatted JSON, wrapped inside [CONTENT][/CONTENT] like format example,
and only output the json inside this tag, nothing else
""",
"FORMAT_EXAMPLE": """
[CONTENT]
{{
"Implementation approach": "We will ...",
"Project name": "{project_name}",
"File list": ["main.py"],
"Data structures and interfaces": '
classDiagram
class Game{{
+int score
}}
...
Game "1" -- "1" Food: has
',
"Program call flow": '
sequenceDiagram
participant M as Main
...
G->>M: end game
',
"Anything UNCLEAR": "The requirement is clear to me."
}}
[/CONTENT]
""",
},
"markdown": {
"PROMPT_TEMPLATE": """
# Context
{context}
## Format example
{format_example}
-----
Role: You are an architect; the goal is to design a SOTA PEP8-compliant python system; make the best use of good open source tools
Language: Please use the same language as the user requirement, but the title and code should be still in English. For example, if the user speaks Chinese, the specific text of your answer should also be in Chinese.
Requirement: Fill in the following missing information based on the context, note that all sections are response with code form separately
ATTENTION: Output carefully referenced "Format example" in format.
## Implementation approach: Provide as Plain text. Analyze the difficult points of the requirements, select the appropriate open-source framework.
## Project name: Constant text.
## File list: Provided as Python list[str], the list of code files (including HTML & CSS IF NEEDED) to write the program. Only need relative paths. ALWAYS write a main.py or app.py here
## Data structures and interfaces: Use mermaid classDiagram code syntax, including classes (INCLUDING __init__ method) and functions (with type annotations), CLEARLY MARK the RELATIONSHIPS between classes, and comply with PEP8 standards. The data structures SHOULD BE VERY DETAILED and the API should be comprehensive with a complete design.
## Program call flow: Use sequenceDiagram code syntax, COMPLETE and VERY DETAILED, using CLASSES AND API DEFINED ABOVE accurately, covering the CRUD AND INIT of each object, SYNTAX MUST BE CORRECT.
## Anything UNCLEAR: Provide as Plain text. Try to clarify it.
""",
"FORMAT_EXAMPLE": """
---
## Implementation approach
We will ...
## Project name
```python
"{project_name}"
```
## File list
```python
[
"main.py",
]
```
## Data structures and interfaces
```mermaid
classDiagram
class Game{
+int score
}
...
Game "1" -- "1" Food: has
```
## Program call flow
```mermaid
sequenceDiagram
participant M as Main
...
G->>M: end game
```
## Anything UNCLEAR
The requirement is clear to me.
---
""",
},
}
OUTPUT_MAPPING = {
"Implementation approach": (str, ...),
"Project name": (str, ...),
"File list": (List[str], ...),
"Data structures and interfaces": (str, ...),
"Program call flow": (str, ...),
"Anything UNCLEAR": (str, ...),
}
MERGE_PROMPT = """
## Old Design
NEW_REQ_TEMPLATE = """
### Legacy Content
{old_design}
## Context
### New Requirements
{context}
-----
Role: You are an architect; The goal is to incrementally update the "Old Design" based on the information provided by the "Context," aiming to design a SOTA PEP8-compliant python system; make the best use of good open source tools
Language: Please use the same language as the user requirement, but the title and code should be still in English. For example, if the user speaks Chinese, the specific text of your answer should also be in Chinese.
Requirement: Fill in the following missing information based on the context, note that all sections are response with code form separately
ATTENTION: Output carefully referenced "Old Design" in format.
## Implementation approach: Provide as Plain text. Analyze the difficult points of the requirements, select the appropriate open-source framework.
## Project name: Constant text "{project_name}".
## File list: Provided as Python list[str], the list of code files (including HTML & CSS IF NEEDED) to write the program. Only need relative paths. ALWAYS write a main.py or app.py here
## Data structures and interfaces: Use mermaid classDiagram code syntax, including classes (INCLUDING __init__ method) and functions (with type annotations), CLEARLY MARK the RELATIONSHIPS between classes, and comply with PEP8 standards. The data structures SHOULD BE VERY DETAILED and the API should be comprehensive with a complete design.
## Program call flow: Use sequenceDiagram code syntax, COMPLETE and VERY DETAILED, using CLASSES AND API DEFINED ABOVE accurately, covering the CRUD AND INIT of each object, SYNTAX MUST BE CORRECT.
## Anything UNCLEAR: Provide as Plain text. Try to clarify it.
output a properly formatted JSON, wrapped inside [CONTENT][/CONTENT] like "Old Design" format,
and only output the json inside this tag, nothing else
"""
@ -228,20 +81,13 @@ class WriteDesign(Action):
return ActionOutput(content=changed_files.json(), instruct_content=changed_files)
async def _new_system_design(self, context, format=CONFIG.prompt_format):
prompt_template, format_example = get_template(templates, format)
format_example = format_example.format(project_name=CONFIG.project_name)
prompt = prompt_template.format(context=context, format_example=format_example)
system_design = await self._aask_v1(prompt, "system_design", OUTPUT_MAPPING, format=format)
return system_design
node = await DESIGN_API_NODE.fill(context=context, llm=self.llm, to=format)
return node
async def _merge(self, prd_doc, system_design_doc, format=CONFIG.prompt_format):
prompt = MERGE_PROMPT.format(
old_design=system_design_doc.content, context=prd_doc.content, project_name=CONFIG.project_name
)
system_design = await self._aask_v1(prompt, "system_design", OUTPUT_MAPPING, format=format)
# fix Python package name, we can't system_design.instruct_content.python_package_name = "xxx" since "Python
# package name" contain space, have to use setattr
system_design_doc.content = system_design.instruct_content.json(ensure_ascii=False)
context = NEW_REQ_TEMPLATE.format(old_design=system_design_doc.content, context=prd_doc.content)
node = await DESIGN_API_NODE.fill(context=context, llm=self.llm, to=format)
system_design_doc.content = node.instruct_content.json(ensure_ascii=False)
return system_design_doc
async def _update_system_design(self, filename, prds_file_repo, system_design_file_repo) -> Document:

View file

@ -0,0 +1,72 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/12/12 22:24
@Author : alexanderwu
@File : design_api_an.py
"""
from metagpt.actions.action_node import ActionNode
from metagpt.logs import logger
from metagpt.utils.mermaid import MMC1, MMC2
IMPLEMENTATION_APPROACH = ActionNode(
key="Implementation approach",
expected_type=str,
instruction="Analyze the difficult points of the requirements, select the appropriate open-source framework",
example="We will ...",
)
PROJECT_NAME = ActionNode(
key="Project name", expected_type=str, instruction="The project name with underline", example="game_2048"
)
FILE_LIST = ActionNode(
key="File list",
expected_type=list[str],
instruction="Only need relative paths. ALWAYS write a main.py or app.py here",
example=["main.py", "game.py"],
)
DATA_STRUCTURES_AND_INTERFACES = ActionNode(
key="Data structures and interfaces",
expected_type=str,
instruction="Use mermaid classDiagram code syntax, including classes, method(__init__ etc.) and functions with type"
" annotations, CLEARLY MARK the RELATIONSHIPS between classes, and comply with PEP8 standards. "
"The data structures SHOULD BE VERY DETAILED and the API should be comprehensive with a complete design.",
example=MMC1,
)
PROGRAM_CALL_FLOW = ActionNode(
key="Program call flow",
expected_type=str,
instruction="Use sequenceDiagram code syntax, COMPLETE and VERY DETAILED, using CLASSES AND API DEFINED ABOVE "
"accurately, covering the CRUD AND INIT of each object, SYNTAX MUST BE CORRECT.",
example=MMC2,
)
ANYTHING_UNCLEAR = ActionNode(
key="Anything UNCLEAR",
expected_type=str,
instruction="Mention unclear project aspects, then try to clarify it.",
example="Clarification needed on third-party API integration, ...",
)
NODES = [
IMPLEMENTATION_APPROACH,
# PROJECT_NAME,
FILE_LIST,
DATA_STRUCTURES_AND_INTERFACES,
PROGRAM_CALL_FLOW,
ANYTHING_UNCLEAR,
]
DESIGN_API_NODE = ActionNode.from_children("DesignAPI", NODES)
def main():
prompt = DESIGN_API_NODE.compile(context="")
logger.info(prompt)
if __name__ == "__main__":
main()

View file

@ -27,8 +27,8 @@ class PrepareDocuments(Action):
# Create and initialize the workspace folder, initialize the Git environment.
project_name = CONFIG.project_name or FileRepository.new_filename()
workdir = CONFIG.project_path
if not workdir and CONFIG.workspace:
workdir = Path(CONFIG.workspace) / project_name
if not workdir and CONFIG.workspace_path:
workdir = Path(CONFIG.workspace_path) / project_name
workdir = Path(workdir or DEFAULT_WORKSPACE_ROOT / project_name)
if not CONFIG.inc and workdir.exists():
shutil.rmtree(workdir)

View file

@ -10,10 +10,10 @@
3. According to the design in Section 2.2.3.5.4 of RFC 135, add incremental iteration functionality.
"""
import json
from typing import List
from metagpt.actions import ActionOutput
from metagpt.actions.action import Action
from metagpt.actions.project_management_an import PM_NODE
from metagpt.config import CONFIG
from metagpt.const import (
PACKAGE_REQUIREMENTS_FILENAME,
@ -24,189 +24,17 @@ from metagpt.const import (
from metagpt.logs import logger
from metagpt.schema import Document, Documents
from metagpt.utils.file_repository import FileRepository
from metagpt.utils.get_template import get_template
templates = {
"json": {
"PROMPT_TEMPLATE": """
# Context
{context}
# from typing import List
## Format example
{format_example}
-----
Role: You are a project manager; the goal is to break down tasks according to PRD/technical design, give a task list, and analyze task dependencies to start with the prerequisite modules
Language: Please use the same language as the user requirement, but the title and code should be still in English. For example, if the user speaks Chinese, the specific text of your answer should also be in Chinese.
Requirements: Based on the context, fill in the following missing information, each section name is a key in json. Here the granularity of the task is a file, if there are any missing files, you can supplement them
ATTENTION: Output carefully referenced "Format example" in format.
# from metagpt.utils.get_template import get_template
## Required Python third-party packages: Provide Python list[str] in requirements.txt format
## Required Other language third-party packages: Provide Python list[str] in requirements.txt format
## Logic Analysis: Provided as a Python list[list[str]. the first is filename, the second is class/method/function should be implemented in this file. Analyze the dependencies between the files, which work should be done first
## Task list: Provided as Python list[str]. Each str is a filename, the more at the beginning, the more it is a prerequisite dependency, should be done first
## Full API spec: Use OpenAPI 3.0. Describe all APIs that may be used by both frontend and backend.
## Shared Knowledge: Anything that should be public like utils' functions, config's variables details that should make clear first.
## Anything UNCLEAR: Provide as Plain text. Try to clarify it. For example, don't forget a main entry. don't forget to init 3rd party libs.
output a properly formatted JSON, wrapped inside [CONTENT][/CONTENT] like format example,
and only output the json inside this tag, nothing else
""",
"FORMAT_EXAMPLE": '''
{
"Required Python third-party packages": [
"flask==1.1.2",
"bcrypt==3.2.0"
],
"Required Other language third-party packages": [
"No third-party ..."
],
"Logic Analysis": [
["game.py", "Contains..."]
],
"Task list": [
"game.py"
],
"Full API spec": """
openapi: 3.0.0
...
description: A JSON object ...
""",
"Shared Knowledge": """
'game.py' contains ...
""",
"Anything UNCLEAR": "We need ... how to start."
}
''',
},
"markdown": {
"PROMPT_TEMPLATE": """
# Context
{context}
## Format example
{format_example}
-----
Role: You are a project manager; the goal is to break down tasks according to PRD/technical design, give a task list, and analyze task dependencies to start with the prerequisite modules
Requirements: Based on the context, fill in the following missing information, note that all sections are returned in Python code triple quote form seperatedly. Here the granularity of the task is a file, if there are any missing files, you can supplement them
Attention: Use '##' to split sections, not '#', and '## <SECTION_NAME>' SHOULD WRITE BEFORE the code and triple quote.
## Required Python third-party packages: Provided in requirements.txt format
## Required Other language third-party packages: Provided in requirements.txt format
## Logic Analysis: Provided as a Python list[list[str]. the first is filename, the second is class/method/function should be implemented in this file. Analyze the dependencies between the files, which work should be done first
## Task list: Provided as Python list[str]. Each str is a filename, the more at the beginning, the more it is a prerequisite dependency, should be done first
## Full API spec: Use OpenAPI 3.0. Describe all APIs that may be used by both frontend and backend.
## Shared Knowledge: Anything that should be public like utils' functions, config's variables details that should make clear first.
## Anything UNCLEAR: Provide as Plain text. Try to clarify it. For example, don't forget a main entry. don't forget to init 3rd party libs.
""",
"FORMAT_EXAMPLE": '''
---
## Required Python third-party packages
```python
"""
flask==1.1.2
bcrypt==3.2.0
"""
```
## Required Other language third-party packages
```python
"""
No third-party ...
"""
```
## Full API spec
```python
"""
openapi: 3.0.0
...
description: A JSON object ...
"""
```
## Logic Analysis
```python
[
["index.js", "Contains ..."],
["main.py", "Contains ..."],
]
```
## Task list
```python
[
"index.js",
"main.py",
]
```
## Shared Knowledge
```python
"""
'game.py' contains ...
"""
```
## Anything UNCLEAR
We need ... how to start.
---
''',
},
}
OUTPUT_MAPPING = {
"Required Python third-party packages": (List[str], ...),
"Required Other language third-party packages": (List[str], ...),
"Full API spec": (str, ...),
"Logic Analysis": (List[List[str]], ...),
"Task list": (List[str], ...),
"Shared Knowledge": (str, ...),
"Anything UNCLEAR": (str, ...),
}
MERGE_PROMPT = """
# Context
{context}
## Old Tasks
NEW_REQ_TEMPLATE = """
### Legacy Content
{old_tasks}
-----
## Format example
{format_example}
-----
Role: You are a project manager; The goal is to merge the new PRD/technical design content from 'Context' into 'Old Tasks.' Based on this merged result, break down tasks, give a task list, and analyze task dependencies to start with the prerequisite modules.
Requirements: Based on the context, fill in the following missing information, each section name is a key in json. Here the granularity of the task is a file, if there are any missing files, you can supplement them
Attention: Use '##' to split sections, not '#', and '## <SECTION_NAME>' SHOULD WRITE BEFORE the code and triple quote.
## Required Python third-party packages: Provided in requirements.txt format
## Required Other language third-party packages: Provided in requirements.txt format
## Full API spec: Use OpenAPI 3.0. Describe all APIs that may be used by both frontend and backend.
## Logic Analysis: Provided as a Python list[list[str]. the first is filename, the second is class/method/function should be implemented in this file. Analyze the dependencies between the files, which work should be done first
## Task list: Provided as Python list[str]. Each str is a filename, the more at the beginning, the more it is a prerequisite dependency, should be done first
## Shared Knowledge: Anything that should be public like utils' functions, config's variables details that should make clear first.
## Anything UNCLEAR: Provide as Plain text. Make clear here. For example, don't forget a main entry. don't forget to init 3rd party libs.
output a properly formatted JSON, wrapped inside [CONTENT][/CONTENT] like "Format example" format,
and only output the json inside this tag, nothing else
### New Requirements
{context}
"""
@ -262,17 +90,16 @@ class WriteTasks(Action):
return task_doc
async def _run_new_tasks(self, context, format=CONFIG.prompt_format):
prompt_template, format_example = get_template(templates, format)
prompt = prompt_template.format(context=context, format_example=format_example)
rsp = await self._aask_v1(prompt, "task", OUTPUT_MAPPING, format=format)
return rsp
node = await PM_NODE.fill(context, self.llm, format)
# prompt_template, format_example = get_template(templates, format)
# prompt = prompt_template.format(context=context, format_example=format_example)
# rsp = await self._aask_v1(prompt, "task", OUTPUT_MAPPING, format=format)
return node
async def _merge(self, system_design_doc, task_doc, format=CONFIG.prompt_format) -> Document:
_, format_example = get_template(templates, format)
prompt = MERGE_PROMPT.format(context=system_design_doc.content, old_tasks=task_doc.content,
format_example=format_example)
rsp = await self._aask_v1(prompt, "task", OUTPUT_MAPPING, format=format)
task_doc.content = rsp.instruct_content.json(ensure_ascii=False)
context = NEW_REQ_TEMPLATE.format(context=system_design_doc.content, old_tasks=task_doc.content)
node = await PM_NODE.fill(context, self.llm, format)
task_doc.content = node.instruct_content.json(ensure_ascii=False)
return task_doc
@staticmethod

View file

@ -0,0 +1,85 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/12/14 15:28
@Author : alexanderwu
@File : project_management_an.py
"""
from metagpt.actions.action_node import ActionNode
from metagpt.logs import logger
REQUIRED_PYTHON_PACKAGES = ActionNode(
key="Required Python packages",
expected_type=list[str],
instruction="Provide required Python packages in requirements.txt format.",
example=["flask==1.1.2", "bcrypt==3.2.0"],
)
REQUIRED_OTHER_LANGUAGE_PACKAGES = ActionNode(
key="Required Other language third-party packages",
expected_type=list[str],
instruction="List down the required packages for languages other than Python.",
example=["No third-party dependencies required"],
)
LOGIC_ANALYSIS = ActionNode(
key="Logic Analysis",
expected_type=list[list[str]],
instruction="Provide a list of files with the classes/methods/functions to be implemented, "
"including dependency analysis and imports.",
example=[
["game.py", "Contains Game class and ... functions"],
["main.py", "Contains main function, from game import Game"],
],
)
TASK_LIST = ActionNode(
key="Task list",
expected_type=list[str],
instruction="Break down the tasks into a list of filenames, prioritized by dependency order.",
example=["game.py", "main.py"],
)
FULL_API_SPEC = ActionNode(
key="Full API spec",
expected_type=str,
instruction="Describe all APIs using OpenAPI 3.0 spec that may be used by both frontend and backend. If front-end "
"and back-end communication is not required, leave it blank.",
example="openapi: 3.0.0 ...",
)
SHARED_KNOWLEDGE = ActionNode(
key="Shared Knowledge",
expected_type=str,
instruction="Detail any shared knowledge, like common utility functions or configuration variables.",
example="'game.py' contains functions shared across the project.",
)
ANYTHING_UNCLEAR_PM = ActionNode(
key="Anything UNCLEAR",
expected_type=str,
instruction="Mention any unclear aspects in the project management context and try to clarify them.",
example="Clarification needed on how to start and initialize third-party libraries.",
)
NODES = [
REQUIRED_PYTHON_PACKAGES,
REQUIRED_OTHER_LANGUAGE_PACKAGES,
LOGIC_ANALYSIS,
TASK_LIST,
FULL_API_SPEC,
SHARED_KNOWLEDGE,
ANYTHING_UNCLEAR_PM,
]
PM_NODE = ActionNode.from_children("PM_NODE", NODES)
def main():
prompt = PM_NODE.compile(context="")
logger.info(prompt)
if __name__ == "__main__":
main()

View file

@ -114,7 +114,7 @@ class CollectLinks(Action):
keywords = OutputParser.extract_struct(keywords, list)
keywords = parse_obj_as(list[str], keywords)
except Exception as e:
logger.exception(f'fail to get keywords related to the research topic "{topic}" for {e}')
logger.exception(f"fail to get keywords related to the research topic '{topic}' for {e}")
keywords = [topic]
results = await asyncio.gather(*(self.search_engine.run(i, as_string=False) for i in keywords))

View file

@ -34,13 +34,13 @@ ATTENTION: Use '##' to SPLIT SECTIONS, not '#'. Output format carefully referenc
-----
{code_blocks}
## Code Review All: 请你对历史所有文件进行阅读在文件中找到可能的bug如函数未实现、调用错误、未引用等
## Code Review All: Please read all historical files and find possible bugs in the files, such as unimplemented functions, calling errors, unreferences, etc.
## Call flow: mermaid代码根据实现的函数使用mermaid绘制完整的调用链
## Call flow: mermaid code, based on the implemented function, use mermaid to draw a complete call chain
## Summary: 根据历史文件的实现情况进行总结
## Summary: Summary based on the implementation of historical files
## TODOs: Python dict[str, str],这里写出需要修改的文件列表与理由,我们会在之后进行修改
## TODOs: Python dict[str, str], write down the list of files that need to be modified and the reasons. We will modify them later.
"""
@ -49,9 +49,9 @@ FORMAT_EXAMPLE = """
## Code Review All
### a.py
- 它少实现了xxx需求...
- 字段yyy没有给出...
- ...
- It fulfills less of xxx requirements...
- Field yyy is not given...
-...
### b.py
...

View file

@ -20,8 +20,13 @@ from tenacity import retry, stop_after_attempt, wait_random_exponential
from metagpt.actions.action import Action
from metagpt.config import CONFIG
from metagpt.const import CODE_SUMMARIES_FILE_REPO, TEST_OUTPUTS_FILE_REPO, TASK_FILE_REPO, BUGFIX_FILENAME, \
DOCS_FILE_REPO
from metagpt.const import (
BUGFIX_FILENAME,
CODE_SUMMARIES_FILE_REPO,
DOCS_FILE_REPO,
TASK_FILE_REPO,
TEST_OUTPUTS_FILE_REPO,
)
from metagpt.logs import logger
from metagpt.schema import CodingContext, Document, RunCodeResult
from metagpt.utils.common import CodeParser
@ -29,59 +34,52 @@ from metagpt.utils.file_repository import FileRepository
PROMPT_TEMPLATE = """
NOTICE
Role: You are a professional engineer; the main goal is to write PEP8 compliant, elegant, modular, easy to read and maintain Python 3.9 code (but you can also use other programming language)
Role: You are a professional engineer; the main goal is to write google-style, elegant, modular, easy to read and maintain code
Language: Please use the same language as the user requirement, but the title and code should be still in English. For example, if the user speaks Chinese, the specific text of your answer should also be in Chinese.
ATTENTION: Use '##' to SPLIT SECTIONS, not '#'. Output format carefully referenced "Format example".
-----
# Design
```json
# Context
## Design
{design}
```
-----
# Tasks
```json
## Tasks
{tasks}
```
-----
# Legacy Code
```python
## Legacy Code
```Code
{code}
```
-----
# Debug logs
## Debug logs
```text
{logs}
{summary_log}
```
-----
# Bug Feedback logs
## Bug Feedback logs
```text
{feedback}
```
-----
## Code: {filename} Write code with triple quoto, based on the following list and context.
1. Do your best to implement THIS ONLY ONE FILE. ONLY USE EXISTING API. IF NO API, IMPLEMENT IT.
2. Requirement: Based on the context, implement one following code file, note to return only in code form, your code will be part of the entire project, so please implement complete, reliable, reusable code snippets
3. Set default value: If there is any setting, ALWAYS SET A DEFAULT VALUE, ALWAYS USE STRONG TYPE AND EXPLICIT VARIABLE.
4. Follow design: YOU MUST FOLLOW "Data structures and interfaces". DONT CHANGE ANY DESIGN.
5. Think before writing: What should be implemented and provided in this document?
6. CAREFULLY CHECK THAT YOU DONT MISS ANY NECESSARY CLASS/FUNCTION IN THIS FILE.
7. Do not use public member functions that do not exist in your design.
8. Before using a variable, make sure you reference it first
9. Write out EVERY DETAIL, DON'T LEAVE TODO.
## Format example
-----
# Format example
## Code: {filename}
```python
## {filename}
...
```
-----
# Instruction: Based on the context, follow "Format example", write code.
## Code: {filename}. Write code with triple quoto, based on the following attentions and context.
1. Only One file: do your best to implement THIS ONLY ONE FILE.
2. COMPLETE CODE: Your code will be part of the entire project, so please implement complete, reliable, reusable code snippets.
3. Set default value: If there is any setting, ALWAYS SET A DEFAULT VALUE, ALWAYS USE STRONG TYPE AND EXPLICIT VARIABLE. AVOID circular import.
4. Follow design: YOU MUST FOLLOW "Data structures and interfaces". DONT CHANGE ANY DESIGN. Do not use public member functions that do not exist in your design.
5. CAREFULLY CHECK THAT YOU DONT MISS ANY NECESSARY CLASS/FUNCTION IN THIS FILE.
6. Before using a external variable/module, make sure you import it first.
7. Write out EVERY CODE DETAIL, DON'T LEAVE TODO.
"""
@ -102,7 +100,7 @@ class WriteCode(Action):
filename="test_" + coding_context.filename + ".json", relative_path=TEST_OUTPUTS_FILE_REPO
)
summary_doc = None
if coding_context.design_doc.filename:
if coding_context.design_doc and coding_context.design_doc.filename:
summary_doc = await FileRepository.get_file(
filename=coding_context.design_doc.filename, relative_path=CODE_SUMMARIES_FILE_REPO
)
@ -110,9 +108,14 @@ class WriteCode(Action):
if test_doc:
test_detail = RunCodeResult.loads(test_doc.content)
logs = test_detail.stderr
code_context = await self._get_codes(coding_context.task_doc)
if bug_feedback:
code_context = coding_context.code_doc.content
else:
code_context = await self.get_codes(coding_context.task_doc, exclude=self.context.filename)
prompt = PROMPT_TEMPLATE.format(
design=coding_context.design_doc.content,
design=coding_context.design_doc.content if coding_context.design_doc else "",
tasks=coding_context.task_doc.content if coding_context.task_doc else "",
code=code_context,
logs=logs,
@ -128,7 +131,7 @@ class WriteCode(Action):
return coding_context
@staticmethod
async def _get_codes(task_doc) -> str:
async def get_codes(task_doc, exclude) -> str:
if not task_doc:
return ""
if not task_doc.content:
@ -138,9 +141,10 @@ class WriteCode(Action):
codes = []
src_file_repo = CONFIG.git_repo.new_file_repository(relative_path=CONFIG.src_workspace)
for filename in code_filenames:
if filename == exclude:
continue
doc = await src_file_repo.get(filename=filename)
if not doc:
continue
codes.append(doc.content)
return "\n----------\n".join(codes)
codes.append(f"----- {filename}\n" + doc.content)
return "\n".join(codes)

View file

@ -10,6 +10,7 @@
from tenacity import retry, stop_after_attempt, wait_random_exponential
from metagpt.actions import WriteCode
from metagpt.actions.action import Action
from metagpt.config import CONFIG
from metagpt.logs import logger
@ -17,8 +18,8 @@ from metagpt.schema import CodingContext
from metagpt.utils.common import CodeParser
PROMPT_TEMPLATE = """
NOTICE
Role: You are a professional software engineer, and your main task is to review the code. You need to ensure that the code conforms to the PEP8 standards, is elegantly designed and modularized, easy to read and maintain, and is written in Python 3.9 (or in another programming language).
# System
Role: You are a professional software engineer, and your main task is to review and revise the code. You need to ensure that the code conforms to the google-style standards, is elegantly designed and modularized, easy to read and maintain.
Language: Please use the same language as the user requirement, but the title and code should be still in English. For example, if the user speaks Chinese, the specific text of your answer should also be in Chinese.
ATTENTION: Use '##' to SPLIT SECTIONS, not '#'. Output format carefully referenced "Format example".
@ -26,53 +27,72 @@ ATTENTION: Use '##' to SPLIT SECTIONS, not '#'. Output format carefully referenc
{context}
## Code to be Reviewed: {filename}
```
```Code
{code}
```
"""
-----
## Code Review: Based on the "Code to be Reviewed", provide key, clear, concise, and specific code modification suggestions, up to 5.
EXAMPLE_AND_INSTRUCTION = """
{format_example}
# Instruction: Based on the actual code situation, follow one of the "Format example".
## Code Review: Ordered List. Based on the "Code to be Reviewed", provide key, clear, concise, and specific answer. If any answer is no, explain how to fix it step by step.
1. Is the code implemented as per the requirements? If not, how to achieve it? Analyse it step by step.
2. Is the code logic completely correct? If there are errors, please indicate how to correct them.
3. Does the existing code follow the "Data structures and interfaces"?
4. Are all functions implemented? If there is no implementation, please indicate how to achieve it step by step.
5. Have all necessary pre-dependencies been imported? If not, indicate which ones need to be imported
6. Is the code implemented concisely enough? Are methods from other files being reused correctly?
6. Are methods from other files being reused correctly?
## Code Review Result: If the code doesn't have bugs, we don't need to rewrite it, so answer LGTM and stop. ONLY ANSWER LGTM/LBTM.
## Actions: Ordered List. Things that should be done after CR, such as implementing class A and function B
## Code Review Result: str. If the code doesn't have bugs, we don't need to rewrite it, so answer LGTM and stop. ONLY ANSWER LGTM/LBTM.
LGTM/LBTM
## Rewrite Code: if it still has some bugs, rewrite {filename} based on "Code Review" with triple quotes, try to get LGTM. Do your utmost to optimize THIS SINGLE FILE. Implement ALL TODO. RETURN ALL CODE, NEVER OMIT ANYTHING. 以任何方式省略代码都是不允许的。
```
```
## Format example
{format_example}
"""
FORMAT_EXAMPLE = """
-----
# EXAMPLE 1
# Format example 1
## Code Review: {filename}
1. No, we should add the logic of ...
1. No, we should fix the logic of class A due to ...
2. ...
3. ...
4. ...
4. No, function B is not implemented, ...
5. ...
6. ...
## Code Review Result: {filename}
## Actions
1. Fix the `handle_events` method to update the game state only if a move is successful.
```python
def handle_events(self):
for event in pygame.event.get():
if event.type == pygame.QUIT:
return False
if event.type == pygame.KEYDOWN:
moved = False
if event.key == pygame.K_UP:
moved = self.game.move('UP')
elif event.key == pygame.K_DOWN:
moved = self.game.move('DOWN')
elif event.key == pygame.K_LEFT:
moved = self.game.move('LEFT')
elif event.key == pygame.K_RIGHT:
moved = self.game.move('RIGHT')
if moved:
# Update the game state only if a move was successful
self.render()
return True
```
2. Implement function B
## Code Review Result
LBTM
## Rewrite Code: {filename}
```python
## {filename}
...
```
-----
# EXAMPLE 2
# Format example 2
## Code Review: {filename}
1. Yes.
2. Yes.
@ -81,12 +101,20 @@ LBTM
5. Yes.
6. Yes.
## Code Review Result: {filename}
LGTM
## Rewrite Code: {filename}
## Actions
pass
-----
## Code Review Result
LGTM
"""
REWRITE_CODE_TEMPLATE = """
# Instruction: rewrite code based on the Code Review and Actions
## Rewrite Code: CodeBlock. If it still has some bugs, rewrite {filename} with triple quotes. Do your utmost to optimize THIS SINGLE FILE. Return all completed codes and prohibit the return of unfinished codes.
```Code
## {filename}
...
```
"""
@ -95,11 +123,15 @@ class WriteCodeReview(Action):
super().__init__(name, context, llm)
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
async def write_code_review_and_rewrite(self, prompt):
code_rsp = await self._aask(prompt)
result = CodeParser.parse_block("Code Review Result", code_rsp)
async def write_code_review_and_rewrite(self, context_prompt, cr_prompt, filename):
cr_rsp = await self._aask(context_prompt + cr_prompt)
result = CodeParser.parse_block("Code Review Result", cr_rsp)
if "LGTM" in result:
return result, None
# if LBTM, rewrite code
rewrite_prompt = f"{context_prompt}\n{cr_rsp}\n{REWRITE_CODE_TEMPLATE.format(filename=filename)}"
code_rsp = await self._aask(rewrite_prompt)
code = CodeParser.parse_code(block="", text=code_rsp)
return result, code
@ -109,23 +141,24 @@ class WriteCodeReview(Action):
for i in range(k):
format_example = FORMAT_EXAMPLE.format(filename=self.context.code_doc.filename)
task_content = self.context.task_doc.content if self.context.task_doc else ""
context = "\n----------\n".join(
code_context = await WriteCode.get_codes(self.context.task_doc, exclude=self.context.filename)
context = "\n".join(
[
"```text\n" + self.context.design_doc.content + "```\n",
"```text\n" + task_content + "```\n",
"```python\n" + self.context.code_doc.content + "```\n",
"## System Design\n" + str(self.context.design_doc) + "\n",
"## Tasks\n" + task_content + "\n",
"## Code Files\n" + code_context + "\n",
]
)
prompt = PROMPT_TEMPLATE.format(
context_prompt = PROMPT_TEMPLATE.format(
context=context,
code=iterative_code,
filename=self.context.code_doc.filename,
format_example=format_example,
)
cr_prompt = EXAMPLE_AND_INSTRUCTION.format(format_example=format_example, )
logger.info(
f"Code review and rewrite {self.context.code_doc.filename,}: {i+1}/{k} | {len(iterative_code)=}, {len(self.context.code_doc.content)=}"
f"Code review and rewrite {self.context.code_doc.filename}: {i+1}/{k} | {len(iterative_code)=}, {len(self.context.code_doc.content)=}"
)
result, rewrited_code = await self.write_code_review_and_rewrite(prompt)
result, rewrited_code = await self.write_code_review_and_rewrite(context_prompt, cr_prompt, self.context.code_doc.filename)
if "LBTM" in result:
iterative_code = rewrited_code
elif "LGTM" in result:

View file

@ -14,310 +14,52 @@ from __future__ import annotations
import json
from pathlib import Path
from typing import List
from metagpt.actions import Action, ActionOutput
from metagpt.actions.action_node import ActionNode
from metagpt.actions.fix_bug import FixBug
from metagpt.actions.search_and_summarize import SearchAndSummarize
from metagpt.actions.write_prd_an import (
WP_IS_RELATIVE_NODE,
WP_ISSUE_TYPE_NODE,
WRITE_PRD_NODE,
)
from metagpt.config import CONFIG
from metagpt.const import (
BUGFIX_FILENAME,
COMPETITIVE_ANALYSIS_FILE_REPO,
DOCS_FILE_REPO,
PRD_PDF_FILE_REPO,
PRDS_FILE_REPO,
REQUIREMENT_FILENAME, BUGFIX_FILENAME,
REQUIREMENT_FILENAME,
)
from metagpt.logs import logger
from metagpt.schema import Document, Documents, Message, BugFixContext
from metagpt.schema import BugFixContext, Document, Documents, Message
from metagpt.utils.common import CodeParser
from metagpt.utils.file_repository import FileRepository
from metagpt.utils.get_template import get_template
# from metagpt.utils.get_template import get_template
from metagpt.utils.mermaid import mermaid_to_file
templates = {
"json": {
"PROMPT_TEMPLATE": """
# Context
{{
"Original Requirements": "{requirements}",
"Search Information": ""
}}
# from typing import List
## Format example
{format_example}
-----
Role: You are a professional product manager; the goal is to design a concise, usable, efficient product
Language: Please use the same language as the user requirement, but the title and code should be still in English. For example, if the user speaks Chinese, the specific text of your answer should also be in Chinese.
Requirements: According to the context, fill in the following missing information, note that each sections are returned in Python code triple quote form seperatedly.
ATTENTION: Output carefully referenced "Format example" in format.
## YOU NEED TO FULFILL THE BELOW JSON DOC
CONTEXT_TEMPLATE = """
### Project Name
{project_name}
{{
"Language": "", # str, use the same language as the user requirement. en_us / zh_cn etc.
"Original Requirements": "", # str, place the polished complete original requirements here
"Project Name": "{project_name}", # str, if it's empty, name it with snake case style, like game_2048 / web_2048 / simple_crm etc.
"Search Information": "",
"Requirements": "",
"Product Goals": [], # Provided as Python list[str], up to 3 clear, orthogonal product goals.
"User Stories": [], # Provided as Python list[str], up to 5 scenario-based user stories
"Competitive Analysis": [], # Provided as Python list[str], up to 8 competitive product analyses
# Use mermaid quadrantChart code syntax. up to 14 competitive products. Translation: Distribute these competitor scores evenly between 0 and 1, trying to conform to a normal distribution centered around 0.5 as much as possible.
"Competitive Quadrant Chart": "quadrantChart
title Reach and engagement of campaigns
x-axis Low Reach --> High Reach
y-axis Low Engagement --> High Engagement
quadrant-1 We should expand
quadrant-2 Need to promote
quadrant-3 Re-evaluate
quadrant-4 May be improved
Campaign A: [0.3, 0.6]
Campaign B: [0.45, 0.23]
Campaign C: [0.57, 0.69]
Campaign D: [0.78, 0.34]
Campaign E: [0.40, 0.34]
Campaign F: [0.35, 0.78]",
"Requirement Analysis": "", # Provide as Plain text.
"Requirement Pool": [["P0","P0 requirement"],["P1","P1 requirement"]], # Provided as Python list[list[str], the parameters are requirement description, priority(P0/P1/P2), respectively, comply with PEP standards
"UI Design draft": "", # Provide as Plain text. Be simple. Describe the elements and functions, also provide a simple style description and layout description.
"Anything UNCLEAR": "", # Provide as Plain text. Try to clarify it.
}}
output a properly formatted JSON, wrapped inside [CONTENT][/CONTENT] like format example,
and only output the json inside this tag, nothing else
""",
"FORMAT_EXAMPLE": """
[CONTENT]
{{
"Language": "",
"Original Requirements": "",
"Project Name": "{project_name}",
"Search Information": "",
"Requirements": "",
"Product Goals": [],
"User Stories": [],
"Competitive Analysis": [],
"Competitive Quadrant Chart": "quadrantChart
title Reach and engagement of campaigns
x-axis Low Reach --> High Reach
y-axis Low Engagement --> High Engagement
quadrant-1 We should expand
quadrant-2 Need to promote
quadrant-3 Re-evaluate
quadrant-4 May be improved
Campaign A: [0.3, 0.6]
Campaign B: [0.45, 0.23]
Campaign C: [0.57, 0.69]
Campaign D: [0.78, 0.34]
Campaign E: [0.40, 0.34]
Campaign F: [0.35, 0.78]",
"Requirement Analysis": "",
"Requirement Pool": [["P0","P0 requirement"],["P1","P1 requirement"]],
"UI Design draft": "",
"Anything UNCLEAR": "",
}}
[/CONTENT]
""",
},
"markdown": {
"PROMPT_TEMPLATE": """
# Context
## Original Requirements
### Original Requirements
{requirements}
## Search Information
{search_information}
## mermaid quadrantChart code syntax example. DONT USE QUOTO IN CODE DUE TO INVALID SYNTAX. Replace the <Campain X> with REAL COMPETITOR NAME
```mermaid
quadrantChart
title Reach and engagement of campaigns
x-axis Low Reach --> High Reach
y-axis Low Engagement --> High Engagement
quadrant-1 We should expand
quadrant-2 Need to promote
quadrant-3 Re-evaluate
quadrant-4 May be improved
"Campaign: A": [0.3, 0.6]
"Campaign B": [0.45, 0.23]
"Campaign C": [0.57, 0.69]
"Campaign D": [0.78, 0.34]
"Campaign E": [0.40, 0.34]
"Campaign F": [0.35, 0.78]
"Our Target Product": [0.5, 0.6]
```
## Format example
{format_example}
-----
Role: You are a professional product manager; the goal is to design a concise, usable, efficient product
Language: Please use the same language as the user requirement to answer, but the title and code should be still in English. For example, if the user speaks Chinese, the specific text of your answer should also be in Chinese.
Requirements: According to the context, fill in the following missing information, note that each sections are returned in Python code triple quote form seperatedly.
ATTENTION: Use '##' to SPLIT SECTIONS, not '#'. AND '## <SECTION_NAME>' SHOULD WRITE BEFORE the code and triple quote. Output carefully referenced "Format example" in format.
## Language: Provide as Plain text, use the same language as the user requirement.
## Original Requirements: Provide as Plain text, place the polished complete original requirements here
## Product Goals: Provided as Python list[str], up to 3 clear, orthogonal product goals.
## User Stories: Provided as Python list[str], up to 5 scenario-based user stories
## Competitive Analysis: Provided as Python list[str], up to 7 competitive product analyses, consider as similar competitors as possible
## Competitive Quadrant Chart: Use mermaid quadrantChart code syntax. up to 14 competitive products. Translation: Distribute these competitor scores evenly between 0 and 1, trying to conform to a normal distribution centered around 0.5 as much as possible.
## Requirement Analysis: Provide as Plain text.
## Requirement Pool: Provided as Python list[list[str], the parameters are requirement description, priority(P0/P1/P2), respectively, comply with PEP standards
## UI Design draft: Provide as Plain text. Be simple. Describe the elements and functions, also provide a simple style description and layout description.
## Anything UNCLEAR: Provide as Plain text. Try to clarify it.
""",
"FORMAT_EXAMPLE": """
---
## Original Requirements
The user ...
## Product Goals
```python
[
"Create a ...",
]
```
## User Stories
```python
[
"As a user, ...",
]
```
## Competitive Analysis
```python
[
"Python Snake Game: ...",
]
```
## Competitive Quadrant Chart
```mermaid
quadrantChart
title Reach and engagement of campaigns
...
"Our Target Product": [0.6, 0.7]
```
## Requirement Analysis
The product should be a ...
## Requirement Pool
```python
[
["End game ...", "P0"]
]
```
## UI Design draft
Give a basic function description, and a draft
## Anything UNCLEAR
There are no unclear points.
---
""",
},
}
OUTPUT_MAPPING = {
"Language": (str, ...),
"Original Requirements": (str, ...),
"Project Name": (str, ...),
"Product Goals": (List[str], ...),
"User Stories": (List[str], ...),
"Competitive Analysis": (List[str], ...),
"Competitive Quadrant Chart": (str, ...),
"Requirement Analysis": (str, ...),
"Requirement Pool": (List[List[str]], ...),
"UI Design draft": (str, ...),
"Anything UNCLEAR": (str, ...),
}
IS_RELATIVE_PROMPT = """
## PRD:
{old_prd}
## New Requirement:
{requirements}
___
You are a professional product manager; You need to assess whether the new requirements are relevant to the existing PRD to determine whether to merge the new requirements into this PRD.
Is the newly added requirement in "New Requirement" related to the PRD?
Respond with `YES` if it is related, `NO` if it is not, and provide the reasons. Return the response in JSON format.
### Search Information
-
"""
MERGE_PROMPT = """
# Context
## Original Requirements
{requirements}
## Old PRD
NEW_REQ_TEMPLATE = """
### Legacy Content
{old_prd}
-----
Role: You are a professional product manager; The goal is to incorporate the newly added requirements from the "Original Requirements" into the existing Product Requirements Document (PRD) in the "Old PRD" in order to design a concise, usable, and efficient product.
Language: Please use the same language as the user requirement, but the title and code should be still in English. For example, if the user speaks Chinese, the specific text of your answer should also be in Chinese.
Requirements: According to the context, fill in the following missing information, each section name is a key in json ,If the requirements are unclear, ensure minimum viability and avoid excessive design
ATTENTION: Output carefully referenced "Old PRD" in format.
## YOU NEED TO FULFILL THE BELOW JSON DOC
{{
"Language": "", # str, use the same language as the user requirement. en_us / zh_cn etc.
"Original Requirements": "", # str, place the polished complete original requirements here
"Project Name": "{project_name}", # str, if it's empty, name it with snake case style, like game_2048 / web_2048 / simple_crm etc.
"Search Information": "",
"Requirements": "",
"Product Goals": [], # Provided as Python list[str], up to 3 clear, orthogonal product goals.
"User Stories": [], # Provided as Python list[str], up to 5 scenario-based user stories
"Competitive Analysis": [], # Provided as Python list[str], up to 8 competitive product analyses
# Use mermaid quadrantChart code syntax. up to 14 competitive products. Translation: Distribute these competitor scores evenly between 0 and 1, trying to conform to a normal distribution centered around 0.5 as much as possible.
"Competitive Quadrant Chart": "quadrantChart
title Reach and engagement of campaigns
x-axis Low Reach --> High Reach
y-axis Low Engagement --> High Engagement
quadrant-1 We should expand
quadrant-2 Need to promote
quadrant-3 Re-evaluate
quadrant-4 May be improved
Campaign A: [0.3, 0.6]
Campaign B: [0.45, 0.23]
Campaign C: [0.57, 0.69]
Campaign D: [0.78, 0.34]
Campaign E: [0.40, 0.34]
Campaign F: [0.35, 0.78]",
"Requirement Analysis": "", # Provide as Plain text.
"Requirement Pool": [["P0","P0 requirement"],["P1","P1 requirement"]], # Provided as Python list[list[str], the parameters are requirement description, priority(P0/P1/P2), respectively, comply with PEP standards
"UI Design draft": "", # Provide as Plain text. Be simple. Describe the elements and functions, also provide a simple style description and layout description.
"Anything UNCLEAR": "", # Provide as Plain text. Try to clarify it.
}}
output a properly formatted JSON, wrapped inside [CONTENT][/CONTENT] like "Old PRD" format,
and only output the json inside this tag, nothing else
"""
IS_BUGFIX_PROMPT = """
{content}
___
You are a professional product manager; You need to determine whether the above content describes a requirement or provides feedback about a bug.
Respond with `YES` if it is a feedback about a bug, `NO` if it is not, and provide the reasons. Return the response in JSON format like below:
```json
{{
"is_bugfix": ..., # `YES` or `NO`
"reason": ..., # reason string
}}
```
### New Requirements
{requirements}
"""
@ -334,12 +76,14 @@ class WritePRD(Action):
await docs_file_repo.save(filename=BUGFIX_FILENAME, content=requirement_doc.content)
await docs_file_repo.save(filename=REQUIREMENT_FILENAME, content="")
bug_fix = BugFixContext(filename=BUGFIX_FILENAME)
return Message(content=bug_fix.json(), instruct_content=bug_fix,
role=self.profile,
cause_by=FixBug,
sent_from=self,
send_to="Alex", # the name of Engineer
)
return Message(
content=bug_fix.json(),
instruct_content=bug_fix,
role="",
cause_by=FixBug,
sent_from=self,
send_to="Alex", # the name of Engineer
)
else:
await docs_file_repo.delete(filename=BUGFIX_FILENAME)
@ -353,7 +97,7 @@ class WritePRD(Action):
if not prd_doc:
continue
change_files.docs[prd_doc.filename] = prd_doc
logger.info(f"REWRITE PRD:{prd_doc.filename}")
logger.info(f"rewrite prd: {prd_doc.filename}")
# If there is no existing PRD, generate one using 'docs/requirement.txt'.
if not change_files.docs:
prd_doc = await self._update_prd(
@ -361,53 +105,38 @@ class WritePRD(Action):
)
if prd_doc:
change_files.docs[prd_doc.filename] = prd_doc
logger.info(f"NEW PRD:{prd_doc.filename}")
logger.debug(f"new prd: {prd_doc.filename}")
# Once all files under 'docs/prds/' have been compared with the newly added requirements, trigger the
# 'publish' message to transition the workflow to the next stage. This design allows room for global
# optimization in subsequent steps.
return ActionOutput(content=change_files.json(), instruct_content=change_files)
async def _run_new_requirement(self, requirements, format=CONFIG.prompt_format, *args, **kwargs) -> ActionOutput:
sas = SearchAndSummarize()
# rsp = await sas.run(context=requirements, system_text=SEARCH_AND_SUMMARIZE_SYSTEM_EN_US)
rsp = ""
info = f"### Search Results\n{sas.result}\n\n### Search Summary\n{rsp}"
if sas.result:
logger.info(sas.result)
logger.info(rsp)
# logger.info(format)
prompt_template, format_example = get_template(templates, format)
async def _run_new_requirement(self, requirements, format=CONFIG.prompt_format) -> ActionOutput:
# sas = SearchAndSummarize()
# # rsp = await sas.run(context=requirements, system_text=SEARCH_AND_SUMMARIZE_SYSTEM_EN_US)
# rsp = ""
# info = f"### Search Results\n{sas.result}\n\n### Search Summary\n{rsp}"
# if sas.result:
# logger.info(sas.result)
# logger.info(rsp)
project_name = CONFIG.project_name if CONFIG.project_name else ""
format_example = format_example.format(project_name=project_name)
# logger.info(prompt_template)
# logger.info(format_example)
prompt = prompt_template.format(
requirements=requirements, search_information=info, format_example=format_example, project_name=project_name
)
# logger.info(prompt)
# prd = await self._aask_v1(prompt, "prd", OUTPUT_MAPPING)
prd = await self._aask_v1(prompt, "prd", OUTPUT_MAPPING, format=format)
await self._rename_workspace(prd)
return prd
context = CONTEXT_TEMPLATE.format(requirements=requirements, project_name=project_name)
node = await WRITE_PRD_NODE.fill(context=context, llm=self.llm, to=format)
await self._rename_workspace(node)
return node
async def _is_relative_to(self, new_requirement_doc, old_prd_doc) -> bool:
prompt = IS_RELATIVE_PROMPT.format(old_prd=old_prd_doc.content, requirements=new_requirement_doc.content)
res = await self._aask(prompt=prompt)
logger.info(f"REQ-RELATIVE:[{new_requirement_doc.root_relative_path}, {old_prd_doc.root_relative_path}]: {res}")
if "YES" in res:
return True
return False
async def _is_relative(self, new_requirement_doc, old_prd_doc) -> bool:
context = NEW_REQ_TEMPLATE.format(old_prd=old_prd_doc.content, requirements=new_requirement_doc.content)
node = await WP_IS_RELATIVE_NODE.fill(context, self.llm)
return node.get("is_relative") == "YES"
async def _merge(self, new_requirement_doc, prd_doc, format=CONFIG.prompt_format) -> Document:
if not CONFIG.project_name:
CONFIG.project_name = Path(CONFIG.project_path).name
prompt = MERGE_PROMPT.format(
requirements=new_requirement_doc.content, old_prd=prd_doc.content, project_name=CONFIG.project_name
)
prd = await self._aask_v1(prompt, "prd", OUTPUT_MAPPING, format=format)
prd_doc.content = prd.instruct_content.json(ensure_ascii=False)
await self._rename_workspace(prd)
prompt = NEW_REQ_TEMPLATE.format(requirements=new_requirement_doc.content, old_prd=prd_doc.content)
node = await WRITE_PRD_NODE.fill(context=prompt, llm=self.llm, to=format)
prd_doc.content = node.instruct_content.json(ensure_ascii=False)
await self._rename_workspace(node)
return prd_doc
async def _update_prd(self, requirement_doc, prd_doc, prds_file_repo, *args, **kwargs) -> Document | None:
@ -418,7 +147,7 @@ class WritePRD(Action):
filename=FileRepository.new_filename() + ".json",
content=prd.instruct_content.json(ensure_ascii=False),
)
elif await self._is_relative_to(requirement_doc, prd_doc):
elif await self._is_relative(requirement_doc, prd_doc):
new_prd_doc = await self._merge(requirement_doc, prd_doc)
else:
return None
@ -434,7 +163,7 @@ class WritePRD(Action):
if not quadrant_chart:
return
pathname = (
CONFIG.git_repo.workdir / Path(COMPETITIVE_ANALYSIS_FILE_REPO) / Path(prd_doc.filename).with_suffix("")
CONFIG.git_repo.workdir / Path(COMPETITIVE_ANALYSIS_FILE_REPO) / Path(prd_doc.filename).with_suffix("")
)
if not pathname.parent.exists():
pathname.parent.mkdir(parents=True, exist_ok=True)
@ -453,17 +182,17 @@ class WritePRD(Action):
return
if not CONFIG.project_name:
if isinstance(prd, ActionOutput):
if isinstance(prd, ActionOutput) or isinstance(prd, ActionNode):
ws_name = prd.instruct_content.dict()["Project Name"]
else:
ws_name = CodeParser.parse_str(block="Project Name", text=prd)
CONFIG.project_name = ws_name
CONFIG.git_repo.rename_root(CONFIG.project_name)
async def _is_bugfix(self, content):
prompt = IS_BUGFIX_PROMPT.format(content=content)
res = await self._aask(prompt=prompt)
logger.info(f"IS_BUGFIX:{res}")
if "YES" in res:
return True
return False
async def _is_bugfix(self, context) -> bool:
src_workspace_path = CONFIG.git_repo.workdir / CONFIG.git_repo.workdir.name
code_files = CONFIG.git_repo.get_files(relative_path=src_workspace_path)
if not code_files:
return False
node = await WP_ISSUE_TYPE_NODE.fill(context, self.llm)
return node.get("issue_type") == "BUG"

View file

@ -0,0 +1,158 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/12/14 11:40
@Author : alexanderwu
@File : write_prd_an.py
"""
from metagpt.actions.action_node import ActionNode
from metagpt.logs import logger
LANGUAGE = ActionNode(
key="Language",
expected_type=str,
instruction="Provide the language used in the project, typically matching the user's requirement language.",
example="en_us",
)
PROGRAMMING_LANGUAGE = ActionNode(
key="Programming Language",
expected_type=str,
instruction="Python/JavaScript or other mainstream programming language.",
example="Python",
)
ORIGINAL_REQUIREMENTS = ActionNode(
key="Original Requirements",
expected_type=str,
instruction="Place the polished, complete original requirements here.",
example="The game should have a leaderboard and multiple difficulty levels.",
)
PROJECT_NAME = ActionNode(
key="Project Name",
expected_type=str,
instruction="Name the project using snake case style, like 'game_2048' or 'simple_crm'.",
example="game_2048",
)
PRODUCT_GOALS = ActionNode(
key="Product Goals",
expected_type=list[str],
instruction="Provide up to three clear, orthogonal product goals.",
example=["Create an engaging user experience", "Ensure high performance", "Provide customizable features"],
)
USER_STORIES = ActionNode(
key="User Stories",
expected_type=list[str],
instruction="Provide up to five scenario-based user stories.",
example=[
"As a user, I want to be able to choose difficulty levels",
"As a player, I want to see my score after each game",
],
)
COMPETITIVE_ANALYSIS = ActionNode(
key="Competitive Analysis",
expected_type=list[str],
instruction="Provide analyses for up to seven competitive products.",
example=["Python Snake Game: Simple interface, lacks advanced features"],
)
COMPETITIVE_QUADRANT_CHART = ActionNode(
key="Competitive Quadrant Chart",
expected_type=str,
instruction="Use mermaid quadrantChart syntax. Distribute scores evenly between 0 and 1",
example="""quadrantChart
title "Reach and engagement of campaigns"
x-axis "Low Reach" --> "High Reach"
y-axis "Low Engagement" --> "High Engagement"
quadrant-1 "We should expand"
quadrant-2 "Need to promote"
quadrant-3 "Re-evaluate"
quadrant-4 "May be improved"
"Campaign A": [0.3, 0.6]
"Campaign B": [0.45, 0.23]
"Campaign C": [0.57, 0.69]
"Campaign D": [0.78, 0.34]
"Campaign E": [0.40, 0.34]
"Campaign F": [0.35, 0.78]
"Our Target Product": [0.5, 0.6]""",
)
REQUIREMENT_ANALYSIS = ActionNode(
key="Requirement Analysis",
expected_type=str,
instruction="Provide a detailed analysis of the requirements.",
example="The product should be user-friendly.",
)
REQUIREMENT_POOL = ActionNode(
key="Requirement Pool",
expected_type=list[list[str]],
instruction="List down the requirements with their priority (P0, P1, P2).",
example=[["P0", "..."], ["P1", "..."]],
)
UI_DESIGN_DRAFT = ActionNode(
key="UI Design draft",
expected_type=str,
instruction="Provide a simple description of UI elements, functions, style, and layout.",
example="Basic function description with a simple style and layout.",
)
ANYTHING_UNCLEAR = ActionNode(
key="Anything UNCLEAR",
expected_type=str,
instruction="Mention any aspects of the project that are unclear and try to clarify them.",
example="...",
)
ISSUE_TYPE = ActionNode(
key="issue_type",
expected_type=str,
instruction="Answer BUG/REQUIREMENT. If it is a bugfix, answer BUG, otherwise answer Requirement",
example="BUG",
)
IS_RELATIVE = ActionNode(
key="is_relative",
expected_type=str,
instruction="Answer YES/NO. If the requirement is related to the old PRD, answer YES, otherwise NO",
example="YES",
)
REASON = ActionNode(
key="reason", expected_type=str, instruction="Explain the reasoning process from question to answer", example="..."
)
NODES = [
LANGUAGE,
PROGRAMMING_LANGUAGE,
ORIGINAL_REQUIREMENTS,
PROJECT_NAME,
PRODUCT_GOALS,
USER_STORIES,
COMPETITIVE_ANALYSIS,
COMPETITIVE_QUADRANT_CHART,
REQUIREMENT_ANALYSIS,
REQUIREMENT_POOL,
UI_DESIGN_DRAFT,
ANYTHING_UNCLEAR,
]
WRITE_PRD_NODE = ActionNode.from_children("WritePRD", NODES)
WP_ISSUE_TYPE_NODE = ActionNode.from_children("WP_ISSUE_TYPE", [ISSUE_TYPE, REASON])
WP_IS_RELATIVE_NODE = ActionNode.from_children("WP_IS_RELATIVE", [IS_RELATIVE, REASON])
def main():
prompt = WRITE_PRD_NODE.compile(context="")
logger.info(prompt)
if __name__ == "__main__":
main()

View file

@ -31,7 +31,7 @@ Attention: Use '##' to split sections, not '#', and '## <SECTION_NAME>' SHOULD W
```
Note that the code to test is at {source_file_path}, we will put your test code at {workspace}/tests/{test_file_name}, and run your test code from {workspace},
you should correctly import the necessary classes based on these file locations!
## {test_file_name}: Write test code with triple quoto. Do your best to implement THIS ONLY ONE FILE.
## {test_file_name}: Write test code with triple quote. Do your best to implement THIS ONLY ONE FILE.
"""

View file

@ -46,7 +46,7 @@ class Config(metaclass=Singleton):
def __init__(self, yaml_file=default_yaml_file):
self._init_with_config_files_and_env(yaml_file)
logger.info("Config loading done.")
logger.debug("Config loading done.")
self._update()
def _update(self):
@ -55,12 +55,20 @@ class Config(metaclass=Singleton):
self.openai_api_key = self._get("OPENAI_API_KEY")
self.anthropic_api_key = self._get("Anthropic_API_KEY")
self.zhipuai_api_key = self._get("ZHIPUAI_API_KEY")
self.open_llm_api_base = self._get("OPEN_LLM_API_BASE")
self.open_llm_api_model = self._get("OPEN_LLM_API_MODEL")
self.fireworks_api_key = self._get("FIREWORKS_API_KEY")
if (
(not self.openai_api_key or "YOUR_API_KEY" == self.openai_api_key)
and (not self.anthropic_api_key or "YOUR_API_KEY" == self.anthropic_api_key)
and (not self.zhipuai_api_key or "YOUR_API_KEY" == self.zhipuai_api_key)
and (not self.open_llm_api_base)
and (not self.fireworks_api_key or "YOUR_API_KEY" == self.fireworks_api_key)
):
raise NotConfiguredException("Set OPENAI_API_KEY or Anthropic_API_KEY or ZHIPUAI_API_KEY first")
raise NotConfiguredException(
"Set OPENAI_API_KEY or Anthropic_API_KEY or ZHIPUAI_API_KEY first "
"or FIREWORKS_API_KEY or OPEN_LLM_API_BASE"
)
self.openai_api_base = self._get("OPENAI_API_BASE")
self.openai_proxy = self._get("OPENAI_PROXY") or self.global_proxy
self.openai_api_type = self._get("OPENAI_API_TYPE")
@ -77,6 +85,9 @@ class Config(metaclass=Singleton):
self.domain = self._get("DOMAIN")
self.spark_url = self._get("SPARK_URL")
self.fireworks_api_base = self._get("FIREWORKS_API_BASE")
self.fireworks_api_model = self._get("FIREWORKS_API_MODEL")
self.claude_api_key = self._get("Anthropic_API_KEY")
self.serpapi_api_key = self._get("SERPAPI_API_KEY")
self.serper_api_key = self._get("SERPER_API_KEY")
@ -102,13 +113,14 @@ class Config(metaclass=Singleton):
self.mermaid_engine = self._get("MERMAID_ENGINE", "nodejs")
self.pyppeteer_executable_path = self._get("PYPPETEER_EXECUTABLE_PATH", "")
self.prompt_format = self._get("PROMPT_FORMAT", "markdown")
self.repair_llm_output = self._get("REPAIR_LLM_OUTPUT", False)
self.prompt_format = self._get("PROMPT_FORMAT", "json")
self.workspace_path = Path(self._get("WORKSPACE_PATH", DEFAULT_WORKSPACE_ROOT))
self._ensure_workspace_exists()
def _ensure_workspace_exists(self):
self.workspace_path.mkdir(parents=True, exist_ok=True)
logger.info(f"WORKSPACE_PATH set to {self.workspace_path}")
logger.debug(f"WORKSPACE_PATH set to {self.workspace_path}")
def _init_with_config_files_and_env(self, yaml_file):
"""Load from config/key.yaml, config/config.yaml, and env in decreasing order of priority"""

View file

@ -4,6 +4,7 @@
@Time : 2023/6/8 14:03
@Author : alexanderwu
@File : document.py
@Desc : Classes and Operations Related to Files in the File System.
"""
from enum import Enum
from pathlib import Path

View file

@ -4,6 +4,7 @@
@Time : 2023/6/8 14:03
@Author : alexanderwu
@File : document.py
@Desc : Classes and Operations Related to Vector Files in the Vector Database. Still under design.
"""
from pathlib import Path

View file

@ -58,7 +58,7 @@ class Environment(BaseModel):
route the message to the message recipient is a problem addressed by the transport framework designed
in RFC 113.
"""
logger.info(f"publish_message: {message.dump()}")
logger.debug(f"publish_message: {message.dump()}")
found = False
# According to the routing feature plan in Chapter 2.2.3.2 of RFC 113
for role, subscription in self.members.items():
@ -82,7 +82,7 @@ class Environment(BaseModel):
futures.append(future)
await asyncio.gather(*futures)
logger.info(f"is idle: {self.is_idle}")
logger.debug(f"is idle: {self.is_idle}")
def get_roles(self) -> dict[str, Role]:
"""获得环境内的所有角色

View file

@ -7,26 +7,30 @@
"""
from metagpt.config import CONFIG
from metagpt.provider.anthropic_api import Claude2 as Claude
from metagpt.provider.openai_api import OpenAIGPTAPI
from metagpt.provider.zhipuai_api import ZhiPuAIGPTAPI
from metagpt.provider.spark_api import SparkAPI
from metagpt.provider.base_gpt_api import BaseGPTAPI
from metagpt.provider.fireworks_api import FireWorksGPTAPI
from metagpt.provider.human_provider import HumanProvider
from metagpt.provider.open_llm_api import OpenLLMGPTAPI
from metagpt.provider.openai_api import OpenAIGPTAPI
from metagpt.provider.spark_api import SparkAPI
from metagpt.provider.zhipuai_api import ZhiPuAIGPTAPI
_ = HumanProvider() # Avoid pre-commit error
def LLM() -> "BaseGPTAPI":
""" initialize different LLM instance according to the key field existence"""
def LLM() -> BaseGPTAPI:
"""initialize different LLM instance according to the key field existence"""
# TODO a little trick, can use registry to initialize LLM instance further
if CONFIG.openai_api_key:
llm = OpenAIGPTAPI()
elif CONFIG.claude_api_key:
llm = Claude()
elif CONFIG.spark_api_key:
llm = SparkAPI()
elif CONFIG.zhipuai_api_key:
llm = ZhiPuAIGPTAPI()
elif CONFIG.open_llm_api_base:
llm = OpenLLMGPTAPI()
elif CONFIG.fireworks_api_key:
llm = FireWorksGPTAPI()
else:
raise RuntimeError("You should config a LLM configuration first")

View file

@ -7,6 +7,7 @@
"""
import sys
from datetime import datetime
from loguru import logger as _logger
@ -15,9 +16,12 @@ from metagpt.const import METAGPT_ROOT
def define_log_level(print_level="INFO", logfile_level="DEBUG"):
"""Adjust the log level to above level"""
current_date = datetime.now()
formatted_date = current_date.strftime("%Y%m%d")
_logger.remove()
_logger.add(sys.stderr, level=print_level)
_logger.add(METAGPT_ROOT / "logs/log.txt", level=logfile_level)
_logger.add(METAGPT_ROOT / f"logs/{formatted_date}.txt", level=logfile_level)
return _logger

View file

@ -10,7 +10,7 @@
from typing import Optional
from abc import ABC
from metagpt.llm import LLM # Large language model, similar to GPT
n
class Action(ABC):
def __init__(self, name='', context=None, llm: LLM = LLM()):
self.name = name

View file

@ -9,7 +9,6 @@ import json
from abc import abstractmethod
from typing import Optional
from metagpt.logs import logger
from metagpt.provider.base_chatbot import BaseChatbot
@ -55,7 +54,6 @@ class BaseGPTAPI(BaseChatbot):
message.extend(format_msgs)
message.append(self._user_msg(msg))
rsp = await self.acompletion_text(message, stream=stream, generator=generator, timeout=timeout)
logger.debug(message)
# logger.debug(rsp)
return rsp

View file

@ -0,0 +1,23 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : fireworks.ai's api
import openai
from metagpt.config import CONFIG
from metagpt.provider.openai_api import CostManager, OpenAIGPTAPI, RateLimiter
class FireWorksGPTAPI(OpenAIGPTAPI):
def __init__(self):
self.__init_fireworks(CONFIG)
self.llm = openai
self.model = CONFIG.fireworks_api_model
self.auto_max_tokens = False
self._cost_manager = CostManager()
RateLimiter.__init__(self, rpm=self.rpm)
def __init_fireworks(self, config: "Config"):
openai.api_key = config.fireworks_api_key
openai.api_base = config.fireworks_api_base
self.rpm = int(config.get("RPM", 10))

View file

@ -0,0 +1,46 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : self-host open llm model with openai-compatible interface
import openai
from metagpt.config import CONFIG
from metagpt.logs import logger
from metagpt.provider.openai_api import CostManager, OpenAIGPTAPI, RateLimiter
class OpenLLMCostManager(CostManager):
"""open llm model is self-host, it's free and without cost"""
def update_cost(self, prompt_tokens, completion_tokens, model):
"""
Update the total cost, prompt tokens, and completion tokens.
Args:
prompt_tokens (int): The number of tokens used in the prompt.
completion_tokens (int): The number of tokens used in the completion.
model (str): The model used for the API call.
"""
self.total_prompt_tokens += prompt_tokens
self.total_completion_tokens += completion_tokens
logger.info(
f"Max budget: ${CONFIG.max_budget:.3f} | "
f"prompt_tokens: {prompt_tokens}, completion_tokens: {completion_tokens}"
)
CONFIG.total_cost = self.total_cost
class OpenLLMGPTAPI(OpenAIGPTAPI):
def __init__(self):
self.__init_openllm(CONFIG)
self.llm = openai
self.model = CONFIG.open_llm_api_model
self.auto_max_tokens = False
self._cost_manager = OpenLLMCostManager()
RateLimiter.__init__(self, rpm=self.rpm)
def __init_openllm(self, config: "Config"):
openai.api_key = "sk-xx" # self-host api doesn't need api-key, use the default value
openai.api_base = config.open_llm_api_base
self.rpm = int(config.get("RPM", 10))

View file

@ -0,0 +1,3 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc :

View file

@ -0,0 +1,70 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : base llm postprocess plugin to do the operations like repair the raw llm output
from typing import Union
from metagpt.logs import logger
from metagpt.utils.repair_llm_raw_output import (
RepairType,
extract_content_from_output,
repair_llm_raw_output,
retry_parse_json_text,
)
class BasePostPrecessPlugin(object):
model = None # the plugin of the `model`, use to judge in `llm_postprecess`
def run_repair_llm_output(self, output: str, schema: dict, req_key: str = "[/CONTENT]") -> Union[dict, list]:
"""
repair steps
1. repair the case sensitive problem using the schema's fields
2. extract the content from the req_key pair( xx[REQ_KEY]xxx[/REQ_KEY]xx )
3. repair the invalid json text in the content
4. parse the json text and repair it according to the exception with retry loop
"""
output_class_fields = list(schema["properties"].keys()) # Custom ActionOutput's fields
content = self.run_repair_llm_raw_output(output, req_keys=output_class_fields + [req_key])
content = self.run_extract_content_from_output(content, right_key=req_key)
# # req_keys mocked
content = self.run_repair_llm_raw_output(content, req_keys=[None], repair_type=RepairType.JSON)
parsed_data = self.run_retry_parse_json_text(content)
return parsed_data
def run_repair_llm_raw_output(self, content: str, req_keys: list[str], repair_type: str = None) -> str:
"""inherited class can re-implement the function"""
return repair_llm_raw_output(content, req_keys=req_keys, repair_type=repair_type)
def run_extract_content_from_output(self, content: str, right_key: str) -> str:
"""inherited class can re-implement the function"""
return extract_content_from_output(content, right_key=right_key)
def run_retry_parse_json_text(self, content: str) -> Union[dict, list]:
"""inherited class can re-implement the function"""
logger.info(f"extracted json CONTENT from output:\n{content}")
parsed_data = retry_parse_json_text(output=content) # should use output=content
return parsed_data
def run(self, output: str, schema: dict, req_key: str = "[/CONTENT]") -> Union[dict, list]:
"""
this is used for prompt with a json-format output requirement and outer pair key, like
[REQ_KEY]
{
"Key": "value"
}
[/REQ_KEY]
Args
outer (str): llm raw output
schema: output json schema
req_key: outer pair right key, usually in `[/REQ_KEY]` format
"""
assert len(schema.get("properties")) > 0
assert "/" in req_key
# current, postprocess only deal the repair_llm_raw_output
new_output = self.run_repair_llm_output(output=output, schema=schema, req_key=req_key)
return new_output

View file

@ -0,0 +1,20 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : the entry of choosing which PostProcessPlugin to deal particular LLM model's output
from typing import Union
from metagpt.provider.postprecess.base_postprecess_plugin import BasePostPrecessPlugin
def llm_output_postprecess(
output: str, schema: dict, req_key: str = "[/CONTENT]", model_name: str = None
) -> Union[dict, str]:
"""
default use BasePostPrecessPlugin if there is not matched plugin.
"""
# TODO choose different model's plugin according to the model_name
postprecess_plugin = BasePostPrecessPlugin()
result = postprecess_plugin.run(output=output, schema=schema, req_key=req_key)
return result

View file

@ -26,8 +26,9 @@ class Architect(Role):
self,
name: str = "Bob",
profile: str = "Architect",
goal: str = "Design a concise, usable, complete python system",
constraints: str = "Try to specify good open source tools as much as possible",
goal: str = "design a concise, usable, complete software system",
constraints: str = "make sure the architecture is simple enough and use appropriate open source libraries."
"Use same language as user requirement"
) -> None:
"""Initializes the Architect with given attributes."""
super().__init__(name, profile, goal, constraints)

View file

@ -71,14 +71,16 @@ class Engineer(Role):
self,
name: str = "Alex",
profile: str = "Engineer",
goal: str = "Write elegant, readable, extensible, efficient code",
constraints: str = "The code should conform to standards like PEP8 and be modular and maintainable",
goal: str = "write elegant, readable, extensible, efficient code",
constraints: str = "the code should conform to standards like google-style and be modular and maintainable. "
"Use same language as user requirement",
n_borg: int = 1,
use_code_review: bool = False,
) -> None:
"""Initializes the Engineer role with given attributes."""
super().__init__(name, profile, goal, constraints)
self.use_code_review = use_code_review
self._init_actions([WriteCode])
self._watch([WriteTasks, SummarizeCode, WriteCode, WriteCodeReview, FixBug])
self.code_todos = []
self.summarize_todos = []
@ -103,7 +105,9 @@ class Engineer(Role):
coding_context = await todo.run()
# Code review
if review:
coding_context = await WriteCodeReview(context=coding_context, llm=self._llm).run()
action = WriteCodeReview(context=coding_context, llm=self._llm)
self._init_action_system_message(action)
coding_context = await action.run()
await src_file_repo.save(
coding_context.filename,
dependencies={coding_context.design_doc.root_relative_path, coding_context.task_doc.root_relative_path},
@ -198,11 +202,11 @@ class Engineer(Role):
return None
msg = self._rc.news[0]
if msg.cause_by in write_code_filters:
logger.info(f"TODO WriteCode:{msg.json()}")
logger.debug(f"TODO WriteCode:{msg.json()}")
await self._new_code_actions(bug_fix=msg.cause_by == any_to_str(FixBug))
return self._rc.todo
if msg.cause_by in summarize_code_filters and msg.sent_from == any_to_str(self):
logger.info(f"TODO SummarizeCode:{msg.json()}")
logger.debug(f"TODO SummarizeCode:{msg.json()}")
await self._new_summarize_actions()
return self._rc.todo
return None
@ -222,6 +226,7 @@ class Engineer(Role):
task_doc = await task_file_repo.get(i.name)
elif str(i.parent) == SYSTEM_DESIGN_FILE_REPO:
design_doc = await design_file_repo.get(i.name)
# FIXME: design doc没有加载进来是None
context = CodingContext(filename=filename, design_doc=design_doc, task_doc=task_doc, code_doc=old_code_doc)
return context

View file

@ -28,8 +28,8 @@ class ProductManager(Role):
self,
name: str = "Alice",
profile: str = "Product Manager",
goal: str = "Efficiently create a successful product",
constraints: str = "",
goal: str = "efficiently create a successful product",
constraints: str = "use same language as user requirement",
) -> None:
"""
Initializes the ProductManager role with given attributes.

View file

@ -25,8 +25,9 @@ class ProjectManager(Role):
self,
name: str = "Eve",
profile: str = "Project Manager",
goal: str = "Improve team efficiency and deliver with quality and quantity",
constraints: str = "",
goal: str = "break down tasks according to PRD/technical design, generate a task list, and analyze task "
"dependencies to start with the prerequisite modules",
constraints: str = "use same language as user requirement",
) -> None:
"""
Initializes the ProjectManager role with given attributes.

View file

@ -14,9 +14,7 @@
@Modified By: mashenquan, 2023-12-5. Enhance the workflow to navigate to WriteCode or QaEngineer based on the results
of SummarizeCode.
"""
from metagpt.actions import DebugError, RunCode, WriteCode, WriteCodeReview, WriteTest
# from metagpt.const import WORKSPACE_ROOT
from metagpt.actions import DebugError, RunCode, WriteTest
from metagpt.actions.summarize_code import SummarizeCode
from metagpt.config import CONFIG
from metagpt.const import (
@ -28,6 +26,7 @@ from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import Document, Message, RunCodeContext, TestingContext
from metagpt.utils.common import any_to_str_set, parse_recipient
from metagpt.utils.file_repository import FileRepository
class QaEngineer(Role):
@ -127,8 +126,8 @@ class QaEngineer(Role):
async def _debug_error(self, msg):
run_code_context = RunCodeContext.loads(msg.content)
code = await DebugError(context=run_code_context, llm=self._llm).run()
await CONFIG.git_repo.new_file_repository(CONFIG.src_workspace).save(
filename=run_code_context.code_filename, content=code
await FileRepository.save_file(
filename=run_code_context.test_filename, content=code, relative_path=TEST_CODES_FILE_REPO
)
run_code_context.output = None
self.publish_message(

View file

@ -26,14 +26,13 @@ from typing import Iterable, Set, Type
from pydantic import BaseModel, Field
from metagpt.actions import Action, ActionOutput
from metagpt.config import CONFIG
from metagpt.actions.action_node import ActionNode
from metagpt.llm import LLM, HumanProvider
from metagpt.logs import logger
from metagpt.memory import Memory
# from metagpt.memory import LongTermMemory
from metagpt.schema import Message, MessageQueue
from metagpt.utils.common import any_to_str
from metagpt.utils.repair_llm_raw_output import extract_state_value_from_output
PREFIX_TEMPLATE = """You are a {profile}, named {name}, your goal is {goal}, and the constraint is {constraints}. """
@ -112,9 +111,10 @@ class RoleContext(BaseModel):
arbitrary_types_allowed = True
def check(self, role_id: str):
if hasattr(CONFIG, "long_term_memory") and CONFIG.long_term_memory:
self.long_term_memory.recover_memory(role_id, self)
self.memory = self.long_term_memory # use memory to act as long_term_memory for unify operation
# if hasattr(CONFIG, "long_term_memory") and CONFIG.long_term_memory:
# self.long_term_memory.recover_memory(role_id, self)
# self.memory = self.long_term_memory # use memory to act as long_term_memory for unify operation
pass
@property
def important_memory(self) -> list[Message]:
@ -134,6 +134,7 @@ class Role:
self._setting = RoleSetting(
name=name, profile=profile, goal=goal, constraints=constraints, desc=desc, is_human=is_human
)
self._llm.system_prompt = self._get_prefix()
self._states = []
self._actions = []
self._role_id = str(self._setting)
@ -144,6 +145,9 @@ class Role:
self._states = []
self._actions = []
def _init_action_system_message(self, action: Action):
action.set_prefix(self._get_prefix(), self.profile)
def _init_actions(self, actions):
self._reset()
for idx, action in enumerate(actions):
@ -152,12 +156,13 @@ class Role:
else:
if self._setting.is_human and not isinstance(action.llm, HumanProvider):
logger.warning(
f"is_human attribute does not take effect,"
f"as Role's {str(action)} was initialized using LLM, try passing in Action classes instead of initialized instances"
f"is_human attribute does not take effect, "
f"as Role's {str(action)} was initialized using LLM, "
f"try passing in Action classes instead of initialized instances"
)
i = action
i.set_env(self._rc.env)
i.set_prefix(self._get_prefix(), self.profile)
# i.set_env(self._rc.env)
self._init_action_system_message(i)
self._actions.append(i)
self._states.append(f"{idx}. {action}")
@ -213,22 +218,6 @@ class Role:
if env:
env.set_subscription(self, self._subscription)
# # Replaced by FileRepository.set_file
# def set_doc(self, content: str, filename: str):
# return self._rc.env.set_doc(content, filename)
#
# # Replaced by FileRepository.get_file
# def get_doc(self, filename: str):
# return self._rc.env.get_doc(filename)
#
# # Replaced by CONFIG.xx
# def set(self, k, v):
# return self._rc.env.set(k, v)
#
# # Replaced by CONFIG.xx
# def get(self, k):
# return self._rc.env.get(k)
@property
def profile(self):
"""Get the role description (position)"""
@ -265,6 +254,7 @@ class Role:
)
# print(prompt)
next_state = await self._llm.aask(prompt)
next_state = extract_state_value_from_output(next_state)
logger.debug(f"{prompt=}")
if (not next_state.isdigit() and next_state != "-1") or int(next_state) not in range(-1, len(self._states)):
logger.warning(f"Invalid answer of state, {next_state=}, will be set to -1")
@ -278,7 +268,7 @@ class Role:
async def _act(self) -> Message:
logger.info(f"{self._setting}: ready to {self._rc.todo}")
response = await self._rc.todo.run(self._rc.important_memory)
if isinstance(response, ActionOutput):
if isinstance(response, ActionOutput) or isinstance(response, ActionNode):
msg = Message(
content=response.content,
instruct_content=response.instruct_content,
@ -406,7 +396,7 @@ class Role:
logger.debug(f"{self._setting}: no news. waiting.")
return
rsp = await self._react()
rsp = await self.react()
# Reset the next action to be taken.
self._rc.todo = None

View file

@ -8,6 +8,7 @@
the `cause_by` value in the `Message` to a string to support the new message distribution feature.
"""
from metagpt.actions import ActionOutput, SearchAndSummarize
from metagpt.actions.action_node import ActionNode
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import Message
@ -58,7 +59,7 @@ class Searcher(Role):
logger.info(f"{self._setting}: ready to {self._rc.todo}")
response = await self._rc.todo.run(self._rc.memory.get(k=0))
if isinstance(response, ActionOutput):
if isinstance(response, ActionOutput) or isinstance(response, ActionNode):
msg = Message(
content=response.content,
instruct_content=response.instruct_content,

View file

@ -74,6 +74,12 @@ class Document(BaseModel):
return None
return str(CONFIG.git_repo.workdir / self.root_path / self.filename)
def __str__(self):
return self.content
def __repr__(self):
return self.content
class Documents(BaseModel):
"""A class representing a collection of documents.
@ -97,14 +103,14 @@ class Message(BaseModel):
send_to: Set = Field(default_factory={MESSAGE_ROUTE_TO_ALL})
def __init__(
self,
content,
instruct_content=None,
role="user",
cause_by="",
sent_from="",
send_to=MESSAGE_ROUTE_TO_ALL,
**kwargs,
self,
content,
instruct_content=None,
role="user",
cause_by="",
sent_from="",
send_to=MESSAGE_ROUTE_TO_ALL,
**kwargs,
):
"""
Parameters not listed below will be stored as meta info, including custom parameters.
@ -259,7 +265,7 @@ class MessageQueue:
class CodingContext(BaseModel):
filename: str
design_doc: Document
design_doc: Optional[Document]
task_doc: Optional[Document]
code_doc: Optional[Document]

View file

@ -63,7 +63,7 @@ class Team(BaseModel):
while n_round > 0:
# self._save()
n_round -= 1
logger.debug(f"{n_round=}")
logger.debug(f"max {n_round=} left.")
self._check_balance()
await self.env.run()
if CONFIG.git_repo:

View file

@ -0,0 +1,49 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : pure async http_client
from typing import Any, Mapping, Optional, Union
import aiohttp
from aiohttp.client import DEFAULT_TIMEOUT
async def apost(
url: str,
params: Optional[Mapping[str, str]] = None,
json: Any = None,
data: Any = None,
headers: Optional[dict] = None,
as_json: bool = False,
encoding: str = "utf-8",
timeout: int = DEFAULT_TIMEOUT.total,
) -> Union[str, dict]:
async with aiohttp.ClientSession() as session:
async with session.post(url=url, params=params, json=json, data=data, headers=headers, timeout=timeout) as resp:
if as_json:
data = await resp.json()
else:
data = await resp.read()
data = data.decode(encoding)
return data
async def apost_stream(
url: str,
params: Optional[Mapping[str, str]] = None,
json: Any = None,
data: Any = None,
headers: Optional[dict] = None,
encoding: str = "utf-8",
timeout: int = DEFAULT_TIMEOUT.total,
) -> Any:
"""
usage:
result = astream(url="xx")
async for line in result:
deal_with(line)
"""
async with aiohttp.ClientSession() as session:
async with session.post(url=url, params=params, json=json, data=data, headers=headers, timeout=timeout) as resp:
async for line in resp.content:
yield line.decode(encoding)

View file

@ -223,10 +223,15 @@ class CodeParser:
# 遍历所有的block
for block in blocks:
# 如果block不为空则继续处理
if block.strip() != "":
if block.strip() == "":
continue
if "\n" not in block:
block_title = block
block_content = ""
else:
# 将block的标题和内容分开并分别去掉前后的空白字符
block_title, block_content = block.split("\n", 1)
block_dict[block_title.strip()] = block_content.strip()
block_dict[block_title.strip()] = block_content.strip()
return block_dict

View file

@ -205,7 +205,7 @@ class FileRepository:
m = json.loads(doc.content)
filename = Path(doc.filename).with_suffix(with_suffix) if with_suffix is not None else Path(doc.filename)
await self.save(filename=str(filename), content=json_to_markdown(m), dependencies=dependencies)
logger.info(f"File Saved: {str(filename)}")
logger.debug(f"File Saved: {str(filename)}")
@staticmethod
async def get_file(filename: Path | str, relative_path: Path | str = ".") -> Document | None:

View file

@ -8,13 +8,15 @@
"""
from __future__ import annotations
from gitignore_parser import parse_gitignore, rule_from_pattern, handle_negation
import shutil
from enum import Enum
from pathlib import Path
from typing import Dict, List
from git.repo import Repo
from git.repo.fun import is_git_dir
from gitignore_parser import parse_gitignore
from metagpt.const import DEFAULT_WORKSPACE_ROOT
from metagpt.logs import logger
from metagpt.utils.dependency_file import DependencyFile
@ -204,6 +206,7 @@ class GitRepository:
logger.warning(f"Move {str(self.workdir)} to {str(new_path)} error: {e}")
logger.info(f"Rename directory {str(self.workdir)} to {str(new_path)}")
self._repository = Repo(new_path)
self._gitignore_rules = parse_gitignore(full_path=str(new_path / ".gitignore"))
def get_files(self, relative_path: Path | str, root_relative_path: Path | str = None, filter_ignored=True) -> List:
"""
@ -230,13 +233,16 @@ class GitRepository:
files = []
try:
directory_path = Path(self.workdir) / relative_path
if not directory_path.exists():
return []
for file_path in directory_path.iterdir():
if file_path.is_file():
rpath = file_path.relative_to(root_relative_path)
files.append(str(rpath))
else:
subfolder_files = self.get_files(relative_path=file_path, root_relative_path=root_relative_path,
filter_ignored=False)
subfolder_files = self.get_files(
relative_path=file_path, root_relative_path=root_relative_path, filter_ignored=False
)
files.extend(subfolder_files)
except Exception as e:
logger.error(f"Error: {e}")

View file

@ -0,0 +1,310 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : repair llm raw output with particular conditions
import copy
from enum import Enum
from typing import Callable, Union
import regex as re
from tenacity import RetryCallState, retry, stop_after_attempt, wait_fixed
from metagpt.config import CONFIG
from metagpt.logs import logger
from metagpt.utils.custom_decoder import CustomDecoder
class RepairType(Enum):
CS = "case sensitivity"
RKPM = "required key pair missing" # condition like `[key] xx` which lacks `[/key]`
SCM = "special character missing" # Usually the req_key appear in pairs like `[key] xx [/key]`
JSON = "json format"
def repair_case_sensitivity(output: str, req_key: str) -> str:
"""
usually, req_key is the key name of expected json or markdown content, it won't appear in the value part.
fix target string `"Shared Knowledge": ""` but `"Shared knowledge": ""` actually
"""
if req_key in output:
return output
output_lower = output.lower()
req_key_lower = req_key.lower()
if req_key_lower in output_lower:
# find the sub-part index, and replace it with raw req_key
lidx = output_lower.find(req_key_lower)
source = output[lidx : lidx + len(req_key_lower)]
output = output.replace(source, req_key)
logger.info(f"repair_case_sensitivity: {req_key}")
return output
def repair_special_character_missing(output: str, req_key: str = "[/CONTENT]") -> str:
"""
fix
1. target string `[CONTENT] xx [CONTENT] xxx [CONTENT]` lacks `/` in the last `[CONTENT]`
2. target string `xx [CONTENT] xxx [CONTENT] xxxx` lacks `/` in the last `[CONTENT]`
"""
sc_arr = ["/"]
if req_key in output:
return output
for sc in sc_arr:
req_key_pure = req_key.replace(sc, "")
appear_cnt = output.count(req_key_pure)
if req_key_pure in output and appear_cnt > 1:
# req_key with special_character usually in the tail side
ridx = output.rfind(req_key_pure)
output = f"{output[:ridx]}{req_key}{output[ridx + len(req_key_pure):]}"
logger.info(f"repair_special_character_missing: {sc} in {req_key_pure} as position {ridx}")
return output
def repair_required_key_pair_missing(output: str, req_key: str = "[/CONTENT]") -> str:
"""
implement the req_key pair in the begin or end of the content
req_key format
1. `[req_key]`, and its pair `[/req_key]`
2. `[/req_key]`, and its pair `[req_key]`
"""
sc = "/" # special char
if req_key.startswith("[") and req_key.endswith("]"):
if sc in req_key:
left_key = req_key.replace(sc, "") # `[/req_key]` -> `[req_key]`
right_key = req_key
else:
left_key = req_key
right_key = f"{req_key[0]}{sc}{req_key[1:]}" # `[req_key]` -> `[/req_key]`
if left_key not in output:
output = left_key + "\n" + output
if right_key not in output:
def judge_potential_json(routput: str, left_key: str) -> Union[str, None]:
ridx = routput.rfind(left_key)
if ridx < 0:
return None
sub_output = routput[ridx:]
idx1 = sub_output.rfind("}")
idx2 = sub_output.rindex("]")
idx = idx1 if idx1 >= idx2 else idx2
sub_output = sub_output[: idx + 1]
return sub_output
if output.strip().endswith("}") or (output.strip().endswith("]") and not output.strip().endswith(left_key)):
# # avoid [req_key]xx[req_key] case to append [/req_key]
output = output + "\n" + right_key
elif judge_potential_json(output, left_key) and (not output.strip().endswith(left_key)):
sub_content = judge_potential_json(output, left_key)
output = sub_content + "\n" + right_key
return output
def repair_json_format(output: str) -> str:
"""
fix extra `[` or `}` in the end
"""
output = output.strip()
if output.startswith("[{"):
output = output[1:]
logger.info(f"repair_json_format: {'[{'}")
elif output.endswith("}]"):
output = output[:-1]
logger.info(f"repair_json_format: {'}]'}")
elif output.startswith("{") and output.endswith("]"):
output = output[:-1] + "}"
return output
def _repair_llm_raw_output(output: str, req_key: str, repair_type: RepairType = None) -> str:
repair_types = [repair_type] if repair_type else [item for item in RepairType if item not in [RepairType.JSON]]
for repair_type in repair_types:
if repair_type == RepairType.CS:
output = repair_case_sensitivity(output, req_key)
elif repair_type == RepairType.RKPM:
output = repair_required_key_pair_missing(output, req_key)
elif repair_type == RepairType.SCM:
output = repair_special_character_missing(output, req_key)
elif repair_type == RepairType.JSON:
output = repair_json_format(output)
return output
def repair_llm_raw_output(output: str, req_keys: list[str], repair_type: RepairType = None) -> str:
"""
in open-source llm model, it usually can't follow the instruction well, the output may be incomplete,
so here we try to repair it and use all repair methods by default.
typical case
1. case sensitivity
target: "Original Requirements"
output: "Original requirements"
2. special character missing
target: [/CONTENT]
output: [CONTENT]
3. json format
target: { xxx }
output: { xxx }]
"""
if not CONFIG.repair_llm_output:
return output
# do the repairation usually for non-openai models
for req_key in req_keys:
output = _repair_llm_raw_output(output=output, req_key=req_key, repair_type=repair_type)
return output
def repair_invalid_json(output: str, error: str) -> str:
"""
repair the situation like there are extra chars like
error examples
example 1. json.decoder.JSONDecodeError: Expecting ',' delimiter: line 154 column 1 (char 2765)
example 2. xxx.JSONDecodeError: Expecting property name enclosed in double quotes: line 14 column 1 (char 266)
"""
pattern = r"line ([0-9]+)"
matches = re.findall(pattern, error, re.DOTALL)
if len(matches) > 0:
line_no = int(matches[0]) - 1
# due to CustomDecoder can handle `"": ''` or `'': ""`, so convert `"""` -> `"`, `'''` -> `'`
output = output.replace('"""', '"').replace("'''", '"')
arr = output.split("\n")
line = arr[line_no].strip()
# different general problems
if line.endswith("],"):
# problem, redundant char `]`
new_line = line.replace("]", "")
elif line.endswith("},") and not output.endswith("},"):
# problem, redundant char `}`
new_line = line.replace("}", "")
elif line.endswith("},") and output.endswith("},"):
new_line = line[:-1]
elif '",' not in line and "," not in line:
new_line = f'{line}",'
elif "," not in line:
# problem, miss char `,` at the end.
new_line = f"{line},"
elif "," in line and len(line) == 1:
new_line = f'"{line}'
elif '",' in line:
new_line = line[:-2] + "',"
arr[line_no] = new_line
output = "\n".join(arr)
logger.info(f"repair_invalid_json, raw error: {error}")
return output
def run_after_exp_and_passon_next_retry(logger: "loguru.Logger") -> Callable[["RetryCallState"], None]:
def run_and_passon(retry_state: RetryCallState) -> None:
"""
RetryCallState example
{
"start_time":143.098322024,
"retry_object":"<Retrying object at 0x7fabcaca25e0 (stop=<tenacity.stop.stop_after_attempt ... >)>",
"fn":"<function retry_parse_json_text_v2 at 0x7fabcac80ee0>",
"args":"(\"tag:[/CONTENT]\",)", # function input args
"kwargs":{}, # function input kwargs
"attempt_number":1, # retry number
"outcome":"<Future at xxx>", # type(outcome.result()) = "str", type(outcome.exception()) = "class"
"outcome_timestamp":143.098416904,
"idle_for":0,
"next_action":"None"
}
"""
if retry_state.outcome.failed:
if retry_state.args:
# # can't be used as args=retry_state.args
func_param_output = retry_state.args[0]
elif retry_state.kwargs:
func_param_output = retry_state.kwargs.get("output", "")
exp_str = str(retry_state.outcome.exception())
logger.warning(
f"parse json from content inside [CONTENT][/CONTENT] failed at retry "
f"{retry_state.attempt_number}, try to fix it, exp: {exp_str}"
)
repaired_output = repair_invalid_json(func_param_output, exp_str)
retry_state.kwargs["output"] = repaired_output
return run_and_passon
@retry(
stop=stop_after_attempt(3 if CONFIG.repair_llm_output else 0),
wait=wait_fixed(1),
after=run_after_exp_and_passon_next_retry(logger),
)
def retry_parse_json_text(output: str) -> Union[list, dict]:
"""
repair the json-text situation like there are extra chars like [']', '}']
Warning
if CONFIG.repair_llm_output is False, retry _aask_v1 {x=3} times, and the retry_parse_json_text's retry not work
if CONFIG.repair_llm_output is True, the _aask_v1 and the retry_parse_json_text will loop for {x=3*3} times.
it's a two-layer retry cycle
"""
logger.debug(f"output to json decode:\n{output}")
# if CONFIG.repair_llm_output is True, it will try to fix output until the retry break
parsed_data = CustomDecoder(strict=False).decode(output)
return parsed_data
def extract_content_from_output(content: str, right_key: str = "[/CONTENT]"):
"""extract xxx from [CONTENT](xxx)[/CONTENT] using regex pattern"""
def re_extract_content(cont: str, pattern: str) -> str:
matches = re.findall(pattern, cont, re.DOTALL)
for match in matches:
if match:
cont = match
break
return cont.strip()
# TODO construct the extract pattern with the `right_key`
raw_content = copy.deepcopy(content)
pattern = r"\[CONTENT\]([\s\S]*)\[/CONTENT\]"
new_content = re_extract_content(raw_content, pattern)
if not new_content.startswith("{"):
# TODO find a more general pattern
# # for `[CONTENT]xxx[CONTENT]xxxx[/CONTENT] situation
logger.warning(f"extract_content try another pattern: {pattern}")
if right_key not in new_content:
raw_content = copy.deepcopy(new_content + "\n" + right_key)
# # pattern = r"\[CONTENT\](\s*\{.*?\}\s*)\[/CONTENT\]"
new_content = re_extract_content(raw_content, pattern)
else:
if right_key in new_content:
idx = new_content.find(right_key)
new_content = new_content[:idx]
new_content = new_content.strip()
return new_content
def extract_state_value_from_output(content: str) -> str:
"""
For openai models, they will always return state number. But for open llm models, the instruction result maybe a
long text contain target number, so here add a extraction to improve success rate.
Args:
content (str): llm's output from `Role._think`
"""
content = content.strip() # deal the output cases like " 0", "0\n" and so on.
pattern = r"([0-9])" # TODO find the number using a more proper method not just extract from content using pattern
matches = re.findall(pattern, content, re.DOTALL)
matches = list(set(matches))
state = matches[0] if len(matches) > 0 else "-1"
return state

22
metagpt/utils/utils.py Normal file
View file

@ -0,0 +1,22 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc :
import typing
from tenacity import _utils
def general_after_log(logger: "loguru.Logger", sec_format: str = "%0.3f") -> typing.Callable[["RetryCallState"], None]:
def log_it(retry_state: "RetryCallState") -> None:
if retry_state.fn is None:
fn_name = "<unknown>"
else:
fn_name = _utils.get_callback_name(retry_state.fn)
logger.error(
f"Finished call to '{fn_name}' after {sec_format % retry_state.seconds_since_start}(s), "
f"this was the {_utils.to_ordinal(retry_state.attempt_number)} time calling it. "
f"exp: {retry_state.outcome.exception()}"
)
return log_it