fix conflict

This commit is contained in:
femto 2023-09-19 13:27:52 +08:00
commit cf05287cff
33 changed files with 1069 additions and 145 deletions

View file

@ -1,5 +1,7 @@
#!/usr/bin/env python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/4/24 22:26
# @Author : alexanderwu
# @File : __init__.py
from metagpt import _compat as _ # noqa: F401

15
metagpt/_compat.py Normal file
View file

@ -0,0 +1,15 @@
import platform
import sys
import warnings
if sys.implementation.name == "cpython" and platform.system() == "Windows" and sys.version_info[:2] == (3, 9):
# https://github.com/python/cpython/pull/92842
from asyncio.proactor_events import _ProactorBasePipeTransport
def pacth_del(self, _warn=warnings.warn):
if self._sock is not None:
_warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
self._sock.close()
_ProactorBasePipeTransport.__del__ = pacth_del

View file

@ -92,30 +92,30 @@ class WriteDesign(Action):
pass # Folder does not exist, but we don't care
workspace.mkdir(parents=True, exist_ok=True)
def _save_prd(self, docs_path, resources_path, context):
async def _save_prd(self, docs_path, resources_path, context):
prd_file = docs_path / "prd.md"
if context[-1].instruct_content and context[-1].instruct_content.dict()["Competitive Quadrant Chart"]:
quadrant_chart = context[-1].instruct_content.dict()["Competitive Quadrant Chart"]
mermaid_to_file(quadrant_chart, resources_path / "competitive_analysis")
await mermaid_to_file(quadrant_chart, resources_path / "competitive_analysis")
if context[-1].instruct_content:
logger.info(f"Saving PRD to {prd_file}")
prd_file.write_text(json_to_markdown(context[-1].instruct_content.dict()))
def _save_system_design(self, docs_path, resources_path, system_design):
async def _save_system_design(self, docs_path, resources_path, system_design):
data_api_design = system_design.instruct_content.dict()[
"Data structures and interface definitions"
] # CodeParser.parse_code(block="Data structures and interface definitions", text=content)
seq_flow = system_design.instruct_content.dict()[
"Program call flow"
] # CodeParser.parse_code(block="Program call flow", text=content)
mermaid_to_file(data_api_design, resources_path / "data_api_design")
mermaid_to_file(seq_flow, resources_path / "seq_flow")
await mermaid_to_file(data_api_design, resources_path / "data_api_design")
await mermaid_to_file(seq_flow, resources_path / "seq_flow")
system_design_file = docs_path / "system_design.md"
logger.info(f"Saving System Designs to {system_design_file}")
system_design_file.write_text((json_to_markdown(system_design.instruct_content.dict())))
def _save(self, context, system_design):
async def _save(self, context, system_design):
if isinstance(system_design, ActionOutput):
ws_name = system_design.instruct_content.dict()["Python package name"]
else:
@ -126,12 +126,12 @@ class WriteDesign(Action):
resources_path = workspace / "resources"
docs_path.mkdir(parents=True, exist_ok=True)
resources_path.mkdir(parents=True, exist_ok=True)
self._save_prd(docs_path, resources_path, context)
self._save_system_design(docs_path, resources_path, system_design)
await self._save_prd(docs_path, resources_path, context)
await self._save_system_design(docs_path, resources_path, system_design)
async def run(self, context):
prompt = PROMPT_TEMPLATE.format(context=context, format_example=FORMAT_EXAMPLE)
# system_design = await self._aask(prompt)
system_design = await self._aask_json_v1(prompt, "system_design", OUTPUT_MAPPING)
self._save(context, system_design)
await self._save(context, system_design)
return system_design

View file

@ -0,0 +1,95 @@
#!/usr/bin/env python3
# _*_ coding: utf-8 _*_
"""
@Time : 2023/9/4 15:40:40
@Author : Stitch-z
@File : tutorial_assistant.py
@Describe : Actions of the tutorial assistant, including writing directories and document content.
"""
import json
from typing import Dict
from metagpt.actions import Action
from metagpt.logs import logger
from metagpt.prompts.tutorial_assistant import DIRECTORY_PROMPT, CONTENT_PROMPT
class WriteDirectory(Action):
"""Action class for writing tutorial directories.
Args:
name: The name of the action.
language: The language to output, default is "Chinese".
"""
def __init__(self, name: str = "", language: str = "Chinese", *args, **kwargs):
super().__init__(name, *args, **kwargs)
self.language = language
@staticmethod
async def _handle_resp(resp: str) -> Dict:
"""Process string results and convert them to JSON format.
Args:
resp: The directory results returned by gpt.
Returns:
The parsed dictionary, such as {"title": "xxx", "directory": [{"dir 1": ["sub dir 1", "sub dir 2"]}]}.
Raises:
Exception: If no matching dictionary section is found.
json.JSONDecodeError: If the dictionary part cannot be parsed as JSON.
"""
start = resp.find('{')
end = resp.rfind('}')
if start != -1 and end != -1 and end > start:
directory_str = resp[start:end + 1]
logger.info(f"Successfully parsed json: {str(directory_str)}")
try:
return json.loads(directory_str)
except json.JSONDecodeError as e:
logger.error(f"Json parsing error: {e}")
raise e
else:
raise Exception("No matching dictionary section found.")
async def run(self, topic: str, *args, **kwargs) -> Dict:
"""Execute the action to generate a tutorial directory according to the topic.
Args:
topic: The tutorial topic.
Returns:
the tutorial directory information, including {"title": "xxx", "directory": [{"dir 1": ["sub dir 1", "sub dir 2"]}]}.
"""
prompt = DIRECTORY_PROMPT.format(topic=topic, language=self.language)
resp = await self._aask(prompt=prompt)
return await self._handle_resp(resp)
class WriteContent(Action):
"""Action class for writing tutorial content.
Args:
name: The name of the action.
directory: The content to write.
language: The language to output, default is "Chinese".
"""
def __init__(self, name: str = "", directory: str = "", language: str = "Chinese", *args, **kwargs):
super().__init__(name, *args, **kwargs)
self.language = language
self.directory = directory
async def run(self, topic: str, *args, **kwargs) -> str:
"""Execute the action to write document content according to the directory and topic.
Args:
topic: The tutorial topic.
Returns:
The written tutorial content.
"""
prompt = CONTENT_PROMPT.format(topic=topic, language=self.language, directory=self.directory)
return await self._aask(prompt=prompt)

