Merge pull request #298 from femto/feature/json_write_prd

Feature/json write prd and design_api and project_management
This commit is contained in:
stellaHSR 2023-09-22 15:46:07 +08:00 committed by GitHub
commit 3ca4d4fd87
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 974 additions and 211 deletions

View file

@ -82,4 +82,6 @@ MODEL_FOR_RESEARCHER_REPORT: gpt-3.5-turbo-16k
# MERMAID_ENGINE: nodejs
### browser path for pyppeteer engine, support Chrome, Chromium,MS Edge
#PYPPETEER_EXECUTABLE_PATH: "/usr/bin/google-chrome-stable"
#PYPPETEER_EXECUTABLE_PATH: "/usr/bin/google-chrome-stable"
PROMPT_FORMAT: json #json or markdown

View file

@ -5,6 +5,7 @@
@Author : alexanderwu
@File : action.py
"""
import re
from abc import ABC
from typing import Optional
@ -12,11 +13,13 @@ from tenacity import retry, stop_after_attempt, wait_fixed
from metagpt.actions.action_output import ActionOutput
from metagpt.llm import LLM
from metagpt.utils.common import OutputParser
from metagpt.logs import logger
from metagpt.utils.common import OutputParser
from metagpt.utils.custom_decoder import CustomDecoder
class Action(ABC):
def __init__(self, name: str = '', context=None, llm: LLM = None):
def __init__(self, name: str = "", context=None, llm: LLM = None):
self.name: str = name
if llm is None:
llm = LLM()
@ -46,10 +49,15 @@ class Action(ABC):
system_msgs.append(self.prefix)
return await self.llm.aask(prompt, system_msgs)
@retry(stop=stop_after_attempt(2), wait=wait_fixed(1))
async def _aask_v1(self, prompt: str, output_class_name: str,
output_data_mapping: dict,
system_msgs: Optional[list[str]] = None) -> ActionOutput:
@retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
async def _aask_v1(
self,
prompt: str,
output_class_name: str,
output_data_mapping: dict,
system_msgs: Optional[list[str]] = None,
format="markdown", # compatible to original format
) -> ActionOutput:
"""Append default prefix"""
if not system_msgs:
system_msgs = []
@ -57,7 +65,21 @@ class Action(ABC):
content = await self.llm.aask(prompt, system_msgs)
logger.debug(content)
output_class = ActionOutput.create_model_class(output_class_name, output_data_mapping)
parsed_data = OutputParser.parse_data_with_mapping(content, output_data_mapping)
if format == "json":
pattern = r"\[CONTENT\](\s*\{.*?\}\s*)\[/CONTENT\]"
matches = re.findall(pattern, content, re.DOTALL)
for match in matches:
if match:
content = match
break
parsed_data = CustomDecoder(strict=False).decode(content)
else: # using markdown parser
parsed_data = OutputParser.parse_data_with_mapping(content, output_data_mapping)
logger.debug(parsed_data)
instruct_content = output_class(**parsed_data)
return ActionOutput(content, instruct_content)
@ -65,4 +87,3 @@ class Action(ABC):
async def run(self, *args, **kwargs):
"""Run action"""
raise NotImplementedError("The run method should be implemented in a subclass.")

View file

@ -10,12 +10,69 @@ from pathlib import Path
from typing import List
from metagpt.actions import Action, ActionOutput
from metagpt.config import CONFIG
from metagpt.const import WORKSPACE_ROOT
from metagpt.logs import logger
from metagpt.utils.common import CodeParser
from metagpt.utils.get_template import get_template
from metagpt.utils.json_to_markdown import json_to_markdown
from metagpt.utils.mermaid import mermaid_to_file
PROMPT_TEMPLATE = """
templates = {
"json": {
"PROMPT_TEMPLATE": """
# Context
{context}
## Format example
{format_example}
-----
Role: You are an architect; the goal is to design a SOTA PEP8-compliant python system; make the best use of good open source tools
Requirement: Fill in the following missing information based on the context, each section name is a key in json
Max Output: 8192 chars or 2048 tokens. Try to use them up.
## Implementation approach: Provide as Plain text. Analyze the difficult points of the requirements, select the appropriate open-source framework.
## Python package name: Provide as Python str with python triple quoto, concise and clear, characters only use a combination of all lowercase and underscores
## File list: Provided as Python list[str], the list of ONLY REQUIRED files needed to write the program(LESS IS MORE!). Only need relative paths, comply with PEP8 standards. ALWAYS write a main.py or app.py here
## Data structures and interface definitions: Use mermaid classDiagram code syntax, including classes (INCLUDING __init__ method) and functions (with type annotations), CLEARLY MARK the RELATIONSHIPS between classes, and comply with PEP8 standards. The data structures SHOULD BE VERY DETAILED and the API should be comprehensive with a complete design.
## Program call flow: Use sequenceDiagram code syntax, COMPLETE and VERY DETAILED, using CLASSES AND API DEFINED ABOVE accurately, covering the CRUD AND INIT of each object, SYNTAX MUST BE CORRECT.
## Anything UNCLEAR: Provide as Plain text. Make clear here.
output a properly formatted JSON, wrapped inside [CONTENT][/CONTENT] like format example,
and only output the json inside this tag, nothing else
""",
"FORMAT_EXAMPLE": """
[CONTENT]
{
"Implementation approach": "We will ...",
"Python package name": "snake_game",
"File list": ["main.py"],
"Data structures and interface definitions": '
classDiagram
class Game{
+int score
}
...
Game "1" -- "1" Food: has
',
"Program call flow": '
sequenceDiagram
participant M as Main
...
G->>M: end game
',
"Anything UNCLEAR": "The requirement is clear to me."
}
[/CONTENT]
""",
},
"markdown": {
"PROMPT_TEMPLATE": """
# Context
{context}
@ -39,8 +96,8 @@ Attention: Use '##' to split sections, not '#', and '## <SECTION_NAME>' SHOULD W
## Anything UNCLEAR: Provide as Plain text. Make clear here.
"""
FORMAT_EXAMPLE = """
""",
"FORMAT_EXAMPLE": """
---
## Implementation approach
We will ...
@ -78,7 +135,10 @@ sequenceDiagram
## Anything UNCLEAR
The requirement is clear to me.
---
"""
""",
},
}
OUTPUT_MAPPING = {
"Implementation approach": (str, ...),
"Python package name": (str, ...),
@ -92,9 +152,11 @@ OUTPUT_MAPPING = {
class WriteDesign(Action):
def __init__(self, name, context=None, llm=None):
super().__init__(name, context, llm)
self.desc = "Based on the PRD, think about the system design, and design the corresponding APIs, " \
"data structures, library tables, processes, and paths. Please provide your design, feedback " \
"clearly and in detail."
self.desc = (
"Based on the PRD, think about the system design, and design the corresponding APIs, "
"data structures, library tables, processes, and paths. Please provide your design, feedback "
"clearly and in detail."
)
def recreate_workspace(self, workspace: Path):
try:
@ -103,42 +165,47 @@ class WriteDesign(Action):
pass # Folder does not exist, but we don't care
workspace.mkdir(parents=True, exist_ok=True)
async def _save_prd(self, docs_path, resources_path, prd):
prd_file = docs_path / 'prd.md'
quadrant_chart = CodeParser.parse_code(block="Competitive Quadrant Chart", text=prd)
await mermaid_to_file(quadrant_chart, resources_path / 'competitive_analysis')
logger.info(f"Saving PRD to {prd_file}")
prd_file.write_text(prd)
async def _save_prd(self, docs_path, resources_path, context):
prd_file = docs_path / "prd.md"
if context[-1].instruct_content and context[-1].instruct_content.dict()["Competitive Quadrant Chart"]:
quadrant_chart = context[-1].instruct_content.dict()["Competitive Quadrant Chart"]
await mermaid_to_file(quadrant_chart, resources_path / "competitive_analysis")
async def _save_system_design(self, docs_path, resources_path, content):
data_api_design = CodeParser.parse_code(block="Data structures and interface definitions", text=content)
seq_flow = CodeParser.parse_code(block="Program call flow", text=content)
await mermaid_to_file(data_api_design, resources_path / 'data_api_design')
await mermaid_to_file(seq_flow, resources_path / 'seq_flow')
system_design_file = docs_path / 'system_design.md'
if context[-1].instruct_content:
logger.info(f"Saving PRD to {prd_file}")
prd_file.write_text(json_to_markdown(context[-1].instruct_content.dict()))
async def _save_system_design(self, docs_path, resources_path, system_design):
data_api_design = system_design.instruct_content.dict()[
"Data structures and interface definitions"
] # CodeParser.parse_code(block="Data structures and interface definitions", text=content)
seq_flow = system_design.instruct_content.dict()[
"Program call flow"
] # CodeParser.parse_code(block="Program call flow", text=content)
await mermaid_to_file(data_api_design, resources_path / "data_api_design")
await mermaid_to_file(seq_flow, resources_path / "seq_flow")
system_design_file = docs_path / "system_design.md"
logger.info(f"Saving System Designs to {system_design_file}")
system_design_file.write_text(content)
system_design_file.write_text((json_to_markdown(system_design.instruct_content.dict())))
async def _save(self, context, system_design):
async def _save(self, context, system_design):
if isinstance(system_design, ActionOutput):
content = system_design.content
ws_name = CodeParser.parse_str(block="Python package name", text=content)
ws_name = system_design.instruct_content.dict()["Python package name"]
else:
content = system_design
ws_name = CodeParser.parse_str(block="Python package name", text=system_design)
workspace = WORKSPACE_ROOT / ws_name
self.recreate_workspace(workspace)
docs_path = workspace / 'docs'
resources_path = workspace / 'resources'
docs_path = workspace / "docs"
resources_path = workspace / "resources"
docs_path.mkdir(parents=True, exist_ok=True)
resources_path.mkdir(parents=True, exist_ok=True)
await self._save_prd(docs_path, resources_path, context[-1].content)
await self._save_system_design(docs_path, resources_path, content)
await self._save_prd(docs_path, resources_path, context)
await self._save_system_design(docs_path, resources_path, system_design)
async def run(self, context):
prompt = PROMPT_TEMPLATE.format(context=context, format_example=FORMAT_EXAMPLE)
async def run(self, context, format=CONFIG.prompt_format):
prompt_template, format_example = get_template(templates, format)
prompt = prompt_template.format(context=context, format_example=format_example)
# system_design = await self._aask(prompt)
system_design = await self._aask_v1(prompt, "system_design", OUTPUT_MAPPING)
system_design = await self._aask_v1(prompt, "system_design", OUTPUT_MAPPING, format=format)
await self._save(context, system_design)
return system_design

View file

@ -5,13 +5,74 @@
@Author : alexanderwu
@File : project_management.py
"""
from typing import List, Tuple
from typing import List
from metagpt.actions.action import Action
from metagpt.config import CONFIG
from metagpt.const import WORKSPACE_ROOT
from metagpt.utils.common import CodeParser
from metagpt.utils.get_template import get_template
from metagpt.utils.json_to_markdown import json_to_markdown
PROMPT_TEMPLATE = '''
templates = {
"json": {
"PROMPT_TEMPLATE": """
# Context
{context}
## Format example
{format_example}
-----
Role: You are a project manager; the goal is to break down tasks according to PRD/technical design, give a task list, and analyze task dependencies to start with the prerequisite modules
Requirements: Based on the context, fill in the following missing information, each section name is a key in json. Here the granularity of the task is a file, if there are any missing files, you can supplement them
Attention: Use '##' to split sections, not '#', and '## <SECTION_NAME>' SHOULD WRITE BEFORE the code and triple quote.
## Required Python third-party packages: Provided in requirements.txt format
## Required Other language third-party packages: Provided in requirements.txt format
## Full API spec: Use OpenAPI 3.0. Describe all APIs that may be used by both frontend and backend.
## Logic Analysis: Provided as a Python list[list[str]. the first is filename, the second is class/method/function should be implemented in this file. Analyze the dependencies between the files, which work should be done first
## Task list: Provided as Python list[str]. Each str is a filename, the more at the beginning, the more it is a prerequisite dependency, should be done first
## Shared Knowledge: Anything that should be public like utils' functions, config's variables details that should make clear first.
## Anything UNCLEAR: Provide as Plain text. Make clear here. For example, don't forget a main entry. don't forget to init 3rd party libs.
output a properly formatted JSON, wrapped inside [CONTENT][/CONTENT] like format example,
and only output the json inside this tag, nothing else
""",
"FORMAT_EXAMPLE": '''
{
"Required Python third-party packages": [
"flask==1.1.2",
"bcrypt==3.2.0"
],
"Required Other language third-party packages": [
"No third-party ..."
],
"Full API spec": """
openapi: 3.0.0
...
description: A JSON object ...
""",
"Logic Analysis": [
["game.py","Contains..."]
],
"Task list": [
"game.py"
],
"Shared Knowledge": """
'game.py' contains ...
""",
"Anything UNCLEAR": "We need ... how to start."
}
''',
},
"markdown": {
"PROMPT_TEMPLATE": """
# Context
{context}
@ -28,7 +89,7 @@ Attention: Use '##' to split sections, not '#', and '## <SECTION_NAME>' SHOULD W
## Full API spec: Use OpenAPI 3.0. Describe all APIs that may be used by both frontend and backend.
## Logic Analysis: Provided as a Python list[str, str]. the first is filename, the second is class/method/function should be implemented in this file. Analyze the dependencies between the files, which work should be done first
## Logic Analysis: Provided as a Python list[list[str]. the first is filename, the second is class/method/function should be implemented in this file. Analyze the dependencies between the files, which work should be done first
## Task list: Provided as Python list[str]. Each str is a filename, the more at the beginning, the more it is a prerequisite dependency, should be done first
@ -36,9 +97,8 @@ Attention: Use '##' to split sections, not '#', and '## <SECTION_NAME>' SHOULD W
## Anything UNCLEAR: Provide as Plain text. Make clear here. For example, don't forget a main entry. don't forget to init 3rd party libs.
'''
FORMAT_EXAMPLE = '''
""",
"FORMAT_EXAMPLE": '''
---
## Required Python third-party packages
```python
@ -67,7 +127,7 @@ description: A JSON object ...
## Logic Analysis
```python
[
("game.py", "Contains ..."),
["game.py", "Contains ..."],
]
```
@ -88,13 +148,14 @@ description: A JSON object ...
## Anything UNCLEAR
We need ... how to start.
---
'''
''',
},
}
OUTPUT_MAPPING = {
"Required Python third-party packages": (str, ...),
"Required Other language third-party packages": (str, ...),
"Required Python third-party packages": (List[str], ...),
"Required Other language third-party packages": (List[str], ...),
"Full API spec": (str, ...),
"Logic Analysis": (List[Tuple[str, str]], ...),
"Logic Analysis": (List[List[str]], ...),
"Task list": (List[str], ...),
"Shared Knowledge": (str, ...),
"Anything UNCLEAR": (str, ...),
@ -102,22 +163,25 @@ OUTPUT_MAPPING = {
class WriteTasks(Action):
def __init__(self, name="CreateTasks", context=None, llm=None):
super().__init__(name, context, llm)
def _save(self, context, rsp):
ws_name = CodeParser.parse_str(block="Python package name", text=context[-1].content)
file_path = WORKSPACE_ROOT / ws_name / 'docs/api_spec_and_tasks.md'
file_path.write_text(rsp.content)
if context[-1].instruct_content:
ws_name = context[-1].instruct_content.dict()["Python package name"]
else:
ws_name = CodeParser.parse_str(block="Python package name", text=context[-1].content)
file_path = WORKSPACE_ROOT / ws_name / "docs/api_spec_and_tasks.md"
file_path.write_text(json_to_markdown(rsp.instruct_content.dict()))
# Write requirements.txt
requirements_path = WORKSPACE_ROOT / ws_name / 'requirements.txt'
requirements_path.write_text(rsp.instruct_content.dict().get("Required Python third-party packages").strip('"\n'))
requirements_path = WORKSPACE_ROOT / ws_name / "requirements.txt"
requirements_path.write_text("\n".join(rsp.instruct_content.dict().get("Required Python third-party packages")))
async def run(self, context):
prompt = PROMPT_TEMPLATE.format(context=context, format_example=FORMAT_EXAMPLE)
rsp = await self._aask_v1(prompt, "task", OUTPUT_MAPPING)
async def run(self, context, format=CONFIG.prompt_format):
prompt_template, format_example = get_template(templates, format)
prompt = prompt_template.format(context=context, format_example=format_example)
rsp = await self._aask_v1(prompt, "task", OUTPUT_MAPPING, format=format)
self._save(context, rsp)
return rsp
@ -126,4 +190,3 @@ class AssignTasks(Action):
async def run(self, *args, **kwargs):
# Here you should implement the actual action
pass

View file

@ -5,13 +5,102 @@
@Author : alexanderwu
@File : write_prd.py
"""
from typing import List, Tuple
from typing import List
from metagpt.actions import Action, ActionOutput
from metagpt.actions.search_and_summarize import SearchAndSummarize
from metagpt.config import CONFIG
from metagpt.logs import logger
from metagpt.utils.get_template import get_template
PROMPT_TEMPLATE = """
templates = {
"json": {
"PROMPT_TEMPLATE": """
# Context
## Original Requirements
{requirements}
## Search Information
{search_information}
## mermaid quadrantChart code syntax example. DONT USE QUOTO IN CODE DUE TO INVALID SYNTAX. Replace the <Campain X> with REAL COMPETITOR NAME
```mermaid
quadrantChart
title Reach and engagement of campaigns
x-axis Low Reach --> High Reach
y-axis Low Engagement --> High Engagement
quadrant-1 We should expand
quadrant-2 Need to promote
quadrant-3 Re-evaluate
quadrant-4 May be improved
"Campaign: A": [0.3, 0.6]
"Campaign B": [0.45, 0.23]
"Campaign C": [0.57, 0.69]
"Campaign D": [0.78, 0.34]
"Campaign E": [0.40, 0.34]
"Campaign F": [0.35, 0.78]
"Our Target Product": [0.5, 0.6]
```
## Format example
{format_example}
-----
Role: You are a professional product manager; the goal is to design a concise, usable, efficient product
Requirements: According to the context, fill in the following missing information, each section name is a key in json ,If the requirements are unclear, ensure minimum viability and avoid excessive design
## Original Requirements: Provide as Plain text, place the polished complete original requirements here
## Product Goals: Provided as Python list[str], up to 3 clear, orthogonal product goals. If the requirement itself is simple, the goal should also be simple
## User Stories: Provided as Python list[str], up to 5 scenario-based user stories, If the requirement itself is simple, the user stories should also be less
## Competitive Analysis: Provided as Python list[str], up to 7 competitive product analyses, consider as similar competitors as possible
## Competitive Quadrant Chart: Use mermaid quadrantChart code syntax. up to 14 competitive products. Translation: Distribute these competitor scores evenly between 0 and 1, trying to conform to a normal distribution centered around 0.5 as much as possible.
## Requirement Analysis: Provide as Plain text. Be simple. LESS IS MORE. Make your requirements less dumb. Delete the parts unnessasery.
## Requirement Pool: Provided as Python list[list[str], the parameters are requirement description, priority(P0/P1/P2), respectively, comply with PEP standards; no more than 5 requirements and consider to make its difficulty lower
## UI Design draft: Provide as Plain text. Be simple. Describe the elements and functions, also provide a simple style description and layout description.
## Anything UNCLEAR: Provide as Plain text. Make clear here.
output a properly formatted JSON, wrapped inside [CONTENT][/CONTENT] like format example,
and only output the json inside this tag, nothing else
""",
"FORMAT_EXAMPLE": """
[CONTENT]
{
"Original Requirements": "",
"Search Information": "",
"Requirements": "",
"Product Goals": [],
"User Stories": [],
"Competitive Analysis": [],
"Competitive Quadrant Chart": "quadrantChart
title Reach and engagement of campaigns
x-axis Low Reach --> High Reach
y-axis Low Engagement --> High Engagement
quadrant-1 We should expand
quadrant-2 Need to promote
quadrant-3 Re-evaluate
quadrant-4 May be improved
Campaign A: [0.3, 0.6]
Campaign B: [0.45, 0.23]
Campaign C: [0.57, 0.69]
Campaign D: [0.78, 0.34]
Campaign E: [0.40, 0.34]
Campaign F: [0.35, 0.78]",
"Requirement Analysis": "",
"Requirement Pool": [["P0","P0 requirement"],["P1","P1 requirement"]],
"UI Design draft": "",
"Anything UNCLEAR": "",
}
[/CONTENT]
""",
},
"markdown": {
"PROMPT_TEMPLATE": """
# Context
## Original Requirements
{requirements}
@ -57,12 +146,12 @@ ATTENTION: Use '##' to SPLIT SECTIONS, not '#'. AND '## <SECTION_NAME>' SHOULD W
## Requirement Analysis: Provide as Plain text. Be simple. LESS IS MORE. Make your requirements less dumb. Delete the parts unnessasery.
## Requirement Pool: Provided as Python list[str, str], the parameters are requirement description, priority(P0/P1/P2), respectively, comply with PEP standards; no more than 5 requirements and consider to make its difficulty lower
## Requirement Pool: Provided as Python list[list[str], the parameters are requirement description, priority(P0/P1/P2), respectively, comply with PEP standards; no more than 5 requirements and consider to make its difficulty lower
## UI Design draft: Provide as Plain text. Be simple. Describe the elements and functions, also provide a simple style description and layout description.
## Anything UNCLEAR: Provide as Plain text. Make clear here.
"""
FORMAT_EXAMPLE = """
""",
"FORMAT_EXAMPLE": """
---
## Original Requirements
The boss ...
@ -102,7 +191,7 @@ The product should be a ...
## Requirement Pool
```python
[
("End game ...", "P0")
["End game ...", "P0"]
]
```
@ -112,7 +201,10 @@ Give a basic function description, and a draft
## Anything UNCLEAR
There are no unclear points.
---
"""
""",
},
}
OUTPUT_MAPPING = {
"Original Requirements": (str, ...),
"Product Goals": (List[str], ...),
@ -120,8 +212,8 @@ OUTPUT_MAPPING = {
"Competitive Analysis": (List[str], ...),
"Competitive Quadrant Chart": (str, ...),
"Requirement Analysis": (str, ...),
"Requirement Pool": (List[Tuple[str, str]], ...),
"UI Design draft":(str, ...),
"Requirement Pool": (List[List[str]], ...),
"UI Design draft": (str, ...),
"Anything UNCLEAR": (str, ...),
}
@ -130,7 +222,7 @@ class WritePRD(Action):
def __init__(self, name="", context=None, llm=None):
super().__init__(name, context, llm)
async def run(self, requirements, *args, **kwargs) -> ActionOutput:
async def run(self, requirements, format=CONFIG.prompt_format, *args, **kwargs) -> ActionOutput:
sas = SearchAndSummarize()
# rsp = await sas.run(context=requirements, system_text=SEARCH_AND_SUMMARIZE_SYSTEM_EN_US)
rsp = ""
@ -139,9 +231,11 @@ class WritePRD(Action):
logger.info(sas.result)
logger.info(rsp)
prompt = PROMPT_TEMPLATE.format(requirements=requirements, search_information=info,
format_example=FORMAT_EXAMPLE)
prompt_template, format_example = get_template(templates, format)
prompt = prompt_template.format(
requirements=requirements, search_information=info, format_example=format_example
)
logger.debug(prompt)
prd = await self._aask_v1(prompt, "prd", OUTPUT_MAPPING)
# prd = await self._aask_v1(prompt, "prd", OUTPUT_MAPPING)
prd = await self._aask_v1(prompt, "prd", OUTPUT_MAPPING, format=format)
return prd

View file

@ -6,6 +6,7 @@
@File : environment.py
"""
from metagpt.actions.action import Action
from metagpt.logs import logger
from metagpt.utils.common import CodeParser
PROMPT_TEMPLATE = """
@ -35,7 +36,15 @@ class WriteTest(Action):
async def write_code(self, prompt):
code_rsp = await self._aask(prompt)
code = CodeParser.parse_code(block="", text=code_rsp)
try:
code = CodeParser.parse_code(block="", text=code_rsp)
except Exception:
# Handle the exception if needed
logger.error(f"Can't parse the code: {code_rsp}")
# Return code_rsp in case of an exception, assuming llm just returns code as it is and doesn't wrap it inside ```
code = code_rsp
return code
async def run(self, code_to_test, test_file_name, source_file_path, workspace):

View file

@ -59,7 +59,7 @@ class Config(metaclass=Singleton):
self.openai_api_rpm = self._get("RPM", 3)
self.openai_api_model = self._get("OPENAI_API_MODEL", "gpt-4")
self.max_tokens_rsp = self._get("MAX_TOKENS", 2048)
self.deployment_name = self._get('DEPLOYMENT_NAME')
self.deployment_name = self._get("DEPLOYMENT_NAME")
self.deployment_id = self._get("DEPLOYMENT_ID")
self.claude_api_key = self._get("Anthropic_API_KEY")
@ -83,8 +83,10 @@ class Config(metaclass=Singleton):
self.calc_usage = self._get("CALC_USAGE", True)
self.model_for_researcher_summary = self._get("MODEL_FOR_RESEARCHER_SUMMARY")
self.model_for_researcher_report = self._get("MODEL_FOR_RESEARCHER_REPORT")
self.mermaid_engine = self._get("MERMAID_ENGINE", 'nodejs')
self.pyppeteer_executable_path = self._get("PYPPETEER_EXECUTABLE_PATH", '')
self.mermaid_engine = self._get("MERMAID_ENGINE", "nodejs")
self.pyppeteer_executable_path = self._get("PYPPETEER_EXECUTABLE_PATH", "")
self.prompt_format = self._get("PROMPT_FORMAT", "markdown")
def _init_with_config_files_and_env(self, configs: dict, yaml_file):
"""Load from config/key.yaml, config/config.yaml, and env in decreasing order of priority"""
@ -113,4 +115,4 @@ class Config(metaclass=Singleton):
return value
CONFIG = Config()
CONFIG = Config()

View file

@ -5,13 +5,15 @@
@Author : unkn-wn (Leon Yee)
@File : lancedb_store.py
"""
import os
import shutil
import lancedb
import shutil, os
class LanceStore:
def __init__(self, name):
db = lancedb.connect('./data/lancedb')
db = lancedb.connect("./data/lancedb")
self.db = db
self.name = name
self.table = None
@ -23,16 +25,18 @@ class LanceStore:
# .where - SQL syntax filtering for metadata (e.g. where("price > 100"))
# .metric - specifies the distance metric to use
# .nprobes - values will yield better recall (more likely to find vectors if they exist) at the expense of latency.
if self.table == None: raise Exception("Table not created yet, please add data first.")
if self.table is None:
raise Exception("Table not created yet, please add data first.")
results = self.table \
.search(query) \
.limit(n_results) \
.select(kwargs.get('select')) \
.where(kwargs.get('where')) \
.metric(metric) \
.nprobes(nprobes) \
results = (
self.table.search(query)
.limit(n_results)
.select(kwargs.get("select"))
.where(kwargs.get("where"))
.metric(metric)
.nprobes(nprobes)
.to_df()
)
return results
def persist(self):
@ -45,14 +49,11 @@ class LanceStore:
documents = []
for i in range(len(data)):
row = {
'vector': data[i],
'id': ids[i]
}
row = {"vector": data[i], "id": ids[i]}
row.update(metadatas[i])
documents.append(row)
if self.table != None:
if self.table is not None:
self.table.add(documents)
else:
self.table = self.db.create_table(self.name, documents)
@ -61,13 +62,10 @@ class LanceStore:
# This function is for adding individual documents
# It assumes you're passing in a single vector embedding, metadata, and id
row = {
'vector': data,
'id': _id
}
row = {"vector": data, "id": _id}
row.update(metadata)
if self.table != None:
if self.table is not None:
self.table.add([row])
else:
self.table = self.db.create_table(self.name, [row])
@ -75,7 +73,8 @@ class LanceStore:
def delete(self, _id):
# This function deletes a row by id.
# LanceDB delete syntax uses SQL syntax, so you can use "in" or "="
if self.table == None: raise Exception("Table not created yet, please add data first")
if self.table is None:
raise Exception("Table not created yet, please add data first")
if isinstance(_id, str):
return self.table.delete(f"id = '{_id}'")
@ -85,6 +84,6 @@ class LanceStore:
def drop(self, name):
# This function drops a table, if it exists.
path = os.path.join(self.db.uri, name + '.lance')
path = os.path.join(self.db.uri, name + ".lance")
if os.path.exists(path):
shutil.rmtree(path)
shutil.rmtree(path)

View file

@ -6,33 +6,34 @@
@File : architect.py
"""
from metagpt.actions import WriteDesign, WritePRD
from metagpt.actions import WritePRD
from metagpt.actions.design_api import WriteDesign
from metagpt.roles import Role
class Architect(Role):
"""
Represents an Architect role in a software development process.
Attributes:
name (str): Name of the architect.
profile (str): Role profile, default is 'Architect'.
goal (str): Primary goal or responsibility of the architect.
constraints (str): Constraints or guidelines for the architect.
"""
def __init__(self,
name: str = "Bob",
profile: str = "Architect",
goal: str = "Design a concise, usable, complete python system",
constraints: str = "Try to specify good open source tools as much as possible") -> None:
def __init__(
self,
name: str = "Bob",
profile: str = "Architect",
goal: str = "Design a concise, usable, complete python system",
constraints: str = "Try to specify good open source tools as much as possible",
) -> None:
"""Initializes the Architect with given attributes."""
super().__init__(name, profile, goal, constraints)
# Initialize actions specific to the Architect role
self._init_actions([WriteDesign])
# Set events or actions the Architect should watch or be aware of
self._watch({WritePRD})

View file

@ -10,13 +10,13 @@ import shutil
from collections import OrderedDict
from pathlib import Path
from metagpt.actions import WriteCode, WriteCodeReview, WriteDesign, WriteTasks
from metagpt.const import WORKSPACE_ROOT
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.actions import WriteCode, WriteCodeReview, WriteTasks, WriteDesign
from metagpt.schema import Message
from metagpt.utils.common import CodeParser
from metagpt.utils.special_tokens import MSG_SEP, FILENAME_CODE_SEP
from metagpt.utils.special_tokens import FILENAME_CODE_SEP, MSG_SEP
async def gather_ordered_k(coros, k) -> list:
@ -49,7 +49,7 @@ async def gather_ordered_k(coros, k) -> list:
class Engineer(Role):
"""
Represents an Engineer role responsible for writing and possibly reviewing code.
Attributes:
name (str): Name of the engineer.
profile (str): Role profile, default is 'Engineer'.
@ -59,14 +59,16 @@ class Engineer(Role):
use_code_review (bool): Whether to use code review.
todos (list): List of tasks.
"""
def __init__(self,
name: str = "Alex",
profile: str = "Engineer",
goal: str = "Write elegant, readable, extensible, efficient code",
constraints: str = "The code should conform to standards like PEP8 and be modular and maintainable",
n_borg: int = 1,
use_code_review: bool = False) -> None:
def __init__(
self,
name: str = "Alex",
profile: str = "Engineer",
goal: str = "Write elegant, readable, extensible, efficient code",
constraints: str = "The code should conform to standards like PEP8 and be modular and maintainable",
n_borg: int = 1,
use_code_review: bool = False,
) -> None:
"""Initializes the Engineer role with given attributes."""
super().__init__(name, profile, goal, constraints)
self._init_actions([WriteCode])
@ -90,13 +92,13 @@ class Engineer(Role):
@classmethod
def parse_workspace(cls, system_design_msg: Message) -> str:
if system_design_msg.instruct_content:
return system_design_msg.instruct_content.dict().get("Python package name").strip().strip("'").strip("\"")
return system_design_msg.instruct_content.dict().get("Python package name").strip().strip("'").strip('"')
return CodeParser.parse_str(block="Python package name", text=system_design_msg.content)
def get_workspace(self) -> Path:
msg = self._rc.memory.get_by_action(WriteDesign)[-1]
if not msg:
return WORKSPACE_ROOT / 'src'
return WORKSPACE_ROOT / "src"
workspace = self.parse_workspace(msg)
# Codes are written in workspace/{package_name}/{package_name}
return WORKSPACE_ROOT / workspace / workspace
@ -111,7 +113,7 @@ class Engineer(Role):
def write_file(self, filename: str, code: str):
workspace = self.get_workspace()
filename = filename.replace('"', '').replace('\n', '')
filename = filename.replace('"', "").replace("\n", "")
file = workspace / filename
file.parent.mkdir(parents=True, exist_ok=True)
file.write_text(code)
@ -127,8 +129,7 @@ class Engineer(Role):
todo_coros = []
for todo in self.todos:
todo_coro = WriteCode().run(
context=self._rc.memory.get_by_actions([WriteTasks, WriteDesign]),
filename=todo
context=self._rc.memory.get_by_actions([WriteTasks, WriteDesign]), filename=todo
)
todo_coros.append(todo_coro)
@ -142,17 +143,14 @@ class Engineer(Role):
self._rc.memory.add(msg)
del self.todos[0]
logger.info(f'Done {self.get_workspace()} generating.')
logger.info(f"Done {self.get_workspace()} generating.")
msg = Message(content="all done.", role=self.profile, cause_by=type(self._rc.todo))
return msg
async def _act_sp(self) -> Message:
code_msg_all = [] # gather all code info, will pass to qa_engineer for tests later
code_msg_all = [] # gather all code info, will pass to qa_engineer for tests later
for todo in self.todos:
code = await WriteCode().run(
context=self._rc.history,
filename=todo
)
code = await WriteCode().run(context=self._rc.history, filename=todo)
# logger.info(todo)
# logger.info(code_rsp)
# code = self.parse_code(code_rsp)
@ -163,17 +161,14 @@ class Engineer(Role):
code_msg = todo + FILENAME_CODE_SEP + str(file_path)
code_msg_all.append(code_msg)
logger.info(f'Done {self.get_workspace()} generating.')
logger.info(f"Done {self.get_workspace()} generating.")
msg = Message(
content=MSG_SEP.join(code_msg_all),
role=self.profile,
cause_by=type(self._rc.todo),
send_to="QaEngineer"
content=MSG_SEP.join(code_msg_all), role=self.profile, cause_by=type(self._rc.todo), send_to="QaEngineer"
)
return msg
async def _act_sp_precision(self) -> Message:
code_msg_all = [] # gather all code info, will pass to qa_engineer for tests later
code_msg_all = [] # gather all code info, will pass to qa_engineer for tests later
for todo in self.todos:
"""
# Select essential information from the historical data to reduce the length of the prompt (summarized from human experience):
@ -188,18 +183,11 @@ class Engineer(Role):
context.append(m.content)
context_str = "\n".join(context)
# Write code
code = await WriteCode().run(
context=context_str,
filename=todo
)
code = await WriteCode().run(context=context_str, filename=todo)
# Code review
if self.use_code_review:
try:
rewrite_code = await WriteCodeReview().run(
context=context_str,
code=code,
filename=todo
)
rewrite_code = await WriteCodeReview().run(context=context_str, code=code, filename=todo)
code = rewrite_code
except Exception as e:
logger.error("code review failed!", e)
@ -211,12 +199,9 @@ class Engineer(Role):
code_msg = todo + FILENAME_CODE_SEP + str(file_path)
code_msg_all.append(code_msg)
logger.info(f'Done {self.get_workspace()} generating.')
logger.info(f"Done {self.get_workspace()} generating.")
msg = Message(
content=MSG_SEP.join(code_msg_all),
role=self.profile,
cause_by=type(self._rc.todo),
send_to="QaEngineer"
content=MSG_SEP.join(code_msg_all), role=self.profile, cause_by=type(self._rc.todo), send_to="QaEngineer"
)
return msg
@ -224,4 +209,4 @@ class Engineer(Role):
"""Determines the mode of action based on whether code review is used."""
if self.use_code_review:
return await self._act_sp_precision()
return await self._act_sp()
return await self._act_sp()

View file

@ -12,22 +12,24 @@ from metagpt.roles import Role
class ProductManager(Role):
"""
Represents a Product Manager role responsible for product development and management.
Attributes:
name (str): Name of the product manager.
profile (str): Role profile, default is 'Product Manager'.
goal (str): Goal of the product manager.
constraints (str): Constraints or limitations for the product manager.
"""
def __init__(self,
name: str = "Alice",
profile: str = "Product Manager",
goal: str = "Efficiently create a successful product",
constraints: str = "") -> None:
def __init__(
self,
name: str = "Alice",
profile: str = "Product Manager",
goal: str = "Efficiently create a successful product",
constraints: str = "",
) -> None:
"""
Initializes the ProductManager role with given attributes.
Args:
name (str): Name of the product manager.
profile (str): Role profile.
@ -36,4 +38,4 @@ class ProductManager(Role):
"""
super().__init__(name, profile, goal, constraints)
self._init_actions([WritePRD])
self._watch([BossRequirement])
self._watch([BossRequirement])

View file

@ -5,29 +5,32 @@
@Author : alexanderwu
@File : project_manager.py
"""
from metagpt.actions import WriteDesign, WriteTasks
from metagpt.actions import WriteTasks
from metagpt.actions.design_api import WriteDesign
from metagpt.roles import Role
class ProjectManager(Role):
"""
Represents a Project Manager role responsible for overseeing project execution and team efficiency.
Attributes:
name (str): Name of the project manager.
profile (str): Role profile, default is 'Project Manager'.
goal (str): Goal of the project manager.
constraints (str): Constraints or limitations for the project manager.
"""
def __init__(self,
name: str = "Eve",
profile: str = "Project Manager",
goal: str = "Improve team efficiency and deliver with quality and quantity",
constraints: str = "") -> None:
def __init__(
self,
name: str = "Eve",
profile: str = "Project Manager",
goal: str = "Improve team efficiency and deliver with quality and quantity",
constraints: str = "",
) -> None:
"""
Initializes the ProjectManager role with given attributes.
Args:
name (str): Name of the project manager.
profile (str): Role profile.
@ -36,4 +39,4 @@ class ProjectManager(Role):
"""
super().__init__(name, profile, goal, constraints)
self._init_actions([WriteTasks])
self._watch([WriteDesign])
self._watch([WriteDesign])

View file

@ -8,7 +8,14 @@
import os
from pathlib import Path
from metagpt.actions import DebugError, RunCode, WriteCode, WriteDesign, WriteTest
from metagpt.actions import (
DebugError,
RunCode,
WriteCode,
WriteCodeReview,
WriteDesign,
WriteTest,
)
from metagpt.const import WORKSPACE_ROOT
from metagpt.logs import logger
from metagpt.roles import Role
@ -30,13 +37,13 @@ class QaEngineer(Role):
self._init_actions(
[WriteTest]
) # FIXME: a bit hack here, only init one action to circumvent _think() logic, will overwrite _think() in future updates
self._watch([WriteCode, WriteTest, RunCode, DebugError])
self._watch([WriteCode, WriteCodeReview, WriteTest, RunCode, DebugError])
self.test_round = 0
self.test_round_allowed = test_round_allowed
@classmethod
def parse_workspace(cls, system_design_msg: Message) -> str:
if not system_design_msg.instruct_content:
if system_design_msg.instruct_content:
return system_design_msg.instruct_content.dict().get("Python package name")
return CodeParser.parse_str(block="Python package name", text=system_design_msg.content)
@ -159,7 +166,7 @@ class QaEngineer(Role):
for msg in self._rc.news:
# Decide what to do based on observed msg type, currently defined by human,
# might potentially be moved to _think, that is, let the agent decides for itself
if msg.cause_by == WriteCode:
if msg.cause_by in [WriteCode, WriteCodeReview]:
# engineer wrote a code, time to write a test for it
await self._write_test(msg)
elif msg.cause_by in [WriteTest, DebugError]:

View file

@ -17,20 +17,19 @@ from metagpt.logs import logger
def check_cmd_exists(command) -> int:
""" 检查命令是否存在
"""检查命令是否存在
:param command: 待检查的命令
:return: 如果命令存在返回0如果不存在返回非0
"""
if platform.system().lower() == 'windows':
check_command = 'where ' + command
if platform.system().lower() == "windows":
check_command = "where " + command
else:
check_command = 'command -v ' + command + ' >/dev/null 2>&1 || { echo >&2 "no mermaid"; exit 1; }'
check_command = "command -v " + command + ' >/dev/null 2>&1 || { echo >&2 "no mermaid"; exit 1; }'
result = os.system(check_command)
return result
class OutputParser:
@classmethod
def parse_blocks(cls, text: str):
# 首先根据"##"将文本分割成不同的block
@ -54,7 +53,7 @@ class OutputParser:
@classmethod
def parse_code(cls, text: str, lang: str = "") -> str:
pattern = rf'```{lang}.*?\s+(.*?)```'
pattern = rf"```{lang}.*?\s+(.*?)```"
match = re.search(pattern, text, re.DOTALL)
if match:
code = match.group(1)
@ -65,13 +64,13 @@ class OutputParser:
@classmethod
def parse_str(cls, text: str):
text = text.split("=")[-1]
text = text.strip().strip("'").strip("\"")
text = text.strip().strip("'").strip('"')
return text
@classmethod
def parse_file_list(cls, text: str) -> list[str]:
# Regular expression pattern to find the tasks list.
pattern = r'\s*(.*=.*)?(\[.*\])'
pattern = r"\s*(.*=.*)?(\[.*\])"
# Extract tasks list string using regex.
match = re.search(pattern, text, re.DOTALL)
@ -83,12 +82,12 @@ class OutputParser:
else:
tasks = text.split("\n")
return tasks
@staticmethod
def parse_python_code(text: str) -> str:
for pattern in (
r'(.*?```python.*?\s+)?(?P<code>.*)(```.*?)',
r'(.*?```python.*?\s+)?(?P<code>.*)',
r"(.*?```python.*?\s+)?(?P<code>.*)(```.*?)",
r"(.*?```python.*?\s+)?(?P<code>.*)",
):
match = re.search(pattern, text, re.DOTALL)
if not match:
@ -135,7 +134,7 @@ class OutputParser:
typing = typing_define[0]
else:
typing = typing_define
if typing == List[str] or typing == List[Tuple[str, str]]:
if typing == List[str] or typing == List[Tuple[str, str]] or typing == List[List[str]]:
# 尝试解析list
try:
content = cls.parse_file_list(text=content)
@ -200,7 +199,6 @@ class OutputParser:
class CodeParser:
@classmethod
def parse_block(cls, block: str, text: str) -> str:
blocks = cls.parse_blocks(text)
@ -231,7 +229,7 @@ class CodeParser:
def parse_code(cls, block: str, text: str, lang: str = "") -> str:
if block:
text = cls.parse_block(block, text)
pattern = rf'```{lang}.*?\s+(.*?)```'
pattern = rf"```{lang}.*?\s+(.*?)```"
match = re.search(pattern, text, re.DOTALL)
if match:
code = match.group(1)
@ -246,7 +244,7 @@ class CodeParser:
def parse_str(cls, block: str, text: str, lang: str = ""):
code = cls.parse_code(block, text, lang)
code = code.split("=")[-1]
code = code.strip().strip("'").strip("\"")
code = code.strip().strip("'").strip('"')
return code
@classmethod
@ -254,7 +252,7 @@ class CodeParser:
# Regular expression pattern to find the tasks list.
code = cls.parse_code(block, text, lang)
# print(code)
pattern = r'\s*(.*=.*)?(\[.*\])'
pattern = r"\s*(.*=.*)?(\[.*\])"
# Extract tasks list string using regex.
match = re.search(pattern, code, re.DOTALL)
@ -277,7 +275,7 @@ class NoMoneyException(Exception):
super().__init__(self.message)
def __str__(self):
return f'{self.message} -> Amount required: {self.amount}'
return f"{self.message} -> Amount required: {self.amount}"
def print_members(module, indent=0):
@ -287,19 +285,19 @@ def print_members(module, indent=0):
:param indent:
:return:
"""
prefix = ' ' * indent
prefix = " " * indent
for name, obj in inspect.getmembers(module):
print(name, obj)
if inspect.isclass(obj):
print(f'{prefix}Class: {name}')
print(f"{prefix}Class: {name}")
# print the methods within the class
if name in ['__class__', '__base__']:
if name in ["__class__", "__base__"]:
continue
print_members(obj, indent + 2)
elif inspect.isfunction(obj):
print(f'{prefix}Function: {name}')
print(f"{prefix}Function: {name}")
elif inspect.ismethod(obj):
print(f'{prefix}Method: {name}')
print(f"{prefix}Method: {name}")
def parse_recipient(text):

View file

@ -0,0 +1,297 @@
import json
import re
from json import JSONDecodeError
from json.decoder import _decode_uXXXX
NUMBER_RE = re.compile(r"(-?(?:0|[1-9]\d*))(\.\d+)?([eE][-+]?\d+)?", (re.VERBOSE | re.MULTILINE | re.DOTALL))
def py_make_scanner(context):
parse_object = context.parse_object
parse_array = context.parse_array
parse_string = context.parse_string
match_number = NUMBER_RE.match
strict = context.strict
parse_float = context.parse_float
parse_int = context.parse_int
parse_constant = context.parse_constant
object_hook = context.object_hook
object_pairs_hook = context.object_pairs_hook
memo = context.memo
def _scan_once(string, idx):
try:
nextchar = string[idx]
except IndexError:
raise StopIteration(idx) from None
if nextchar == '"' or nextchar == "'":
if idx + 2 < len(string) and string[idx + 1] == nextchar and string[idx + 2] == nextchar:
# Handle the case where the next two characters are the same as nextchar
return parse_string(string, idx + 3, strict, delimiter=nextchar * 3) # triple quote
else:
# Handle the case where the next two characters are not the same as nextchar
return parse_string(string, idx + 1, strict, delimiter=nextchar)
elif nextchar == "{":
return parse_object((string, idx + 1), strict, _scan_once, object_hook, object_pairs_hook, memo)
elif nextchar == "[":
return parse_array((string, idx + 1), _scan_once)
elif nextchar == "n" and string[idx : idx + 4] == "null":
return None, idx + 4
elif nextchar == "t" and string[idx : idx + 4] == "true":
return True, idx + 4
elif nextchar == "f" and string[idx : idx + 5] == "false":
return False, idx + 5
m = match_number(string, idx)
if m is not None:
integer, frac, exp = m.groups()
if frac or exp:
res = parse_float(integer + (frac or "") + (exp or ""))
else:
res = parse_int(integer)
return res, m.end()
elif nextchar == "N" and string[idx : idx + 3] == "NaN":
return parse_constant("NaN"), idx + 3
elif nextchar == "I" and string[idx : idx + 8] == "Infinity":
return parse_constant("Infinity"), idx + 8
elif nextchar == "-" and string[idx : idx + 9] == "-Infinity":
return parse_constant("-Infinity"), idx + 9
else:
raise StopIteration(idx)
def scan_once(string, idx):
try:
return _scan_once(string, idx)
finally:
memo.clear()
return scan_once
FLAGS = re.VERBOSE | re.MULTILINE | re.DOTALL
STRINGCHUNK = re.compile(r'(.*?)(["\\\x00-\x1f])', FLAGS)
STRINGCHUNK_SINGLEQUOTE = re.compile(r"(.*?)([\'\\\x00-\x1f])", FLAGS)
STRINGCHUNK_TRIPLE_DOUBLE_QUOTE = re.compile(r"(.*?)(\"\"\"|[\\\x00-\x1f])", FLAGS)
STRINGCHUNK_TRIPLE_SINGLEQUOTE = re.compile(r"(.*?)('''|[\\\x00-\x1f])", FLAGS)
BACKSLASH = {
'"': '"',
"\\": "\\",
"/": "/",
"b": "\b",
"f": "\f",
"n": "\n",
"r": "\r",
"t": "\t",
}
WHITESPACE = re.compile(r"[ \t\n\r]*", FLAGS)
WHITESPACE_STR = " \t\n\r"
def JSONObject(
s_and_end, strict, scan_once, object_hook, object_pairs_hook, memo=None, _w=WHITESPACE.match, _ws=WHITESPACE_STR
):
"""Parse a JSON object from a string and return the parsed object.
Args:
s_and_end (tuple): A tuple containing the input string to parse and the current index within the string.
strict (bool): If `True`, enforces strict JSON string decoding rules.
If `False`, allows literal control characters in the string. Defaults to `True`.
scan_once (callable): A function to scan and parse JSON values from the input string.
object_hook (callable): A function that, if specified, will be called with the parsed object as a dictionary.
object_pairs_hook (callable): A function that, if specified, will be called with the parsed object as a list of pairs.
memo (dict, optional): A dictionary used to memoize string keys for optimization. Defaults to None.
_w (function): A regular expression matching function for whitespace. Defaults to WHITESPACE.match.
_ws (str): A string containing whitespace characters. Defaults to WHITESPACE_STR.
Returns:
tuple or dict: A tuple containing the parsed object and the index of the character in the input string
after the end of the object.
"""
s, end = s_and_end
pairs = []
pairs_append = pairs.append
# Backwards compatibility
if memo is None:
memo = {}
memo_get = memo.setdefault
# Use a slice to prevent IndexError from being raised, the following
# check will raise a more specific ValueError if the string is empty
nextchar = s[end : end + 1]
# Normally we expect nextchar == '"'
if nextchar != '"' and nextchar != "'":
if nextchar in _ws:
end = _w(s, end).end()
nextchar = s[end : end + 1]
# Trivial empty object
if nextchar == "}":
if object_pairs_hook is not None:
result = object_pairs_hook(pairs)
return result, end + 1
pairs = {}
if object_hook is not None:
pairs = object_hook(pairs)
return pairs, end + 1
elif nextchar != '"':
raise JSONDecodeError("Expecting property name enclosed in double quotes", s, end)
end += 1
while True:
if end + 1 < len(s) and s[end] == nextchar and s[end + 1] == nextchar:
# Handle the case where the next two characters are the same as nextchar
key, end = scanstring(s, end + 2, strict, delimiter=nextchar * 3)
else:
# Handle the case where the next two characters are not the same as nextchar
key, end = scanstring(s, end, strict, delimiter=nextchar)
key = memo_get(key, key)
# To skip some function call overhead we optimize the fast paths where
# the JSON key separator is ": " or just ":".
if s[end : end + 1] != ":":
end = _w(s, end).end()
if s[end : end + 1] != ":":
raise JSONDecodeError("Expecting ':' delimiter", s, end)
end += 1
try:
if s[end] in _ws:
end += 1
if s[end] in _ws:
end = _w(s, end + 1).end()
except IndexError:
pass
try:
value, end = scan_once(s, end)
except StopIteration as err:
raise JSONDecodeError("Expecting value", s, err.value) from None
pairs_append((key, value))
try:
nextchar = s[end]
if nextchar in _ws:
end = _w(s, end + 1).end()
nextchar = s[end]
except IndexError:
nextchar = ""
end += 1
if nextchar == "}":
break
elif nextchar != ",":
raise JSONDecodeError("Expecting ',' delimiter", s, end - 1)
end = _w(s, end).end()
nextchar = s[end : end + 1]
end += 1
if nextchar != '"':
raise JSONDecodeError("Expecting property name enclosed in double quotes", s, end - 1)
if object_pairs_hook is not None:
result = object_pairs_hook(pairs)
return result, end
pairs = dict(pairs)
if object_hook is not None:
pairs = object_hook(pairs)
return pairs, end
def py_scanstring(s, end, strict=True, _b=BACKSLASH, _m=STRINGCHUNK.match, delimiter='"'):
"""Scan the string s for a JSON string.
Args:
s (str): The input string to be scanned for a JSON string.
end (int): The index of the character in `s` after the quote that started the JSON string.
strict (bool): If `True`, enforces strict JSON string decoding rules.
If `False`, allows literal control characters in the string. Defaults to `True`.
_b (dict): A dictionary containing escape sequence mappings.
_m (function): A regular expression matching function for string chunks.
delimiter (str): The string delimiter used to define the start and end of the JSON string.
Can be one of: '"', "'", '\"""', or "'''". Defaults to '"'.
Returns:
tuple: A tuple containing the decoded string and the index of the character in `s`
after the end quote.
"""
chunks = []
_append = chunks.append
begin = end - 1
if delimiter == '"':
_m = STRINGCHUNK.match
elif delimiter == "'":
_m = STRINGCHUNK_SINGLEQUOTE.match
elif delimiter == '"""':
_m = STRINGCHUNK_TRIPLE_DOUBLE_QUOTE.match
else:
_m = STRINGCHUNK_TRIPLE_SINGLEQUOTE.match
while 1:
chunk = _m(s, end)
if chunk is None:
raise JSONDecodeError("Unterminated string starting at", s, begin)
end = chunk.end()
content, terminator = chunk.groups()
# Content is contains zero or more unescaped string characters
if content:
_append(content)
# Terminator is the end of string, a literal control character,
# or a backslash denoting that an escape sequence follows
if terminator == delimiter:
break
elif terminator != "\\":
if strict:
# msg = "Invalid control character %r at" % (terminator,)
msg = "Invalid control character {0!r} at".format(terminator)
raise JSONDecodeError(msg, s, end)
else:
_append(terminator)
continue
try:
esc = s[end]
except IndexError:
raise JSONDecodeError("Unterminated string starting at", s, begin) from None
# If not a unicode escape sequence, must be in the lookup table
if esc != "u":
try:
char = _b[esc]
except KeyError:
msg = "Invalid \\escape: {0!r}".format(esc)
raise JSONDecodeError(msg, s, end)
end += 1
else:
uni = _decode_uXXXX(s, end)
end += 5
if 0xD800 <= uni <= 0xDBFF and s[end : end + 2] == "\\u":
uni2 = _decode_uXXXX(s, end + 1)
if 0xDC00 <= uni2 <= 0xDFFF:
uni = 0x10000 + (((uni - 0xD800) << 10) | (uni2 - 0xDC00))
end += 6
char = chr(uni)
_append(char)
return "".join(chunks), end
scanstring = py_scanstring
class CustomDecoder(json.JSONDecoder):
def __init__(
self,
*,
object_hook=None,
parse_float=None,
parse_int=None,
parse_constant=None,
strict=True,
object_pairs_hook=None
):
super().__init__(
object_hook=object_hook,
parse_float=parse_float,
parse_int=parse_int,
parse_constant=parse_constant,
strict=strict,
object_pairs_hook=object_pairs_hook,
)
self.parse_object = JSONObject
self.parse_string = py_scanstring
self.scan_once = py_make_scanner(self)
def decode(self, s, _w=json.decoder.WHITESPACE.match):
return super().decode(s)

View file

@ -0,0 +1,20 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/9/19 20:39
@Author : femto Zheng
@File : get_template.py
"""
from metagpt.config import CONFIG
def get_template(templates, format=CONFIG.prompt_format):
selected_templates = templates.get(format)
if selected_templates is None:
raise ValueError(f"Can't find {format} in passed in templates")
# Extract the selected templates
prompt_template = selected_templates["PROMPT_TEMPLATE"]
format_example = selected_templates["FORMAT_EXAMPLE"]
return prompt_template, format_example

View file

@ -0,0 +1,42 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/9/11 11:50
@Author : femto Zheng
@File : json_to_markdown.py
"""
# since we original write docs/*.md in markdown format, so I convert json back to markdown
def json_to_markdown(data, depth=2):
"""
Convert a JSON object to Markdown with headings for keys and lists for arrays, supporting nested objects.
Args:
data: JSON object (dictionary) or value.
depth (int): Current depth level for Markdown headings.
Returns:
str: Markdown representation of the JSON data.
"""
markdown = ""
if isinstance(data, dict):
for key, value in data.items():
if isinstance(value, list):
# Handle JSON arrays
markdown += "#" * depth + f" {key}\n\n"
items = [str(item) for item in value]
markdown += "- " + "\n- ".join(items) + "\n\n"
elif isinstance(value, dict):
# Handle nested JSON objects
markdown += "#" * depth + f" {key}\n\n"
markdown += json_to_markdown(value, depth + 1)
else:
# Handle other values
markdown += "#" * depth + f" {key}\n\n{value}\n\n"
else:
# Handle non-dictionary JSON data
markdown = str(data)
return markdown

View file

@ -38,6 +38,8 @@ typing-inspect==0.8.0
typing_extensions==4.5.0
libcst==1.0.1
qdrant-client==1.4.0
pytest-mock==3.11.1
open-interpreter==0.1.4; python_version>"3.9"
ta==0.10.2
semantic-kernel==0.3.10.dev0

View file

@ -9,6 +9,7 @@ import pytest
from metagpt.actions.design_api import WriteDesign
from metagpt.logs import logger
from metagpt.schema import Message
from tests.metagpt.actions.mock import PRD_SAMPLE
@ -18,9 +19,10 @@ async def test_design_api():
design_api = WriteDesign("design_api")
result = await design_api.run(prd)
result = await design_api.run([Message(content=prd, instruct_content=None)])
logger.info(result)
assert len(result) > 0
assert result
@pytest.mark.asyncio
@ -28,7 +30,7 @@ async def test_design_api_calculator():
prd = PRD_SAMPLE
design_api = WriteDesign("design_api")
result = await design_api.run(prd)
result = await design_api.run([Message(content=prd, instruct_content=None)])
logger.info(result)
assert len(result) > 10
assert result

View file

@ -31,7 +31,7 @@ async def test_write_test():
code_to_test=code,
test_file_name="test_food.py",
source_file_path="/some/dummy/path/cli_snake_game/cli_snake_game/food.py",
workspace="/some/dummy/path/cli_snake_game"
workspace="/some/dummy/path/cli_snake_game",
)
logger.info(test_code)
@ -40,3 +40,18 @@ async def test_write_test():
assert "from cli_snake_game.food import Food" in test_code
assert "class TestFood(unittest.TestCase)" in test_code
assert "def test_generate" in test_code
@pytest.mark.asyncio
async def test_write_code_invalid_code(mocker):
# Mock the _aask method to return an invalid code string
mocker.patch.object(WriteTest, "_aask", return_value="Invalid Code String")
# Create an instance of WriteTest
write_test = WriteTest()
# Call the write_code method
code = await write_test.write_code("Some prompt:")
# Assert that the returned code is the same as the invalid code string
assert code == "Invalid Code String"

View file

@ -18,4 +18,4 @@ async def test_product_manager():
rsp = await product_manager.handle(MockMessages.req)
logger.info(rsp)
assert len(rsp.content) > 0
assert "产品目标" in rsp.content
assert "Product Goals" in rsp.content

View file

@ -0,0 +1,72 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/9/8 11:38
@Author : femto Zheng
@File : test_custom_decoder.py
"""
from metagpt.utils.custom_decoder import CustomDecoder
def test_parse_single_quote():
# Create a custom JSON decoder
decoder = CustomDecoder(strict=False)
# Your provided input with single-quoted strings and line breaks
input_data = """{'a"
b':'"title": "Reach and engagement of campaigns",
"x-axis": "Low Reach --> High Reach",
"y-axis": "Low Engagement --> High Engagement",
"quadrant-1": "We should expand",
"quadrant-2": "Need to promote",
"quadrant-3": "Re-evaluate",
"quadrant-4": "May be improved",
"Campaign: A": [0.3, 0.6],
"Campaign B": [0.45, 0.23],
"Campaign C": [0.57, 0.69],
"Campaign D": [0.78, 0.34],
"Campaign E": [0.40, 0.34],
"Campaign F": [0.35, 0.78],
"Our Target Product": [0.5, 0.6]
'
}
"""
# Parse the JSON using the custom decoder
parsed_data = decoder.decode(input_data)
assert 'a"\n b' in parsed_data
def test_parse_triple_double_quote():
# Create a custom JSON decoder
decoder = CustomDecoder(strict=False)
# Your provided input with single-quoted strings and line breaks
input_data = '{"""a""":"b"}'
# Parse the JSON using the custom decoder
parsed_data = decoder.decode(input_data)
assert "a" in parsed_data
input_data = '{"""a""":"""b"""}'
# Parse the JSON using the custom decoder
parsed_data = decoder.decode(input_data)
assert parsed_data["a"] == "b"
def test_parse_triple_single_quote():
# Create a custom JSON decoder
decoder = CustomDecoder(strict=False)
# Your provided input with single-quoted strings and line breaks
input_data = "{'''a''':'b'}"
# Parse the JSON using the custom decoder
parsed_data = decoder.decode(input_data)
assert "a" in parsed_data
input_data = "{'''a''':'''b'''}"
# Parse the JSON using the custom decoder
parsed_data = decoder.decode(input_data)
assert parsed_data["a"] == "b"

View file

@ -0,0 +1,60 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/9/11 11:53
@Author : femto Zheng
@File : test_json_to_markdown.py
"""
from metagpt.utils.json_to_markdown import json_to_markdown
def test_json_to_markdown():
# Example nested JSON data
json_data = {
"title": "Sample JSON to Markdown Conversion",
"description": "Convert JSON to Markdown with headings and lists.",
"tags": ["json", "markdown", "conversion"],
"content": {
"section1": {"subsection1": "This is a subsection.", "subsection2": "Another subsection."},
"section2": "This is the second section content.",
},
}
# Convert JSON to Markdown with nested sections
markdown_output = json_to_markdown(json_data)
expected = """## title
Sample JSON to Markdown Conversion
## description
Convert JSON to Markdown with headings and lists.
## tags
- json
- markdown
- conversion
## content
### section1
#### subsection1
This is a subsection.
#### subsection2
Another subsection.
### section2
This is the second section content.
"""
# Print or use the generated Markdown
# print(markdown_output)
assert expected == markdown_output