View file

@ -59,6 +59,7 @@ class Config(metaclass=Singleton):
self.openai_api_rpm = self._get("RPM", 3)
self.openai_api_model = self._get("OPENAI_API_MODEL", "gpt-4")
self.max_tokens_rsp = self._get("MAX_TOKENS", 2048)
self.deployment_name = self._get('DEPLOYMENT_NAME')
self.deployment_id = self._get("DEPLOYMENT_ID")
self.claude_api_key = self._get("Anthropic_API_KEY")
@ -82,6 +83,8 @@ class Config(metaclass=Singleton):
self.calc_usage = self._get("CALC_USAGE", True)
self.model_for_researcher_summary = self._get("MODEL_FOR_RESEARCHER_SUMMARY")
self.model_for_researcher_report = self._get("MODEL_FOR_RESEARCHER_REPORT")
self.mermaid_engine = self._get("MERMAID_ENGINE", 'nodejs')
self.pyppeteer_executable_path = self._get("PYPPETEER_EXECUTABLE_PATH", '')
def _init_with_config_files_and_env(self, configs: dict, yaml_file):
"""Load from config/key.yaml, config/config.yaml, and env in decreasing order of priority"""

View file

@ -33,5 +33,6 @@ API_QUESTIONS_PATH = UT_PATH / "files/question/"
YAPI_URL = "http://yapi.deepwisdomai.com/"
TMP = PROJECT_ROOT / 'tmp'
RESEARCH_PATH = DATA_PATH / "research"
TUTORIAL_PATH = DATA_PATH / "tutorial_docx"
MEM_TTL = 24 * 30 * 3600

View file

@ -0,0 +1,39 @@
#!/usr/bin/env python3
# _*_ coding: utf-8 _*_
"""
@Time : 2023/9/4 15:40:40
@Author : Stitch-z
@File : tutorial_assistant.py
@Describe : Tutorial Assistant's prompt templates.
"""
COMMON_PROMPT = """
You are now a seasoned technical professional in the field of the internet.
We need you to write a technical tutorial with the topic "{topic}".
"""
DIRECTORY_PROMPT = COMMON_PROMPT + """
Please provide the specific table of contents for this tutorial, strictly following the following requirements:
1. The output must be strictly in the specified language, {language}.
2. Answer strictly in the dictionary format like {{"title": "xxx", "directory": [{{"dir 1": ["sub dir 1", "sub dir 2"]}}, {{"dir 2": ["sub dir 3", "sub dir 4"]}}]}}.
3. The directory should be as specific and sufficient as possible, with a primary and secondary directory.The secondary directory is in the array.
4. Do not have extra spaces or line breaks.
5. Each directory title has practical significance.
"""
CONTENT_PROMPT = COMMON_PROMPT + """
Now I will give you the module directory titles for the topic.
Please output the detailed principle content of this title in detail.
If there are code examples, please provide them according to standard code specifications.
Without a code example, it is not necessary.
The module directory titles for the topic is as follows:
{directory}
Strictly limit output according to the following requirements:
1. Follow the Markdown syntax format for layout.
2. If there are code examples, they must follow standard syntax specifications, have document annotations, and be displayed in code blocks.
3. The output must be strictly in the specified language, {language}.
4. Do not have redundant output, including concluding remarks.
5. Strict requirement not to output the topic "{topic}".
"""

View file

@ -6,11 +6,17 @@
"""
import asyncio
import time
from typing import NamedTuple
from typing import NamedTuple, Union
import openai
from openai.error import APIConnectionError
from tenacity import retry, stop_after_attempt, after_log, wait_fixed, retry_if_exception_type
from tenacity import (
after_log,
retry,
retry_if_exception_type,
stop_after_attempt,
wait_fixed,
)
from metagpt.config import CONFIG
from metagpt.logs import logger
@ -48,12 +54,14 @@ class RateLimiter:
self.last_call_time = time.time()
class Costs(NamedTuple):
total_prompt_tokens: int
total_completion_tokens: int
total_cost: float
total_budget: float
class CostManager(metaclass=Singleton):
"""计算使用接口的开销"""
@ -74,7 +82,9 @@ class CostManager(metaclass=Singleton):
"""
self.total_prompt_tokens += prompt_tokens
self.total_completion_tokens += completion_tokens
cost = (prompt_tokens * TOKEN_COSTS[model]["prompt"] + completion_tokens * TOKEN_COSTS[model]["completion"]) / 1000
cost = (
prompt_tokens * TOKEN_COSTS[model]["prompt"] + completion_tokens * TOKEN_COSTS[model]["completion"]
) / 1000
self.total_cost += cost
logger.info(
f"Total running cost: ${self.total_cost:.3f} | Max budget: ${CONFIG.max_budget:.3f} | "
@ -100,6 +110,7 @@ class CostManager(metaclass=Singleton):
"""
return self.total_completion_tokens
def get_total_cost(self):
"""
Get the total cost of API calls.
@ -109,25 +120,20 @@ def get_total_cost(self):
"""
return self.total_cost
def get_costs(self) -> Costs:
"""Get all costs"""
return Costs(self.total_prompt_tokens, self.total_completion_tokens, self.total_cost, self.total_budget)
def log_and_reraise(retry_state):
logger.error(f"Retry attempts exhausted. Last exception: {retry_state.outcome.exception()}")
logger.warning("""
Recommend going to https://deepwisdom.feishu.cn/wiki/MsGnwQBjiif9c3koSJNcYaoSnu4#part-XdatdVlhEojeAfxaaEZcMV3ZniQ
See FAQ 5.8
""")
raise retry_state.outcome.exception()
def log_and_reraise(retry_state):
logger.error(f"Retry attempts exhausted. Last exception: {retry_state.outcome.exception()}")
logger.warning("""
logger.warning(
"""
Recommend going to https://deepwisdom.feishu.cn/wiki/MsGnwQBjiif9c3koSJNcYaoSnu4#part-XdatdVlhEojeAfxaaEZcMV3ZniQ
See FAQ 5.8
""")
"""
)
raise retry_state.outcome.exception()
@ -162,10 +168,12 @@ class OpenAIGPTAPI(BaseGPTAPI, RateLimiter):
# iterate through the stream of events
async for chunk in response:
collected_chunks.append(chunk) # save the event response
chunk_message = chunk["choices"][0]["delta"] # extract the message
collected_messages.append(chunk_message) # save the message
if "content" in chunk_message:
print(chunk_message["content"], end="")
choices = chunk["choices"]
if len(choices) > 0:
chunk_message = chunk["choices"][0].get("delta", {}) # extract the message
collected_messages.append(chunk_message) # save the message
if "content" in chunk_message:
print(chunk_message["content"], end="")
print()
full_reply_content = "".join([m.get("content", "") for m in collected_messages])
@ -174,25 +182,27 @@ class OpenAIGPTAPI(BaseGPTAPI, RateLimiter):
return full_reply_content
def _cons_kwargs(self, messages: list[dict]) -> dict:
kwargs = {
"messages": messages,
"max_tokens": self.get_max_tokens(messages),
"n": 1,
"stop": None,
"temperature": 0.3,
"timeout": 3,
}
if CONFIG.openai_api_type == "azure":
kwargs = {
"deployment_id": CONFIG.deployment_id,
"messages": messages,
"max_tokens": self.get_max_tokens(messages),
"n": 1,
"stop": None,
"temperature": 0.3,
}
if CONFIG.deployment_name and CONFIG.deployment_id:
raise ValueError("You can only use one of the `deployment_id` or `deployment_name` model")
elif not CONFIG.deployment_name and not CONFIG.deployment_id:
raise ValueError("You must specify `DEPLOYMENT_NAME` or `DEPLOYMENT_ID` parameter")
kwargs_mode = (
{"engine": CONFIG.deployment_name}
if CONFIG.deployment_name
else {"deployment_id": CONFIG.deployment_id}
)
else:
kwargs = {
"model": self.model,
"messages": messages,
"max_tokens": self.get_max_tokens(messages),
"n": 1,
"stop": None,
"temperature": 0.3,
}
kwargs["timeout"] = 3
kwargs_mode = {"model": self.model}
kwargs.update(kwargs_mode)
return kwargs
async def _achat_completion(self, messages: list[dict]) -> dict:
@ -218,7 +228,7 @@ class OpenAIGPTAPI(BaseGPTAPI, RateLimiter):
@retry(
stop=stop_after_attempt(3),
wait=wait_fixed(1),
after=after_log(logger, logger.level('WARNING').name),
after=after_log(logger, logger.level("WARNING").name),
retry=retry_if_exception_type(APIConnectionError),
retry_error_callback=log_and_reraise,
)
@ -235,8 +245,8 @@ class OpenAIGPTAPI(BaseGPTAPI, RateLimiter):
try:
prompt_tokens = count_message_tokens(messages, self.model)
completion_tokens = count_string_tokens(rsp, self.model)
usage['prompt_tokens'] = prompt_tokens
usage['completion_tokens'] = completion_tokens
usage["prompt_tokens"] = prompt_tokens
usage["completion_tokens"] = completion_tokens
return usage
except Exception as e:
logger.error("usage calculation failed!", e)
@ -272,8 +282,8 @@ class OpenAIGPTAPI(BaseGPTAPI, RateLimiter):
def _update_costs(self, usage: dict):
if CONFIG.calc_usage:
try:
prompt_tokens = int(usage['prompt_tokens'])
completion_tokens = int(usage['completion_tokens'])
prompt_tokens = int(usage["prompt_tokens"])
completion_tokens = int(usage["completion_tokens"])
self._cost_manager.update_cost(prompt_tokens, completion_tokens, self.model)
except Exception as e:
logger.error("updating costs failed!", e)
@ -285,3 +295,31 @@ class OpenAIGPTAPI(BaseGPTAPI, RateLimiter):
if not self.auto_max_tokens:
return CONFIG.max_tokens_rsp
return get_max_completion_tokens(messages, self.model, CONFIG.max_tokens_rsp)
def moderation(self, content: Union[str, list[str]]):
try:
if not content:
logger.error("content cannot be empty!")
else:
rsp = self._moderation(content=content)
return rsp
except Exception as e:
logger.error(f"moderating failed:{e}")
def _moderation(self, content: Union[str, list[str]]):
rsp = self.llm.Moderation.create(input=content)
return rsp
async def amoderation(self, content: Union[str, list[str]]):
try:
if not content:
logger.error("content cannot be empty!")
else:
rsp = await self._amoderation(content=content)
return rsp
except Exception as e:
logger.error(f"moderating failed:{e}")
async def _amoderation(self, content: Union[str, list[str]]):
rsp = await self.llm.Moderation.acreate(input=content)
return rsp

View file

@ -32,7 +32,7 @@ class PromptString(Enum):
RECENT_ACTIVITY = "Based on the following memory, produce a brief summary of what {full_name} has been up to recently. Do not invent details not explicitly stated in the memory. For any conversation, be sure to mention whether the conversation has concluded or is still ongoing.\n\nMemory: {memory_descriptions}"
MAKE_PLANS = 'You are a plan-generating AI. Your job is to assist the character in formulating new plans based on new information. Given the character's information (profile, objectives, recent activities, current plans, and location context) and their current thought process, produce a new set of plans for them. The final plan should comprise at least {time_window} of activities and no more than 5 individual plans. List the plans in the order they should be executed, with each plan detailing its description, location, start time, stop criteria, and maximum duration.\n\nSample plan: \'{{"index": 1, "description": "Cook dinner", "location_id": "0a3bc22b-36aa-48ab-adb0-18616004caed","start_time": "2022-12-12T20:00:00+00:00","max_duration_hrs": 1.5, "stop_condition": "Dinner is fully prepared"}}\'\n\nFor each plan, choose the most appropriate location name from this list: {allowed_location_descriptions}\n\n{format_instructions}\n\nAlways prioritize completing any unfinished conversations.\n\nLet's begin!\n\nName: {full_name}\nProfile: {private_bio}\nObjectives: {directives}\nLocation Context: {location_context}\nCurrent Plans: {current_plans}\nRecent Activities: {recent_activity}\nThought Process: {thought_process}\nIt's essential to encourage the character to collaborate with other characters in their plans.\n\n'
MAKE_PLANS = "You are a plan-generating AI. Your job is to assist the character in formulating new plans based on new information. Given the character's information (profile, objectives, recent activities, current plans, and location context) and their current thought process, produce a new set of plans for them. The final plan should comprise at least {time_window} of activities and no more than 5 individual plans. List the plans in the order they should be executed, with each plan detailing its description, location, start time, stop criteria, and maximum duration.\n\nSample plan: {{\"index\": 1, \"description\": \"Cook dinner\", \"location_id\": \"0a3bc22b-36aa-48ab-adb0-18616004caed\",\"start_time\": \"2022-12-12T20:00:00+00:00\",\"max_duration_hrs\": 1.5, \"stop_condition\": \"Dinner is fully prepared\"}}\'\n\nFor each plan, choose the most appropriate location name from this list: {allowed_location_descriptions}\n\n{format_instructions}\n\nAlways prioritize completing any unfinished conversations.\n\nLet's begin!\n\nName: {full_name}\nProfile: {private_bio}\nObjectives: {directives}\nLocation Context: {location_context}\nCurrent Plans: {current_plans}\nRecent Activities: {recent_activity}\nThought Process: {thought_process}\nIt's essential to encourage the character to collaborate with other characters in their plans.\n\n"
EXECUTE_PLAN = "You are a role-playing AI, playing the role of {your_name}, in front of a live audience. Every word you say can be observed by the audience, so make sure you speak often and make it interesting. You cannot interact directly with the audience.\n\nBased on the following context and tools, proceed as if you are {your_name}. Your primary task is to complete the task given below, however, if you are currently in a conversation with another character, you should always complete the conversation first and then proceed with the task. Do not start working while you are engaged in an unfinished conversation. Use your best judgment to determine whether a conversation involves you and whether it is unfinished. You don't need to respond to every message you receive.\n\nThis is a personal profile of your character:\n{your_private_bio}\n\nThis is a description of your location and other nearby characters you can converse with:\n\n{location_context}\n\nThese are some relevant memories:\n```\n{relevant_memories}\n```\n\nThis is some relevant conversation history:\n```\n{conversation_history}\n```\n\nThese are the tools you can use:\n{tools}\n\nYour response should follow the format below:\n\nTask: The task you must complete\nThoughts: What you should think about doing\nAction: The action to take, must be one of these words: [{tool_names}]\nAction Input: Input for the action\nObservation: The result of the action\n... (This Thoughts/Action/Action Input/Observation can be repeated N times)\nThoughts: 'I have completed the task'\nFinal Response: The final response to the task\n\nIf you are not ready with a final response, then you must take an action.\n\nIf you are certain that you cannot complete the task with the tools provided, return 'Final Response: Need Help', however, if you are in conversation with another character, responses like 'I don't know' are valid. During conversations, you should never break character or admit you are an AI.\nIf the task is completed and no specific response is required, return 'Final Response: Done'\nLet's begin!\n\nTask: {input}\n\n{agent_scratchpad}"

View file

@ -0,0 +1,114 @@
#!/usr/bin/env python3
# _*_ coding: utf-8 _*_
"""
@Time : 2023/9/4 15:40:40
@Author : Stitch-z
@File : tutorial_assistant.py
"""
from datetime import datetime
from typing import Dict
from metagpt.actions.write_tutorial import WriteDirectory, WriteContent
from metagpt.const import TUTORIAL_PATH
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import Message
from metagpt.utils.file import File
class TutorialAssistant(Role):
"""Tutorial assistant, input one sentence to generate a tutorial document in markup format.
Args:
name: The name of the role.
profile: The role profile description.
goal: The goal of the role.
constraints: Constraints or requirements for the role.
language: The language in which the tutorial documents will be generated.
"""
def __init__(
self,
name: str = "Stitch",
profile: str = "Tutorial Assistant",
goal: str = "Generate tutorial documents",
constraints: str = "Strictly follow Markdown's syntax, with neat and standardized layout",
language: str = "Chinese",
):
super().__init__(name, profile, goal, constraints)
self._init_actions([WriteDirectory(language=language)])
self.topic = ""
self.main_title = ""
self.total_content = ""
self.language = language
async def _think(self) -> None:
"""Determine the next action to be taken by the role."""
if self._rc.todo is None:
self._set_state(0)
return
if self._rc.state + 1 < len(self._states):
self._set_state(self._rc.state + 1)
else:
self._rc.todo = None
async def _handle_directory(self, titles: Dict) -> Message:
"""Handle the directories for the tutorial document.
Args:
titles: A dictionary containing the titles and directory structure,
such as {"title": "xxx", "directory": [{"dir 1": ["sub dir 1", "sub dir 2"]}]}
Returns:
A message containing information about the directory.
"""
self.main_title = titles.get("title")
directory = f"{self.main_title}\n"
self.total_content += f"# {self.main_title}"
actions = list()
for first_dir in titles.get("directory"):
actions.append(WriteContent(language=self.language, directory=first_dir))
key = list(first_dir.keys())[0]
directory += f"- {key}\n"
for second_dir in first_dir[key]:
directory += f" - {second_dir}\n"
self._init_actions(actions)
self._rc.todo = None
return Message(content=directory)
async def _act(self) -> Message:
"""Perform an action as determined by the role.
Returns:
A message containing the result of the action.
"""
todo = self._rc.todo
if type(todo) is WriteDirectory:
msg = self._rc.memory.get(k=1)[0]
self.topic = msg.content
resp = await todo.run(topic=self.topic)
logger.info(resp)
return await self._handle_directory(resp)
resp = await todo.run(topic=self.topic)
logger.info(resp)
if self.total_content != "":
self.total_content += "\n\n\n"
self.total_content += resp
return Message(content=resp, role=self.profile)
async def _react(self) -> Message:
"""Execute the assistant's think and actions.
Returns:
A message containing the final result of the assistant's actions.
"""
while True:
await self._think()
if self._rc.todo is None:
break
msg = await self._act()
root_path = TUTORIAL_PATH / datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
await File.write(root_path, f"{self.main_title}.md", self.total_content.encode('utf-8'))
return msg

View file

@ -60,6 +60,7 @@ def test_node_tags(project_key, nodes, operations, expected_msg):
# 3. If comments are needed, use Chinese.
# If you understand, please wait for me to give the interface definition and just answer "Understood" to save tokens.
'''
ACT_PROMPT_PREFIX = '''Refer to the test types: such as missing request parameters, field boundary verification, incorrect field type.
Please output 10 test cases within one `@pytest.mark.parametrize` scope.
@ -94,7 +95,8 @@ Name Type Required Default Value Remarks
code integer Yes
message string Yes
data object Yes
```
'''
class UTGenerator:

View file

@ -9,6 +9,7 @@ import ast
import contextlib
import inspect
import os
import platform
import re
from typing import List, Tuple
@ -20,7 +21,10 @@ def check_cmd_exists(command) -> int:
:param command: 待检查的命令
:return: 如果命令存在返回0如果不存在返回非0
"""
check_command = 'command -v ' + command + ' >/dev/null 2>&1 || { echo >&2 "no mermaid"; exit 1; }'
if platform.system().lower() == 'windows':
check_command = 'where ' + command
else:
check_command = 'command -v ' + command + ' >/dev/null 2>&1 || { echo >&2 "no mermaid"; exit 1; }'
result = os.system(check_command)
return result
@ -187,7 +191,8 @@ class CodeParser:
else:
logger.error(f"{pattern} not match following text:")
logger.error(text)
raise Exception
# raise Exception
return ""
return code
@classmethod

42
metagpt/utils/file.py Normal file
View file

@ -0,0 +1,42 @@
#!/usr/bin/env python3
# _*_ coding: utf-8 _*_
"""
@Time : 2023/9/4 15:40:40
@Author : Stitch-z
@File : file.py
@Describe : General file operations.
"""
import aiofiles
from pathlib import Path
from metagpt.logs import logger
class File:
"""A general util for file operations."""
@classmethod
async def write(cls, root_path: Path, filename: str, content: bytes) -> Path:
"""Write the file content to the local specified path.
Args:
root_path: The root path of file, such as "/data".
filename: The name of file, such as "test.txt".
content: The binary content of file.
Returns:
The full filename of file, such as "/data/test.txt".
Raises:
Exception: If an unexpected error occurs during the file writing process.
"""
try:
root_path.mkdir(parents=True, exist_ok=True)
full_path = root_path / filename
async with aiofiles.open(full_path, mode="wb") as writer:
await writer.write(content)
logger.info(f"Successfully write file: {full_path}")
return full_path
except Exception as e:
logger.error(f"Error writing file: {e}")
raise e

1
metagpt/utils/index.html Normal file

File diff suppressed because one or more lines are too long

View file

@ -2,9 +2,10 @@
# -*- coding: utf-8 -*-
"""
@Time : 2023/7/4 10:53
@Author : alexanderwu
@Author : alexanderwu alitrack
@File : mermaid.py
"""
import asyncio
import subprocess
from pathlib import Path
@ -12,48 +13,76 @@ from metagpt.config import CONFIG
from metagpt.const import PROJECT_ROOT
from metagpt.logs import logger
from metagpt.utils.common import check_cmd_exists
import os
import sys
def mermaid_to_file(mermaid_code, output_file_without_suffix, width=2048, height=2048) -> int:
async def mermaid_to_file(mermaid_code, output_file_without_suffix, width=2048, height=2048) -> int:
"""suffix: png/svg/pdf
:param mermaid_code: mermaid code
:param output_file_without_suffix: output filename
:param width:
:param height:
:return: 0 if succed, -1 if failed
:return: 0 if succeed, -1 if failed
"""
# Write the Mermaid code to a temporary file
dir_name = os.path.dirname(output_file_without_suffix)
if dir_name and not os.path.exists(dir_name):
os.makedirs(dir_name)
tmp = Path(f"{output_file_without_suffix}.mmd")
tmp.write_text(mermaid_code, encoding="utf-8")
engine = CONFIG.mermaid_engine.lower()
if engine == "nodejs":
if check_cmd_exists("mmdc") != 0:
logger.warning("RUN `npm install -g @mermaid-js/mermaid-cli` to install mmdc")
return -1
for suffix in ["pdf", "svg", "png"]:
output_file = f"{output_file_without_suffix}.{suffix}"
# Call the `mmdc` command to convert the Mermaid code to a PNG
logger.info(f"Generating {output_file}..")
if check_cmd_exists("mmdc") != 0:
logger.warning("RUN `npm install -g @mermaid-js/mermaid-cli` to install mmdc")
return -1
for suffix in ["pdf", "svg", "png"]:
output_file = f"{output_file_without_suffix}.{suffix}"
# Call the `mmdc` command to convert the Mermaid code to a PNG
logger.info(f"Generating {output_file}..")
if CONFIG.puppeteer_config:
subprocess.run(
[
CONFIG.mmdc,
"-p",
CONFIG.puppeteer_config,
"-i",
str(tmp),
"-o",
output_file,
"-w",
str(width),
"-H",
str(height),
]
if CONFIG.puppeteer_config:
commands =[
CONFIG.mmdc,
"-p",
CONFIG.puppeteer_config,
"-i",
str(tmp),
"-o",
output_file,
"-w",
str(width),
"-H",
str(height),
]
else:
commands =[CONFIG.mmdc, "-i", str(tmp), "-o", output_file, "-w", str(width), "-H", str(height)]
process = await asyncio.create_subprocess_exec(
*commands,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if stdout:
logger.info(stdout.decode())
if stderr:
logger.error(stderr.decode())
else:
if engine =='playwright':
from metagpt.utils.mmdc_playwright import mermaid_to_file
return await mermaid_to_file(mermaid_code, output_file_without_suffix, width, height)
elif engine =='pyppeteer':
from metagpt.utils.mmdc_pyppeteer import mermaid_to_file
return await mermaid_to_file(mermaid_code, output_file_without_suffix, width, height)
elif engine =='ink':
from metagpt.utils.mmdc_ink import mermaid_to_file
return await mermaid_to_file(mermaid_code, output_file_without_suffix)
else:
subprocess.run([CONFIG.mmdc, "-i", str(tmp), "-o", output_file, "-w", str(width), "-H", str(height)])
logger.warning(f"Unsupported mermaid engine: {engine}")
return 0
@ -108,7 +137,9 @@ MMC2 = """sequenceDiagram
SE-->>M: return summary"""
if __name__ == "__main__":
# logger.info(print_members(print_members))
mermaid_to_file(MMC1, PROJECT_ROOT / "tmp/1.png")
mermaid_to_file(MMC2, PROJECT_ROOT / "tmp/2.png")
loop = asyncio.new_event_loop()
result = loop.run_until_complete(mermaid_to_file(MMC1, PROJECT_ROOT / f"{CONFIG.mermaid_engine}/1"))
result = loop.run_until_complete(mermaid_to_file(MMC2, PROJECT_ROOT / f"{CONFIG.mermaid_engine}/1"))
loop.close()

41
metagpt/utils/mmdc_ink.py Normal file
View file

@ -0,0 +1,41 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/9/4 16:12
@Author : alitrack
@File : mermaid.py
"""
import base64
import os
from aiohttp import ClientSession,ClientError
from metagpt.logs import logger
async def mermaid_to_file(mermaid_code, output_file_without_suffix):
"""suffix: png/svg
:param mermaid_code: mermaid code
:param output_file_without_suffix: output filename without suffix
:return: 0 if succeed, -1 if failed
"""
encoded_string = base64.b64encode(mermaid_code.encode()).decode()
for suffix in ["svg", "png"]:
output_file = f"{output_file_without_suffix}.{suffix}"
path_type = "svg" if suffix == "svg" else "img"
url = f"https://mermaid.ink/{path_type}/{encoded_string}"
async with ClientSession() as session:
try:
async with session.get(url) as response:
if response.status == 200:
text = await response.content.read()
with open(output_file, 'wb') as f:
f.write(text)
logger.info(f"Generating {output_file}..")
else:
logger.error(f"Failed to generate {output_file}")
return -1
except ClientError as e:
logger.error(f"network error: {e}")
return -1
return 0

View file

@ -0,0 +1,111 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/9/4 16:12
@Author : Steven Lee
@File : mmdc_playwright.py
"""
import os
from urllib.parse import urljoin
from playwright.async_api import async_playwright
from metagpt.logs import logger
async def mermaid_to_file(mermaid_code, output_file_without_suffix, width=2048, height=2048)-> int:
"""
Converts the given Mermaid code to various output formats and saves them to files.
Args:
mermaid_code (str): The Mermaid code to convert.
output_file_without_suffix (str): The output file name without the file extension.
width (int, optional): The width of the output image in pixels. Defaults to 2048.
height (int, optional): The height of the output image in pixels. Defaults to 2048.
Returns:
int: Returns 1 if the conversion and saving were successful, -1 otherwise.
"""
suffixes=['png', 'svg', 'pdf']
__dirname = os.path.dirname(os.path.abspath(__file__))
async with async_playwright() as p:
browser = await p.chromium.launch()
device_scale_factor = 1.0
context = await browser.new_context(
viewport={'width': width, 'height': height},
device_scale_factor=device_scale_factor,
)
page = await context.new_page()
async def console_message(msg):
logger.info(msg.text)
page.on('console', console_message)
try:
await page.set_viewport_size({'width': width, 'height': height})
mermaid_html_path = os.path.abspath(
os.path.join(__dirname, 'index.html'))
mermaid_html_url = urljoin('file:', mermaid_html_path)
await page.goto(mermaid_html_url)
await page.wait_for_load_state("networkidle")
await page.wait_for_selector("div#container", state="attached")
mermaid_config = {}
background_color = "#ffffff"
my_css = ""
await page.evaluate(f'document.body.style.background = "{background_color}";')
metadata = await page.evaluate('''async ([definition, mermaidConfig, myCSS, backgroundColor]) => {
const { mermaid, zenuml } = globalThis;
await mermaid.registerExternalDiagrams([zenuml]);
mermaid.initialize({ startOnLoad: false, ...mermaidConfig });
const { svg } = await mermaid.render('my-svg', definition, document.getElementById('container'));
document.getElementById('container').innerHTML = svg;
const svgElement = document.querySelector('svg');
svgElement.style.backgroundColor = backgroundColor;
if (myCSS) {
const style = document.createElementNS('http://www.w3.org/2000/svg', 'style');
style.appendChild(document.createTextNode(myCSS));
svgElement.appendChild(style);
}
}''', [mermaid_code, mermaid_config, my_css, background_color])
if 'svg' in suffixes :
svg_xml = await page.evaluate('''() => {
const svg = document.querySelector('svg');
const xmlSerializer = new XMLSerializer();
return xmlSerializer.serializeToString(svg);
}''')
logger.info(f"Generating {output_file_without_suffix}.svg..")
with open(f'{output_file_without_suffix}.svg', 'wb') as f:
f.write(svg_xml.encode('utf-8'))
if 'png' in suffixes:
clip = await page.evaluate('''() => {
const svg = document.querySelector('svg');
const rect = svg.getBoundingClientRect();
return {
x: Math.floor(rect.left),
y: Math.floor(rect.top),
width: Math.ceil(rect.width),
height: Math.ceil(rect.height)
};
}''')
await page.set_viewport_size({'width': clip['x'] + clip['width'], 'height': clip['y'] + clip['height']})
screenshot = await page.screenshot(clip=clip, omit_background=True, scale='device')
logger.info(f"Generating {output_file_without_suffix}.png..")
with open(f'{output_file_without_suffix}.png', 'wb') as f:
f.write(screenshot)
if 'pdf' in suffixes:
pdf_data = await page.pdf(scale=device_scale_factor)
logger.info(f"Generating {output_file_without_suffix}.pdf..")
with open(f'{output_file_without_suffix}.pdf', 'wb') as f:
f.write(pdf_data)
return 0
except Exception as e:
logger.error(e)
return -1
finally:
await browser.close()

View file

@ -0,0 +1,113 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/9/4 16:12
@Author : alitrack
@File : mmdc_pyppeteer.py
"""
import os
from urllib.parse import urljoin
from pyppeteer import launch
from metagpt.logs import logger
from metagpt.config import CONFIG
async def mermaid_to_file(mermaid_code, output_file_without_suffix, width=2048, height=2048)-> int:
"""
Converts the given Mermaid code to various output formats and saves them to files.
Args:
mermaid_code (str): The Mermaid code to convert.
output_file_without_suffix (str): The output file name without the file extension.
width (int, optional): The width of the output image in pixels. Defaults to 2048.
height (int, optional): The height of the output image in pixels. Defaults to 2048.
Returns:
int: Returns 1 if the conversion and saving were successful, -1 otherwise.
"""
suffixes = ['png', 'svg', 'pdf']
__dirname = os.path.dirname(os.path.abspath(__file__))
if CONFIG.pyppeteer_executable_path:
browser = await launch(headless=True,
executablePath=CONFIG.pyppeteer_executable_path,
args=['--disable-extensions',"--no-sandbox"]
)
else:
logger.error("Please set the environment variable:PYPPETEER_EXECUTABLE_PATH.")
return -1
page = await browser.newPage()
device_scale_factor = 1.0
async def console_message(msg):
logger.info(msg.text)
page.on('console', console_message)
try:
await page.setViewport(viewport={'width': width, 'height': height, 'deviceScaleFactor': device_scale_factor})
mermaid_html_path = os.path.abspath(
os.path.join(__dirname, 'index.html'))
mermaid_html_url = urljoin('file:', mermaid_html_path)
await page.goto(mermaid_html_url)
await page.querySelector("div#container")
mermaid_config = {}
background_color = "#ffffff"
my_css = ""
await page.evaluate(f'document.body.style.background = "{background_color}";')
metadata = await page.evaluate('''async ([definition, mermaidConfig, myCSS, backgroundColor]) => {
const { mermaid, zenuml } = globalThis;
await mermaid.registerExternalDiagrams([zenuml]);
mermaid.initialize({ startOnLoad: false, ...mermaidConfig });
const { svg } = await mermaid.render('my-svg', definition, document.getElementById('container'));
document.getElementById('container').innerHTML = svg;
const svgElement = document.querySelector('svg');
svgElement.style.backgroundColor = backgroundColor;
if (myCSS) {
const style = document.createElementNS('http://www.w3.org/2000/svg', 'style');
style.appendChild(document.createTextNode(myCSS));
svgElement.appendChild(style);
}
}''', [mermaid_code, mermaid_config, my_css, background_color])
if 'svg' in suffixes :
svg_xml = await page.evaluate('''() => {
const svg = document.querySelector('svg');
const xmlSerializer = new XMLSerializer();
return xmlSerializer.serializeToString(svg);
}''')
logger.info(f"Generating {output_file_without_suffix}.svg..")
with open(f'{output_file_without_suffix}.svg', 'wb') as f:
f.write(svg_xml.encode('utf-8'))
if 'png' in suffixes:
clip = await page.evaluate('''() => {
const svg = document.querySelector('svg');
const rect = svg.getBoundingClientRect();
return {
x: Math.floor(rect.left),
y: Math.floor(rect.top),
width: Math.ceil(rect.width),
height: Math.ceil(rect.height)
};
}''')
await page.setViewport({'width': clip['x'] + clip['width'], 'height': clip['y'] + clip['height'], 'deviceScaleFactor': device_scale_factor})
screenshot = await page.screenshot(clip=clip, omit_background=True, scale='device')
logger.info(f"Generating {output_file_without_suffix}.png..")
with open(f'{output_file_without_suffix}.png', 'wb') as f:
f.write(screenshot)
if 'pdf' in suffixes:
pdf_data = await page.pdf(scale=device_scale_factor)
logger.info(f"Generating {output_file_without_suffix}.pdf..")
with open(f'{output_file_without_suffix}.pdf', 'wb') as f:
f.write(pdf_data)
return 0
except Exception as e:
logger.error(e)
return -1
finally:
await browser.close()