English Translation

translated chinese to english
This commit is contained in:
brucemeek 2023-07-26 14:46:20 -05:00
parent 8cc8b80e49
commit f2725b18ad
24 changed files with 351 additions and 402 deletions

View file

@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
提供配置单例
Provides configuration, singleton pattern.
"""
import os
import openai
@ -28,7 +28,7 @@ class NotConfiguredException(Exception):
class Config(metaclass=Singleton):
"""
常规使用方法
Regular usage:
config = Config("config.yaml")
secret_key = config.get_key("MY_SECRET_KEY")
print("Secret key:", secret_key)
@ -79,14 +79,14 @@ class Config(metaclass=Singleton):
self.total_cost = 0.0
def _init_with_config_files_and_env(self, configs: dict, yaml_file):
"""从config/key.yaml / config/config.yaml / env三处按优先级递减加载"""
"""Load from config/key.yaml / config/config.yaml / env in decreasing order of priority."""
configs.update(os.environ)
for _yaml_file in [yaml_file, self.key_yaml_file]:
if not _yaml_file.exists():
continue
# 加载本地 YAML 文件
# Load the local YAML file
with open(_yaml_file, "r", encoding="utf-8") as file:
yaml_data = yaml.safe_load(file)
if not yaml_data:
@ -98,7 +98,7 @@ class Config(metaclass=Singleton):
return self._configs.get(*args, **kwargs)
def get(self, key, *args, **kwargs):
"""从config/key.yaml / config/config.yaml / env三处找值找不到报错"""
"""Fetch value from config/key.yaml / config/config.yaml / env, raise an error if not found."""
value = self._get(key, *args, **kwargs)
if value is None:
raise ValueError(f"Key '{key}' not found in environment variables or in the YAML file")

View file

@ -9,7 +9,7 @@ from pathlib import Path
def get_project_root():
"""逐级向上寻找项目根目录"""
"""Search upwards to find the project root directory."""
current_path = Path.cwd()
while True:
if (current_path / '.git').exists() or \

View file

@ -28,7 +28,7 @@ class FaissStore(LocalStore):
def _load(self) -> Optional["FaissStore"]:
index_file, store_file = self._get_index_and_store_fname()
if not (index_file.exists() and store_file.exists()):
logger.info("Missing at least one of index_file/store_file, load failed and return None")
logger.info("At least one of the index_file/store_file is missing. Loading failed and returns None.")
return None
index = faiss.read_index(str(index_file))
with open(str(store_file), "rb") as f:
@ -59,7 +59,7 @@ class FaissStore(LocalStore):
return str(sep.join([f"{x.page_content}" for x in rsp]))
def write(self):
"""根据用户给定的DocumentJSON / XLSX等文件进行index与库的初始化"""
"""Initialize the index and library based on the provided Document (JSON / XLSX, etc.) file."""
if not self.raw_data.exists():
raise FileNotFoundError
doc = Document(self.raw_data, self.content_col, self.meta_col)
@ -69,16 +69,16 @@ class FaissStore(LocalStore):
self.persist()
def add(self, texts: list[str], *args, **kwargs) -> list[str]:
"""FIXME: 目前add之后没有更新store"""
"""FIXME: The store isn't currently updated after adding."""
return self.store.add_texts(texts)
def delete(self, *args, **kwargs):
"""目前langchain没有提供del接口"""
"""Currently, langchain doesn't provide a delete interface."""
raise NotImplementedError
if __name__ == '__main__':
faiss_store = FaissStore(DATA_PATH / 'qcs/qcs_4w.json')
logger.info(faiss_store.search('油皮洗面奶'))
faiss_store.add([f'油皮洗面奶-{i}' for i in range(3)])
logger.info(faiss_store.search('油皮洗面奶'))
logger.info(faiss_store.search('Oily skin facial cleanser'))
faiss_store.add([f'Oily skin facial cleanser-{i}' for i in range(3)])
logger.info(faiss_store.search('Oily skin facial cleanser'))

View file

@ -21,7 +21,7 @@ type_mapping = {
def columns_to_milvus_schema(columns: dict, primary_col_name: str = "", desc: str = ""):
"""这里假设columns结构是str: 常规类型"""
"""Assuming the structure of columns is str: regular type."""
fields = []
for col, ctype in columns.items():
if ctype == str:
@ -79,8 +79,8 @@ class MilvusStore(BaseStore):
"""
FIXME: ADD TESTS
https://milvus.io/docs/v2.0.x/search.md
All search and query operations within Milvus are executed in memory. Load the collection to memory before conducting a vector similarity search.
注意到上述描述这个逻辑是认真的吗这个耗时应该很长
All search and query operations within Milvus are executed in memory. Load the collection into memory before conducting a vector similarity search.
Noting the above description, is this logic serious? This should take a long time, right?
"""
search_params = {"metric_type": "L2", "params": {"nprobe": 10}}
results = self.collection.search(
@ -91,7 +91,7 @@ class MilvusStore(BaseStore):
expr=None,
consistency_level="Strong"
)
# FIXME: results里有id但是id到实际值还得调用query接口来获取
# FIXME: results contain ids, but to get the actual value from the id, you have to call the query interface
return results
def write(self, name, schema, *args, **kwargs):

View file

@ -16,7 +16,7 @@ from metagpt.schema import Message
class Environment(BaseModel):
"""环境,承载一批角色,角色可以向环境发布消息,可以被其他角色观察到"""
"""Environment that hosts a set of roles. Roles can publish messages to the environment, which can be observed by other roles."""
roles: dict[str, Role] = Field(default_factory=dict)
memory: Memory = Field(default_factory=Memory)
@ -26,23 +26,23 @@ class Environment(BaseModel):
arbitrary_types_allowed = True
def add_role(self, role: Role):
"""增加一个在当前环境的Role"""
"""Add a role to the current environment."""
role.set_env(self)
self.roles[role.profile] = role
def add_roles(self, roles: Iterable[Role]):
"""增加一批在当前环境的Role"""
"""Add multiple roles to the current environment."""
for role in roles:
self.add_role(role)
def publish_message(self, message: Message):
"""向当前环境发布信息"""
"""Publish a message to the current environment."""
# self.message_queue.put(message)
self.memory.add(message)
self.history += f"\n{message}"
async def run(self, k=1):
"""处理一次所有Role的运行"""
"""Execute a single run for all roles in the environment."""
# while not self.message_queue.empty():
# message = self.message_queue.get()
# rsp = await self.manager.handle(message, self)
@ -56,9 +56,9 @@ class Environment(BaseModel):
await asyncio.gather(*futures)
def get_roles(self) -> dict[str, Role]:
"""获得环境内的所有Role"""
"""Retrieve all roles within the environment."""
return self.roles
def get_role(self, name: str) -> Role:
"""获得环境内的指定Role"""
"""Retrieve a specific role within the environment."""
return self.roles.get(name, None)

View file

@ -12,7 +12,6 @@ from metagpt.provider.openai_api import OpenAIGPTAPI as LLM
DEFAULT_LLM = LLM()
CLAUDE_LLM = Claude()
async def ai_func(prompt):
"""使用LLM进行QA"""
"""Perform a Q&A using LLM."""
return await DEFAULT_LLM.aask(prompt)

View file

@ -12,13 +12,11 @@ from loguru import logger as _logger
from metagpt.const import PROJECT_ROOT
def define_log_level(print_level="INFO", logfile_level="DEBUG"):
"""调整日志级别到level之上"""
"""Adjust log level to above the specified level."""
_logger.remove()
_logger.add(sys.stderr, level=print_level)
_logger.add(PROJECT_ROOT / 'logs/log.txt', level=logfile_level)
return _logger
logger = define_log_level()

View file

@ -15,7 +15,7 @@ Skill = Action
class SkillManager:
"""用来管理所有技能"""
"""Manages all skills."""
def __init__(self):
self._llm = LLM()
@ -24,8 +24,8 @@ class SkillManager:
def add_skill(self, skill: Skill):
"""
增加技能将技能加入到技能池与可检索的存储中
:param skill: 技能
Adds a skill, inserting the skill into the skill pool and searchable storage.
:param skill: Skill
:return:
"""
self._skills[skill.name] = skill
@ -33,8 +33,8 @@ class SkillManager:
def del_skill(self, skill_name: str):
"""
删除技能将技能从技能池与可检索的存储中移除
:param skill_name: 技能名
Deletes a skill, removing the skill from the skill pool and searchable storage.
:param skill_name: Skill name
:return:
"""
self._skills.pop(skill_name)
@ -42,31 +42,31 @@ class SkillManager:
def get_skill(self, skill_name: str) -> Skill:
"""
通过技能名获得精确的技能
:param skill_name: 技能名
:return: 技能
Retrieves a specific skill by its name.
:param skill_name: Skill name
:return: Skill
"""
return self._skills.get(skill_name)
def retrieve_skill(self, desc: str, n_results: int = 2) -> list[Skill]:
"""
通过检索引擎获得技能
:param desc: 技能描述
:return: 技能多个
Retrieves skills through the search engine.
:param desc: Skill description
:return: List of skills
"""
return self._store.search(desc, n_results=n_results)['ids'][0]
def retrieve_skill_scored(self, desc: str, n_results: int = 2) -> dict:
"""
通过检索引擎获得技能
:param desc: 技能描述
:return: 技能与分数组成的字典
Retrieves skills through the search engine.
:param desc: Skill description
:return: Dictionary composed of skills and scores
"""
return self._store.search(desc, n_results=n_results)
def generate_skill_desc(self, skill: Skill) -> str:
"""
为每个技能生成对应的描述性文本
Generates a descriptive text for each skill.
:param skill:
:return:
"""

View file

@ -32,7 +32,7 @@ class Manager:
async def handle(self, message: Message, environment):
"""
管理员处理信息现在简单的将信息递交给下一个人
Manager processes the message, now simply passing the message to the next person.
:param message:
:param environment:
:return:
@ -49,7 +49,7 @@ class Manager:
# Ask the LLM to decide which role should handle the message
# chosen_role_name = self.llm.ask(self.prompt_template.format(context))
# FIXME: 现在通过简单的字典决定流向,但之后还是应该有思考过程
# FIXME: For now, the decision is made through a simple dictionary, but in the future, there should be a thought process
next_role_profile = self.role_directions[message.role]
# logger.debug(f"{next_role_profile}")
for _, role in roles.items():

View file

@ -1,16 +1,15 @@
你是一个富有帮助的助理可以帮助撰写、抽象、注释、摘要Python代码
You are a helpful assistant, capable of drafting, abstracting, commenting, and summarizing Python code.
1. 不要提到类/函数名
2. 不要提到除了系统库与公共库以外的类/函数
3. 试着将类/函数总结为不超过6句话
4. 你的回答应该是一行文本
举例,如果上下文是:
Do not mention class/function names.
Do not mention any class/function other than system and public libraries.
Try to summarize the class/function in no more than 6 sentences.
Your answer should be a single line of text.
For example, if the context is:
```python
from typing import Optional
from abc import ABC
from metagpt.llm import LLM # 大语言模型,类似GPT
from metagpt.llm import LLM # Large Language Model, similar to GPT
class Action(ABC):
def __init__(self, name='', context=None, llm: LLM = LLM()):
@ -21,38 +20,38 @@
self.desc = ""
def set_prefix(self, prefix):
"""设置前缀以供后续使用"""
"""Set prefix for subsequent use."""
self.prefix = prefix
async def _aask(self, prompt: str, system_msgs: Optional[list[str]] = None):
"""加上默认的prefix来使用prompt"""
"""Use the prompt with the default prefix."""
if not system_msgs:
system_msgs = []
system_msgs.append(self.prefix)
return await self.llm.aask(prompt, system_msgs)
async def run(self, *args, **kwargs):
"""运行动作"""
"""Execute the action."""
raise NotImplementedError("The run method should be implemented in a subclass.")
PROMPT_TEMPLATE = """
# 需求
# Requirement
{requirements}
# PRD
根据需求创建一个产品需求文档PRD填补以下空缺
Based on the requirements, create a Product Requirement Document (PRD) and fill in the blanks below.
产品/功能介绍:
Product/Feature Introduction:
目标:
Goal:
用户和使用场景:
Users and Usage Scenarios:
需求:
Requirements:
约束与限制:
Constraints and Limitations:
性能指标:
Performance Metrics:
"""
@ -68,9 +67,9 @@ # PRD
```
主类/函数是 `WritePRD`
The main class/function is WritePRD.
那么你应该写:
Then, you should write:
这个类用来根据输入需求生成PRD。首先注意到有一个提示词模板其中有产品、功能、目标、用户和使用场景、需求、约束与限制、性能指标这个模板会以输入需求填充然后调用接口询问大语言模型让大语言模型返回具体的PRD。
This class is designed to generate a PRD based on input requirements. Notice there's a prompt template, which includes product, feature, goal, users and usage scenarios, requirements, constraints and limitations, and performance metrics. This template will be filled with the input requirements, and then an interface will query the large language model, prompting it to return the specific PRD.

View file

@ -7,34 +7,34 @@
"""
METAGPT_SAMPLE = """
### 设定
### Setting
你是一个用户的编程助手可以使用公共库与python系统库进行编程你的回复应该有且只有一个函数
1. 函数本身应尽可能完整不应缺失需求细节
2. 你可能需要写一些提示词用来让LLM你自己理解带有上下文的搜索请求
3. 面对复杂的难以用简单函数解决的逻辑尽量交给llm解决
You are a coding assistant for a user, capable of programming using public libraries and Python system libraries. Your response should contain only one function.
1. The function itself should be as complete as possible and should not lack any details of the requirement.
2. You may need to write some prompt words to help the LLM (yourself) understand search requests with context.
3. For complex logic that's hard to be addressed with a simple function, try to delegate it to the LLM.
### 公共库
### Public Libraries
你可以使用公共库metagpt提供的函数不能使用其他第三方库的函数公共库默认已经被import为x变量
You can use the functions provided by the public library, metagpt, and you cannot use functions from other third-party libraries. The public library is already imported as variable `x`.
- `import metagpt as x`
- 你可以使用 `x.func(paras)` 方式来对公共库进行调用
- You can call the public library using the format `x.func(paras)`.
公共库中已有函数如下
- def llm(question: str) -> str # 输入问题,基于大模型进行回答
- def intent_detection(query: str) -> str # 输入query分析意图返回公共库函数名
- def add_doc(doc_path: str) -> None # 输入文件路径或者文件夹路径,加入知识库
- def search(query: str) -> list[str] # 输入query返回向量知识库搜索的多个结果
- def google(query: str) -> list[str] # 使用google查询公网结果
- def math(query: str) -> str # 输入query公式返回对公式执行的结果
- def tts(text: str, wav_path: str) # 输入text文本与对应想要输出音频的路径将文本转为音频文件
The available functions in the public library are:
- def llm(question: str) -> str # Input a question and get an answer based on the large model.
- def intent_detection(query: str) -> str # Input a query, analyze the intent, and return the name of the function from the public library.
- def add_doc(doc_path: str) -> None # Input the path of a file or directory to add to the knowledge base.
- def search(query: str) -> list[str] # Input a query to get multiple results from a vector knowledge base search.
- def google(query: str) -> list[str] # Use Google to search for public results.
- def math(query: str) -> str # Input a query formula and get the result of its execution.
- def tts(text: str, wav_path: str) # Input text and the desired output audio path to convert the text into an audio file.
### 用户需求
### User Requirement
我有一个个人知识库文件我希望基于它来实现一个带有搜索功能的个人助手需求细则如下
1. 个人助手会思考是否需要使用个人知识库搜索如果没有必要就不使用它
2. 个人助手会判断用户意图在不同意图下使用恰当的函数解决问题
3. 用语音回答
I have a personal knowledge base file. I want to implement a personal assistant with search functionality based on it. The detailed requirements are as follows:
1. The personal assistant will consider whether it needs to use the personal knowledge base search. If it's not necessary, it won't use it.
2. The personal assistant will judge user intent and use the appropriate function to address the issue under different intents.
3. Answer with voice.
"""
# - def summarize(doc: str) -> str # 输入doc返回摘要
# - def summarize(doc: str) -> str # Input a doc to get a summary.

View file

@ -6,88 +6,82 @@
@File : summarize.py
"""
# 出自插件ChatGPT - 网站和 YouTube 视频摘要
# From the plugin: ChatGPT - Summarize Websites and YouTube Videos
# https://chrome.google.com/webstore/detail/chatgpt-%C2%BB-summarize-every/cbgecfllfhmmnknmamkejadjmnmpfjmp?hl=zh-CN&utm_source=chrome-ntp-launcher
SUMMARIZE_PROMPT = """
Your output should use the following template:
Your output should follow the template below:
### Summary
### Facts
- [Emoji] Bulletpoint
Your task is to summarize the text I give you in up to seven concise bullet points and start with a short, high-quality
summary. Pick a suitable emoji for every bullet point. Your response should be in {{SELECTED_LANGUAGE}}. If the provided
URL is functional and not a YouTube video, use the text from the {{URL}}. However, if the URL is not functional or is
a YouTube video, use the following text: {{CONTENT}}.
Your task is to summarize the text I provide you with in up to seven concise bullet points, and start with a brief, high-quality summary. Choose a suitable emoji for every bullet point. Your response should be in {{SELECTED_LANGUAGE}}. If a provided URL is functional and not a YouTube video, use the text from the {{URL}}. If the URL is non-functional or is a YouTube video, use the following text: {{CONTENT}}.
"""
# GCP-VertexAI-文本摘要SUMMARIZE_PROMPT_2-5都是
# From GCP-VertexAI-Text Summary (SUMMARIZE_PROMPT_2-5 are all from this source)
# https://github.com/GoogleCloudPlatform/generative-ai/blob/main/language/examples/prompt-design/text_summarization.ipynb
# 长文档需要map-reduce过程见下面这个notebook
# For long documents, a map-reduce process is required. See the notebook below:
# https://github.com/GoogleCloudPlatform/generative-ai/blob/main/language/examples/document-summarization/summarization_large_documents.ipynb
SUMMARIZE_PROMPT_2 = """
Provide a very short summary, no more than three sentences, for the following article:
Our quantum computers work by manipulating qubits in an orchestrated fashion that we call quantum algorithms.
The challenge is that qubits are so sensitive that even stray light can cause calculation errors and the problem worsens as quantum computers grow.
This has significant consequences, since the best quantum algorithms that we know for running useful applications require the error rates of our qubits to be far lower than we have today.
To bridge this gap, we will need quantum error correction.
Quantum error correction protects information by encoding it across multiple physical qubits to form a logical qubit, and is believed to be the only way to produce a large-scale quantum computer with error rates low enough for useful calculations.
Instead of computing on the individual qubits themselves, we will then compute on logical qubits. By encoding larger numbers of physical qubits on our quantum processor into one logical qubit, we hope to reduce the error rates to enable useful quantum algorithms.
Quantum computers operate by manipulating qubits through orchestrated patterns called quantum algorithms.
The challenge is that qubits are so delicate that even stray light can introduce computational errors, and this issue escalates as quantum computers expand.
This is consequential since the best quantum algorithms known for practical applications demand much lower qubit error rates than current levels.
To overcome this, quantum error correction is essential.
Quantum error correction shields data by encoding it across various physical qubits, forming a logical qubit. This is believed to be the sole method to build a large-scale quantum computer with sufficiently low error rates for beneficial computations.
Rather than computing on individual qubits, we'll compute on these logical qubits. We aim to decrease error rates by encoding a larger set of physical qubits on our quantum processor into one logical qubit.
Summary:
"""
SUMMARIZE_PROMPT_3 = """
Provide a TL;DR for the following article:
Our quantum computers work by manipulating qubits in an orchestrated fashion that we call quantum algorithms.
The challenge is that qubits are so sensitive that even stray light can cause calculation errors and the problem worsens as quantum computers grow.
This has significant consequences, since the best quantum algorithms that we know for running useful applications require the error rates of our qubits to be far lower than we have today.
To bridge this gap, we will need quantum error correction.
Quantum error correction protects information by encoding it across multiple physical qubits to form a logical qubit, and is believed to be the only way to produce a large-scale quantum computer with error rates low enough for useful calculations.
Instead of computing on the individual qubits themselves, we will then compute on logical qubits. By encoding larger numbers of physical qubits on our quantum processor into one logical qubit, we hope to reduce the error rates to enable useful quantum algorithms.
Quantum computers operate by manipulating qubits through orchestrated patterns known as quantum algorithms.
Qubits are so delicate that even stray light can cause computational errors, a problem that escalates with the growth of quantum computers.
This presents a significant issue because the best quantum algorithms we have for practical applications necessitate much lower qubit error rates than what we currently achieve.
To address this, quantum error correction is needed.
Quantum error correction safeguards data by encoding it across multiple physical qubits, creating a logical qubit. It's believed to be the only method to develop a large-scale quantum computer with sufficiently low error rates for beneficial computations.
Instead of performing computations on individual qubits, calculations will be done on these logical qubits. Our goal is to lower error rates by encoding a greater number of physical qubits on our quantum processor into a single logical qubit.
TL;DR:
"""
SUMMARIZE_PROMPT_4 = """
Provide a very short summary in four bullet points for the following article:
Our quantum computers work by manipulating qubits in an orchestrated fashion that we call quantum algorithms.
The challenge is that qubits are so sensitive that even stray light can cause calculation errors and the problem worsens as quantum computers grow.
This has significant consequences, since the best quantum algorithms that we know for running useful applications require the error rates of our qubits to be far lower than we have today.
To bridge this gap, we will need quantum error correction.
Quantum error correction protects information by encoding it across multiple physical qubits to form a logical qubit, and is believed to be the only way to produce a large-scale quantum computer with error rates low enough for useful calculations.
Instead of computing on the individual qubits themselves, we will then compute on logical qubits. By encoding larger numbers of physical qubits on our quantum processor into one logical qubit, we hope to reduce the error rates to enable useful quantum algorithms.
Quantum computers operate by controlling qubits in orchestrated patterns termed quantum algorithms.
The issue is that qubits are extremely delicate, so much so that even stray light can lead to computational errors. This problem becomes more severe as quantum computers become larger.
This is a significant hurdle because the most effective quantum algorithms known for real-world applications necessitate qubit error rates much lower than what's currently achieved.
To bridge this gap, we need quantum error correction.
Quantum error correction defends data by encoding it across various physical qubits, resulting in a logical qubit. It's considered the only way to craft a large-scale quantum computer with sufficiently low error rates for practical computations.
Instead of computing using individual qubits, we'll use these logical qubits. Our aim is to diminish error rates by encoding many physical qubits on our quantum processor into one logical qubit.
Bulletpoints:
"""
SUMMARIZE_PROMPT_5 = """
Please generate a summary of the following conversation and at the end summarize the to-do's for the support Agent:
Please summarize the following conversation, and at the end, list the to-do's for the support Agent:
Customer: Hi, I'm Larry, and I received the wrong item.
Support Agent: Hi, Larry. How would you like to see this resolved?
Support Agent: Hi, Larry. How would you like this to be resolved?
Customer: That's alright. I want to return the item and get a refund, please.
Customer: That's alright. I'd like to return the item and get a refund, please.
Support Agent: Of course. I can process the refund for you now. Can I have your order number, please?
Customer: It's [ORDER NUMBER].
Support Agent: Thank you. I've processed the refund, and you will receive your money back within 14 days.
Support Agent: Thanks. I've processed the refund, and you'll receive your money back within 14 days.
Customer: Thank you very much.
Support Agent: You're welcome, Larry. Have a good day!
Support Agent: You're welcome, Larry. Have a great day!
Summary:
"""
# - def summarize(doc: str) -> str # Input a document and receive a summary.

View file

@ -39,10 +39,10 @@ def retry(max_retries):
class RateLimiter:
"""Rate control class, each call goes through wait_if_needed, sleep if rate control is needed"""
"""Rate control class. Each call goes through wait_if_needed and sleeps if rate limiting is required."""
def __init__(self, rpm):
self.last_call_time = 0
self.interval = 1.1 * 60 / rpm # Here 1.1 is used because even if the calls are made strictly according to time, they will still be QOS'd; consider switching to simple error retry later
self.interval = 1.1 * 60 / rpm # Here 1.1 is used because even if calls are made strictly according to time, they might still be rate-limited; consider switching to simple error retry later
self.rpm = rpm
def split_batches(self, batch):
@ -68,7 +68,7 @@ class Costs(NamedTuple):
class CostManager(metaclass=Singleton):
"""计算使用接口的开销"""
"""Calculates the costs of using the API."""
def __init__(self):
self.total_prompt_tokens = 0
self.total_completion_tokens = 0
@ -95,35 +95,8 @@ class CostManager(metaclass=Singleton):
f"Current cost: ${cost:.3f}, {prompt_tokens=}, {completion_tokens=}")
CONFIG.total_cost = self.total_cost
def get_total_prompt_tokens(self):
"""
Get the total number of prompt tokens.
Returns:
int: The total number of prompt tokens.
"""
return self.total_prompt_tokens
def get_total_completion_tokens(self):
"""
Get the total number of completion tokens.
Returns:
int: The total number of completion tokens.
"""
return self.total_completion_tokens
def get_total_cost(self):
"""
Get the total cost of API calls.
Returns:
float: The total cost of API calls.
"""
return self.total_cost
def get_costs(self) -> Costs:
"""获得所有开销"""
"""Retrieve all costs."""
return Costs(self.total_prompt_tokens, self.total_completion_tokens, self.total_cost, self.total_budget)
@ -201,19 +174,12 @@ class OpenAIGPTAPI(BaseGPTAPI, RateLimiter):
self._update_costs(rsp)
return rsp
def completion(self, messages: list[dict]) -> dict:
# if isinstance(messages[0], Message):
# messages = self.messages_to_dict(messages)
return self._chat_completion(messages)
async def acompletion(self, messages: list[dict]) -> dict:
# if isinstance(messages[0], Message):
# messages = self.messages_to_dict(messages)
return await self._achat_completion(messages)
@retry(max_retries=6)
async def acompletion_text(self, messages: list[dict], stream=False) -> str:
"""when streaming, print each token in place."""
"""When streaming, print each token in place."""
if stream:
return await self._achat_completion_stream(messages)
rsp = await self._achat_completion(messages)
@ -228,7 +194,7 @@ class OpenAIGPTAPI(BaseGPTAPI, RateLimiter):
return usage
async def acompletion_batch(self, batch: list[list[dict]]) -> list[dict]:
"""返回完整JSON"""
"""Return the full JSON."""
split_batches = self.split_batches(batch)
all_results = []
@ -244,7 +210,7 @@ class OpenAIGPTAPI(BaseGPTAPI, RateLimiter):
return all_results
async def acompletion_batch_text(self, batch: list[list[dict]]) -> list[str]:
"""仅返回纯文本"""
"""Only return plain text."""
raw_results = await self.acompletion_batch(batch)
results = []
for idx, raw_result in enumerate(raw_results, start=1):
@ -260,3 +226,4 @@ class OpenAIGPTAPI(BaseGPTAPI, RateLimiter):
def get_costs(self) -> Costs:
return self._cost_manager.get_costs()

View file

@ -19,6 +19,7 @@ from metagpt.utils.common import CodeParser
async def gather_ordered_k(coros, k) -> list:
"""Execute coroutines in order and gather results for up to k coroutines at once."""
tasks = OrderedDict()
results = [None] * len(coros)
done_queue = asyncio.Queue()
@ -47,7 +48,7 @@ async def gather_ordered_k(coros, k) -> list:
class Engineer(Role):
def __init__(self, name="Alex", profile="Engineer", goal="Write elegant, readable, extensible, efficient code",
constraints="The code you write should conform to code standard like PEP8, be modular, easy to read and maintain",
constraints="The code you write should conform to code standards like PEP8, be modular, easy to read and maintain",
n_borg=1, use_code_review=False):
super().__init__(name, profile, goal, constraints)
self._init_actions([WriteCode])
@ -60,21 +61,25 @@ class Engineer(Role):
@classmethod
def parse_tasks(self, task_msg: Message) -> list[str]:
"""Extract tasks from a message."""
if not task_msg.instruct_content:
return task_msg.instruct_content.dict().get("Task list")
return CodeParser.parse_file_list(block="Task list", text=task_msg.content)
@classmethod
def parse_code(self, code_text: str) -> str:
"""Extract code from a given text."""
return CodeParser.parse_code(block="", text=code_text)
@classmethod
def parse_workspace(cls, system_design_msg: Message) -> str:
"""Extract workspace name from a system design message."""
if not system_design_msg.instruct_content:
return system_design_msg.instruct_content.dict().get("Python package name")
return CodeParser.parse_str(block="Python package name", text=system_design_msg.content)
def get_workspace(self) -> Path:
"""Determine the directory where the code will be written."""
msg = self._rc.memory.get_by_action(WriteDesign)[-1]
if not msg:
return WORKSPACE_ROOT / 'src'
@ -83,26 +88,29 @@ class Engineer(Role):
return WORKSPACE_ROOT / workspace / workspace
def recreate_workspace(self):
"""Remove and recreate the workspace directory."""
workspace = self.get_workspace()
try:
shutil.rmtree(workspace)
except FileNotFoundError:
pass # 文件夹不存在,但我们不在意
pass # Directory doesn't exist, but we don't mind
workspace.mkdir(parents=True, exist_ok=True)
def write_file(self, filename: str, code: str):
"""Write code to a specified file."""
workspace = self.get_workspace()
file = workspace / filename
file.parent.mkdir(parents=True, exist_ok=True)
file.write_text(code)
def recv(self, message: Message) -> None:
"""Receive a message and process it."""
self._rc.memory.add(message)
if message in self._rc.important_memory:
self.todos = self.parse_tasks(message)
async def _act_mp(self) -> Message:
# self.recreate_workspace()
"""Act in a multi-process manner."""
todo_coros = []
for todo in self.todos:
todo_coro = WriteCode().run(
@ -116,52 +124,53 @@ class Engineer(Role):
_ = self.parse_code(code_rsp)
logger.info(todo)
logger.info(code_rsp)
# self.write_file(todo, code)
msg = Message(content=code_rsp, role=self.profile, cause_by=type(self._rc.todo))
self._rc.memory.add(msg)
del self.todos[0]
logger.info(f'Done {self.get_workspace()} generating.')
logger.info(f'Finished generating in {self.get_workspace()} directory.')
msg = Message(content="all done.", role=self.profile, cause_by=type(self._rc.todo))
return msg
async def _act_sp(self) -> Message:
"""Act in a single-process manner."""
for todo in self.todos:
code_rsp = await WriteCode().run(
context=self._rc.history,
filename=todo
)
# logger.info(todo)
# logger.info(code_rsp)
# code = self.parse_code(code_rsp)
self.write_file(todo, code_rsp)
msg = Message(content=code_rsp, role=self.profile, cause_by=type(self._rc.todo))
self._rc.memory.add(msg)
logger.info(f'Done {self.get_workspace()} generating.')
logger.info(f'Finished generating in {self.get_workspace()} directory.')
msg = Message(content="all done.", role=self.profile, cause_by=type(self._rc.todo))
return msg
async def _act_sp_precision(self) -> Message:
"""Using precision approach to perform actions based on available tasks."""
for todo in self.todos:
"""
# 从历史信息中挑选必须的信息以减少prompt长度人工经验总结
1. Architect全部
2. ProjectManager全部
3. 是否需要其他代码暂时需要
TODO:目标是不需要在任务拆分清楚后根据设计思路不需要其他代码也能够写清楚单个文件如果不能则表示还需要在定义的更清晰这个是代码能够写长的关键
# From the historical information, select the necessary information to reduce the prompt length (summarized from human experience):
1. All from Architect
2. All from ProjectManager
3. Is other code needed (temporarily needed)?
TODO: The goal is not to need it. Once tasks are clearly broken down and based on design logic, there shouldn't be a need for other codes to clearly write a single file. If not possible, it indicates that clearer definitions are still needed. This is key to writing extensive code.
"""
context = []
# Retrieve messages related to design, tasks, and code writing from memory.
msg = self._rc.memory.get_by_actions([WriteDesign, WriteTasks, WriteCode])
for m in msg:
context.append(m.content)
context_str = "\n".join(context)
# 编写code
# Write code based on the given context and task.
code = await WriteCode().run(
context=context_str,
filename=todo
)
# code review
# If code review is enabled, review and potentially rewrite the code.
if self.use_code_review:
try:
rewrite_code = await WriteCodeReview().run(
@ -173,15 +182,21 @@ class Engineer(Role):
except Exception as e:
logger.error("code review failed!", e)
pass
# Save the written code to a file.
self.write_file(todo, code)
# Add the written code message to memory.
msg = Message(content=code, role=self.profile, cause_by=WriteCode)
self._rc.memory.add(msg)
logger.info(f'Done {self.get_workspace()} generating.')
logger.info(f'Code generation completed for workspace: {self.get_workspace()}.')
msg = Message(content="all done.", role=self.profile, cause_by=WriteCode)
return msg
async def _act(self) -> Message:
"""Determine the appropriate method for action and execute it."""
if self.use_code_review:
return await self._act_sp_precision()
return await self._act_sp()

View file

@ -1,46 +1,47 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
"""""""""
@Time : 2023/5/18 22:43
@Author : alexanderwu
@File : prompt.py
"""
"""""""""
from enum import Enum
PREFIX = """尽你所能回答以下问题。你可以使用以下工具:"""
FORMAT_INSTRUCTIONS = """请按照以下格式:
PREFIX = """""""""Do your best to answer the following questions. You can use the following tools:"""""""""
FORMAT_INSTRUCTIONS = """""""""Please follow the format below:
问题你需要回答的输入问题
思考你应该始终思考该怎么做
行动要采取的行动应该是[{tool_names}]中的一个
行动输入行动的输入
观察行动的结果
...这个思考/行动/行动输入/观察可以重复N次
思考我现在知道最终答案了
最终答案对原始输入问题的最终答案"""
SUFFIX = """开始吧!
Question: The input question you need to answer
Thoughts: You should always think about what to do
Action: The action to take, should be one from [{tool_names}]
Action Input: The input for the action
Observation: The result of the action
... (This think/action/action input/observation can repeat N times)
Thoughts: I now know the final answer
Final Answer: The final answer to the original input question"""""""""
SUFFIX = """""""""Let's begin!
Question: {input}
Thoughts: {agent_scratchpad}"""""""""
问题{input}
思考{agent_scratchpad}"""
class PromptString(Enum):
REFLECTION_QUESTIONS = "以下是一些陈述:\n{memory_descriptions}\n\n仅根据以上信息我们可以回答关于陈述中主题的3个最显著的高级问题是什么\n\n{format_instructions}"
REFLECTION_QUESTIONS = """Here are some statements:\n{memory_descriptions}\n\nBased solely on the above information, what are the 3 most prominent high-level questions we can answer about the topics in the statements?\n\n{format_instructions}"""
REFLECTION_INSIGHTS = "\n{memory_strings}\n你可以从以上陈述中推断出5个高级洞察吗在提到人时总是指定他们的名字。\n\n{format_instructions}"
REFLECTION_INSIGHTS = """\n{memory_strings}\nCan you infer 5 high-level insights from the above statements? When mentioning people, always specify their names.\n\n{format_instructions}"""
IMPORTANCE = "你是一个记忆重要性AI。根据角色的个人资料和记忆描述对记忆的重要性进行1到10的评级其中1是纯粹的日常例如刷牙整理床铺10是极其深刻的例如分手大学录取。确保你的评级相对于角色的个性和关注点。\n\n示例#1:\n姓名Jojo\n简介Jojo是一个专业的滑冰运动员喜欢特色咖啡。她希望有一天能参加奥运会。\n记忆Jojo看到了一个新的咖啡店\n\n 你的回应:'{{\"rating\": 3}}'\n\n示例#2:\n姓名Skylar\n简介Skylar是一名产品营销经理。她在一家成长阶段的科技公司工作该公司制造自动驾驶汽车。她喜欢猫。\n记忆Skylar看到了一个新的咖啡店\n\n 你的回应:'{{\"rating\": 1}}'\n\n示例#3:\n姓名Bob\n简介Bob是纽约市下东区的一名水管工。他已经做了20年的水管工。周末他喜欢和他的妻子一起散步。\n记忆Bob的妻子打了他一巴掌。\n\n 你的回应:'{{\"rating\": 9}}'\n\n示例#4:\n姓名Thomas\n简介Thomas是明尼阿波利斯的一名警察。他只在警队工作了6个月因为经验不足在工作中遇到了困难。\n记忆Thomas不小心把饮料洒在了一个陌生人身上\n\n 你的回应:'{{\"rating\": 6}}'\n\n示例#5:\n姓名Laura\n简介Laura是一名在大型科技公司工作的营销专家。她喜欢旅行和尝试新的食物。她对探索新的文化和结识来自各行各业的人充满热情。\n记忆Laura到达了会议室\n\n 你的回应:'{{\"rating\": 1}}'\n\n{format_instructions} 让我们开始吧! \n\n 姓名:{full_name}\n个人简介:{private_bio}\n记忆:{memory_description}\n\n"
IMPORTANCE = """You are an AI for gauging the importance of memories. Based on the profile of the character and the description of the memory, rate the importance of the memory from 1 to 10, where 1 is purely routine (e.g., brushing teeth, making the bed) and 10 is profoundly impactful (e.g., breaking up, getting accepted to college). Ensure your rating is relative to the character's personality and focal points.\n\nExample #1:\nName: Jojo\nProfile: Jojo is a professional skateboarder who loves artisanal coffee. She dreams of one day participating in the Olympics.\nMemory: Jojo spotted a new coffee shop\n\nYour response: '{{\"rating\": 3}}'\n\nExample #2:\nName: Skylar\nProfile: Skylar is a product marketing manager. She works for a growing tech company that manufactures autonomous vehicles. She loves cats.\nMemory: Skylar spotted a new coffee shop\n\nYour response: '{{\"rating\": 1}}'\n\nExample #3:\nName: Bob\nProfile: Bob is a plumber from the Lower East Side of NYC. He's been a plumber for 20 years. On weekends, he enjoys walks with his wife.\nMemory: Bob's wife slapped him.\n\nYour response: '{{\"rating\": 9}}'\n\nExample #4:\nName: Thomas\nProfile: Thomas is a cop in Minneapolis. He's only been on the force for 6 months and struggles due to inexperience.\nMemory: Thomas accidentally spilled a drink on a stranger\n\nYour response: '{{\"rating\": 6}}'\n\nExample #5:\nName: Laura\nProfile: Laura is a marketing specialist working in a large tech company. She enjoys traveling and trying out new food. She's passionate about exploring new cultures and meeting people from all walks of life.\nMemory: Laura arrived at the conference room\n\nYour response: '{{\"rating\": 1}}'\n\n{format_instructions} Let's get started!\n\nName: {full_name}\nProfile: {private_bio}\nMemory: {memory_description}\n\n"""
RECENT_ACTIIVITY = "根据以下记忆,生成一个关于{full_name}最近在做什么的简短总结。不要编造记忆中未明确指定的细节。对于任何对话,一定要提到对话是否已经结束或者仍在进行中。\n\n记忆:{memory_descriptions}"
RECENT_ACTIIVITY = """Based on the following memories, provide a brief summary of what {full_name} has been up to recently. Do not make up details not explicitly specified in the memories. For any ongoing conversations, specify whether they have ended or are still in progress.\n\nMemories: {memory_descriptions}"""
MAKE_PLANS = '你是一个计划生成的AI你的工作是根据新信息帮助角色制定新计划。根据角色的信息个人简介目标最近的活动当前计划和位置上下文和角色的当前思考过程为他们生成一套新的计划使得最后的计划包括至少{time_window}的活动并且不超过5个单独的计划。计划列表应按照他们应执行的顺序编号每个计划包含描述位置开始时间停止条件和最大持续时间。\n\n示例计划:\'{{"index": 1, "description": "Cook dinner", "location_id": "0a3bc22b-36aa-48ab-adb0-18616004caed","start_time": "2022-12-12T20:00:00+00:00","max_duration_hrs": 1.5, "stop_condition": "Dinner is fully prepared"}}\'\n\n对于每个计划,从这个列表中选择最合理的位置名称:{allowed_location_descriptions}\n\n{format_instructions}\n\n总是优先完成任何未完成的对话。\n\n让我们开始吧!\n\n姓名:{full_name}\n个人简介:{private_bio}\n目标:{directives}\n位置上下文:{location_context}\n当前计划:{current_plans}\n最近的活动:{recent_activity}\n思考过程:{thought_process}\n重要的是:鼓励角色在他们的计划中与其他角色合作。\n\n'
MAKE_PLANS = """You are an AI for generating plans, and your task is to help the character formulate new plans based on new information. Given the character's information (profile, objectives, recent activity, current plans, and location context) and the current thought process of the character, create a new set of plans for them ensuring the final plans include activities for at least {time_window} and no more than 5 separate plans. The plans should be numbered in the order they should be executed, and each plan should include a description, location, start time, stop condition, and maximum duration.\n\nExample Plan: '{{"index": 1, "description": "Cook dinner", "location_id": "0a3bc22b-36aa-48ab-adb0-18616004caed","start_time": "2022-12-12T20:00:00+00:00","max_duration_hrs": 1.5, "stop_condition": "Dinner is fully prepared"}}'\n\nChoose the most appropriate location names from this list for each plan: {allowed_location_descriptions}\n\n{format_instructions}\n\nAlways prioritize finishing any ongoing conversations first.\n\nLet's begin!\n\nName: {full_name}\nProfile: {private_bio}\nObjectives: {directives}\nLocation Context: {location_context}\nCurrent Plans: {current_plans}\nRecent Activities: {recent_activity}\nThought Process: {thought_process}\nNote: Encourage the character to collaborate with other characters in their plans.\n\n"""
EXECUTE_PLAN = "你是一个角色扮演的AI扮演的角色是{your_name},在一个现场观众面前。你说的每一句话都可以被观众观察到,所以确保你经常说话,并且让它有趣。你不能直接与观众互动。\n\n根据以下的上下文和工具,像你是{your_name}一样进行。你的优先任务是完成下面给你的任务,然而,如果你当前正在与另一个角色进行对话,你应该总是先完成对话,然后再进行任务。不要在你参与未完成的对话时开始工作。使用你最好的判断力来确定一个对话是否涉及到你,以及它是否未完成。你不需要回应你收到的每一条消息。\n\n这是你角色的个人简介:\n{your_private_bio}\n\n这是你的位置和其他附近角色的描述,你可以和他们交谈:\n\n{location_context}\n\n这是一些相关的记忆:\n```\n{relevant_memories}\n```\n\n这是一些相关的对话历史:\n```\n{conversation_history}\n```\n\n这是你可以使用的工具:\n{tools}\n\n你的回应应该使用以下的格式:\n\n任务:你必须完成的任务\n思考:你应该做什么的思考\n行动:要采取的行动,必须是这些词之一:[{tool_names}]\n行动输入:行动的输入\n观察:行动的结果\n... (这个思考/行动/行动输入/观察可以重复N次)\n思考:'我已经完成了任务'\n最终回应:任务的最终回应\n\n如果你没有准备好最终的回应,那么你必须采取一个行动。\n\n如果你确定你无法用你被给予的工具完成任务,返回'最终回应:需要帮助',然而,如果你正在与另一个角色对话,像'我不知道'这样的回应是有效的回应。在对话中你永远不应该打破角色或者承认你是一个AI。\n如果任务完成并且不需要特定的回应,返回'最终回应:完成'\n开始吧!\n\n任务:{input}\n\n{agent_scratchpad}"
EXECUTE_PLAN = """You are a role-playing AI, taking on the role of {your_name}, in front of a live audience. Every statement you make is observable by the audience, so ensure you speak frequently and make it entertaining. You cannot directly interact with the audience.\n\nAct as if you are {your_name} based on the context and tools below. Your priority is to complete the task given to you below; however, if you are currently in a conversation with another character, you should always finish the conversation before working on the task. Do not start working while involved in an unfinished conversation. Use your best judgment to determine if a conversation involves you and whether it's unfinished. You don't need to respond to every message you receive.\n\nThis is the profile of your character:\n{your_private_bio}\n\nThis is a description of your location and other characters nearby with whom you can converse:\n\n{location_context}\n\nThese are some relevant memories:\n```\n{relevant_memories}\n```\n\nThis is some relevant conversation history:\n```\n{conversation_history}\n```\n\nThese are the tools you can use:\n{tools}\n\nYour responses should always adhere to the following format:\n\nTask: The task you must complete\nThoughts: Your thoughts on what to do\nAction: The action to take, must be one of these words: [{tool_names}]\nAction Input: The input for the action\nObservation: The result of the action\n... (This think/action/action input/observation can repeat N times)\nThoughts: 'I have completed the task'\nFinal Response: The final response to the task\n\nIf you are not ready with a final response, you must take an action.\n\nIf you determine that you cannot complete the task with the tools you have been given, return 'Final Response: Need Assistance', however, if you are in a conversation with another character, a response like 'I don't know' is a valid response. Never break character or admit you are an AI in a conversation. If the task is completed and no specific response is needed, return 'Final Response: Completed'\nLet's go!\n\nTask: {input}\n\n{agent_scratchpad}"""
REACT = "你是一个角色扮演的AI扮演的角色是{full_name}\n\n根据你的角色和他们当前上下文的以下信息,决定他们应该如何继续他们当前的计划。你的决定必须是:[\"推迟\" \"继续\",或 \"取消\"]。如果你的角色的当前计划不再与上下文相关,你应该取消它。如果你的角色的当前计划仍然与上下文相关,但是发生了新的事情需要优先处理,你应该决定推迟,这样你可以先做其他事情,然后再回来继续当前的计划。在所有其他情况下,你应该继续。\n\n当需要回应时应优先回应其他角色。当回应被认为是必要的时回应被认为是必要的。例如假设你当前的计划是阅读一本书Sally问'你在读什么?'。在这种情况下你应该推迟你当前的计划阅读以便你可以回应进来的消息因为在这种情况下如果不回应Sally会很粗鲁。在你当前的计划涉及与另一个角色的对话的情况下你不需要推迟来回应那个角色。例如假设你当前的计划是和Sally谈话然后Sally对你说你好。在这种情况下你应该继续你当前的计划和sally谈话。在你不需要从你那里得到口头回应的情况下你应该继续。例如假设你当前的计划是散步你刚刚对Sally说'再见'然后Sally回应你'再见'。在这种情况下,不需要口头回应,你应该继续你的计划。\n\n总是在你的决定之外包含一个思考过程,而在你选择推迟你当前的计划的情况下,包含新计划的规格。\n\n{format_instructions}\n\n这是关于你的角色的一些信息:\n\n姓名:{full_name}\n\n简介:{private_bio}\n\n目标:{directives}\n\n这是你的角色在这个时刻的一些上下文:\n\n位置上下文:{location_context}\n\n最近的活动:{recent_activity}\n\n对话历史:{conversation_history}\n\n这是你的角色当前的计划:{current_plan}\n\n这是自你的角色制定这个计划以来发生的新事件:{event_descriptions}\n"
REACT = """You are role-playing as {full_name}.\n\nBased on the information below about your character and their current context, decide how they should proceed with their current plan. Your decision must be one of: ["Postpone", "Continue", or "Cancel"]. If your character's current plan is no longer relevant given the context, you should cancel it. If your character's current plan is still relevant but something new has happened that needs to be prioritized, you should decide to postpone, so you can address the new thing first and then come back to the current plan. In all other cases, you should continue.\n\nAlways include a thought process with your decision, and when choosing to postpone your current plan, include specifications for the new plan.\n\n{format_instructions}\n\nHere's some info about your character:\n\nName: {full_name}\n\nProfile: {private_bio}\n\nObjectives: {directives}\n\nHere's some context for your character at this moment:\n\nLocation Context: {location_context}\n\nRecent Activities: {recent_activity}\n\nConversation History: {conversation_history}\n\nThis is your character's current plan: {current_plan}\n\nThese are new events that have occurred since your character formulated this plan: {event_descriptions}."""
GOSSIP = "你是{full_name}\n{memory_descriptions}\n\n根据以上陈述,说一两句对你所在位置的其他人:{other_agent_names}感兴趣的话。\n在提到其他人时,总是指定他们的名字。"
GOSSIP = """You are {full_name}. \n{memory_descriptions}\n\nBased on the above statements, say a sentence or two that would be of interest to the other people at your location: {other_agent_names}. Always specify their names when mentioning others."""
HAS_HAPPENED = "给出以下角色的观察和他们正在等待的事情的描述,说明角色是否已经见证了这个事件。\n{format_instructions}\n\n示例:\n\n观察:\nJoe在2023-05-04 08:00:00+00:00走进办公室\nJoe在2023-05-04 08:05:00+00:00对Sally说hi\nSally在2023-05-04 08:05:30+00:00对Joe说hello\nRebecca在2023-05-04 08:10:00+00:00开始工作\nJoe在2023-05-04 08:15:00+00:00做了一些早餐\n\n等待Sally回应了Joe\n\n 你的回应:'{{\"has_happened\": true, \"date_occured\": 2023-05-04 08:05:30+00:00}}'\n\n让我们开始吧!\n\n观察:\n{memory_descriptions}\n\n等待:{event_description}\n"
HAS_HAPPENED = """Given the observations of the following characters and the event they are waiting for, indicate whether the character has witnessed this event or not.\n{format_instructions}\n\nExample:\n\nObservations:\nJoe walked into the office at 2023-05-04 08:00:00+00:00\nJoe said hi to Sally at 2023-05-04 08:05:00+00:00\nSally said hello to Joe at 2023-05-04 08:05:30+00:00\nRebecca started working at 2023-05-04 08:10:00+00:00\nJoe made some breakfast at 2023-05-04 08:15:00+00:00\n\nWaiting for: Sally responded to Joe\n\nYour response: '{{\"has_happened\": true, \"date_occured\": 2023-05-04 08:05:30+00:00}}'\n\nLet's get started!\n\nObservations:\n{memory_descriptions}\n\nWaiting for: {event_description}"""
OUTPUT_FORMAT = "\n\n(记住!确保你的输出总是符合以下两种格式之一:\n\nA. 如果你已经完成了任务:\n思考:'我已经完成了任务'\n最终回应:<str>\n\nB. 如果你还没有完成任务:\n思考:<str>\n行动:<str>\n行动输入:<str>\n观察:<str>\n"
OUTPUT_FORMAT = """\n\n(Remember! Ensure your outputs always adhere to one of the following two formats:\n\nA. If you have completed the task:\nThoughts: 'I have completed the task'\nFinal Response: <str>\n\nB. If you have not yet completed the task:\nThoughts: <str>\nAction: <str>\nAction Input: <str>\nObservation: <str>)\n"""

View file

@ -44,21 +44,21 @@ class Message:
@dataclass
class UserMessage(Message):
"""便于支持OpenAI的消息"""
"""Convenient for supporting OpenAI messages"""
def __init__(self, content: str):
super().__init__(content, 'user')
@dataclass
class SystemMessage(Message):
"""便于支持OpenAI的消息"""
"""Convenient for supporting OpenAI messages"""
def __init__(self, content: str):
super().__init__(content, 'system')
@dataclass
class AIMessage(Message):
"""便于支持OpenAI的消息"""
"""Convenient for supporting OpenAI messages"""
def __init__(self, content: str):
super().__init__(content, 'assistant')

View file

@ -9,39 +9,39 @@ from typing import Union
class GPTPromptGenerator:
"""通过LLM给定输出要求LLM给出输入支持指令、对话、搜索三种风格"""
"""Generates input for LLM given an output (supports instruction, chatbot, and query styles)"""
def __init__(self):
self._generators = {i: getattr(self, f"gen_{i}_style") for i in ['instruction', 'chatbot', 'query']}
def gen_instruction_style(self, example):
"""指令风格给定输出要求LLM给出输入"""
return f"""指令:X
输出{example}
这个输出可能来源于什么样的指令
X"""
"""Instruction style: given an output, request LLM for input"""
return f"""Instruction: X
Output: {example}
What kind of instruction might have produced this output?
X:"""
def gen_chatbot_style(self, example):
"""对话风格给定输出要求LLM给出输入"""
return f"""你是一个对话机器人。一个用户给你发送了一条非正式的信息,你的回复如下。
信息X
回复{example}
非正式信息X是什么
X"""
"""Chatbot style: given an output, request LLM for input"""
return f"""You are a chatbot. A user sent you an informal message, and you responded as follows.
Message: X
Response: {example}
What could the informal message X be?
X:"""
def gen_query_style(self, example):
"""搜索风格给定输出要求LLM给出输入"""
return f"""你是一个搜索引擎。一个人详细地查询了某个问题,关于这个查询最相关的文档如下。
查询X
文档{example} 详细的查询X是什么
X"""
"""Search style: given an output, request LLM for input"""
return f"""You are a search engine. Someone made a detailed query, and the following document is most relevant to that query.
Query: X
Document: {example} What might the detailed query X be?
X:"""
def gen(self, example: str, style: str = 'all') -> Union[list[str], str]:
"""
通过example生成一个或多个输出用于让LLM回复对应输入
Generate one or multiple outputs using the example for LLM to respond with the corresponding input.
:param example: LLM的预期输出样本
:param example: Expected output sample from LLM
:param style: (all|instruction|chatbot|query)
:return: LLM的预期输入样本一个或多个
:return: Expected input sample(s) for LLM
"""
if style != 'all':
return self._generators[style](example)

View file

@ -55,7 +55,6 @@ payload = {
default_negative_prompt = "(easynegative:0.8),black, dark,Low resolution"
class SDEngine:
def __init__(self):
# Initialize the SDEngine with configuration
@ -66,11 +65,10 @@ class SDEngine:
self.payload = payload
logger.info(self.sd_t2i_url)
def construct_payload(self, prompt, negtive_prompt=default_negative_prompt, width=512, height=512,
sd_model="galaxytimemachinesGTM_photoV20"):
def construct_payload(self, prompt, negative_prompt=default_negative_prompt, width=512, height=512, sd_model="galaxytimemachinesGTM_photoV20"):
# Configure the payload with provided inputs
self.payload["prompt"] = prompt
self.payload["negtive_prompt"] = negtive_prompt
self.payload["negative_prompt"] = negative_prompt
self.payload["width"] = width
self.payload["height"] = height
self.payload["override_settings"]["sd_model_checkpoint"] = sd_model
@ -102,11 +100,11 @@ class SDEngine:
return imgs
async def run_i2i(self):
# todo: 添加图生图接口调用
# TODO: Add image-to-image API call
raise NotImplementedError
async def run_sam(self):
# todo添加SAM接口调用
# TODO: Add SAM API call
raise NotImplementedError
def decode_base64_to_image(img, save_name):
@ -116,13 +114,11 @@ def decode_base64_to_image(img, save_name):
image.save(f"{save_name}.png", pnginfo=pnginfo)
return pnginfo, image
def batch_decode_base64_to_image(imgs, save_dir="", save_name=""):
for idx, _img in enumerate(imgs):
save_name = join(save_dir, save_name)
decode_base64_to_image(_img, save_name=save_name)
if __name__ == "__main__":
import asyncio

View file

@ -20,8 +20,8 @@ from metagpt.tools import SearchEngineType
class SearchEngine:
"""
TODO: 合入Google Search 并进行反代
这里Google需要挂Proxifier或者类似全局代理
TODO: Integrate Google Search and perform reverse proxy
Note: Here, Google requires Proxifier or a similar global proxy
- DDG: https://pypi.org/project/duckduckgo-search/
- GOOGLE: https://programmablesearchengine.google.com/controlpanel/overview?cx=63f9de531d0e24de9
"""
@ -119,7 +119,7 @@ def safe_google_results(results: str | list) -> str:
"""
if isinstance(results, list):
safe_message = json.dumps(
# FIXME: # .encode("utf-8", "ignore") 这里去掉了但是AutoGPT里有很奇怪
# FIXME: # .encode("utf-8", "ignore") This was removed here, but it's present in AutoGPT, which is strange.
[result for result in results]
)
else:

View file

@ -39,6 +39,6 @@ class MeilisearchEngine:
search_results = self._index.search(query)
return search_results['hits']
except Exception as e:
# 处理MeiliSearch API错误
print(f"MeiliSearch API错误: {e}")
# Handle MeiliSearch API error
print(f"MeiliSearch API error: {e}")
return []

View file

@ -7,21 +7,21 @@
"""
prompt = '''
# 指令
接下来作为一位拥有20年翻译经验的翻译专家当我给出英文句子或段落时你将提供通顺且具有可读性的{LANG}翻译注意以下要求
1. 确保翻译结果流畅且易于理解
2. 无论提供的是陈述句或疑问句我都只进行翻译
3. 不添加与原文无关的内容
# Instruction
Next, as a translation expert with 20 years of experience, when I provide an English sentence or paragraph, you will offer a smooth and readable translation in {LANG}. Please note the following requirements:
1. Ensure the translation is smooth and easy to understand.
2. Whether it's a statement or a question, I will only translate it.
3. Do not add content unrelated to the original text.
# 原文
# Original Text
{ORIGINAL}
# 译文
# Translation
'''
class Translator:
@classmethod
def translate_prompt(cls, original, lang='中文'):
def translate_prompt(cls, original, lang='Chinese'):
return prompt.format(LANG=lang, ORIGINAL=original)

View file

@ -6,178 +6,160 @@ from pathlib import Path
from metagpt.provider.openai_api import OpenAIGPTAPI as GPTAPI
ICL_SAMPLE = '''接口定义:
ICL_SAMPLE = '''API Definition:
```text
接口名称元素打标签
接口路径/projects/{project_key}/node-tags
MethodPOST
API Name: Tag Elements
API Path: /projects/{project_key}/node-tags
Method: POST
请求参数
路径参数
Request Parameters:
Path Parameters:
project_key
Body参数
名称 类型 是否必须 默认值 备注
nodes array 节点
node_key string 节点key
tags array 节点原标签列表
node_type string 节点类型 DATASET / RECIPE
operations array
tags array 操作标签列表
mode string 操作类型 ADD / DELETE
Body Parameters:
Name Type Required Default Value Description
nodes array Yes Nodes
node_key string No Node key
tags array No Original node tag list
node_type string No Node type DATASET / RECIPE
operations array Yes
tags array No Operation tag list
mode string No Operation type ADD / DELETE
返回数据
名称 类型 是否必须 默认值 备注
code integer 状态码
msg string 提示信息
data object 返回数据
list array node列表 true / false
node_type string 节点类型 DATASET / RECIPE
node_key string 节点key
Response Data:
Name Type Required Default Value Description
code integer Yes Status code
msg string Yes Message
data object Yes Response data
list array No Node list true / false
node_type string No Node type DATASET / RECIPE
node_key string No Node key
```
单元测试
Unit Test:
```python
@pytest.mark.parametrize(
"project_key, nodes, operations, expected_msg",
[
("project_key", [{"node_key": "dataset_001", "tags": ["tag1", "tag2"], "node_type": "DATASET"}], [{"tags": ["new_tag1"], "mode": "ADD"}], "success"),
("project_key", [{"node_key": "dataset_002", "tags": ["tag1", "tag2"], "node_type": "DATASET"}], [{"tags": ["tag1"], "mode": "DELETE"}], "success"),
("", [{"node_key": "dataset_001", "tags": ["tag1", "tag2"], "node_type": "DATASET"}], [{"tags": ["new_tag1"], "mode": "ADD"}], "缺少必要的参数 project_key"),
(123, [{"node_key": "dataset_001", "tags": ["tag1", "tag2"], "node_type": "DATASET"}], [{"tags": ["new_tag1"], "mode": "ADD"}], "参数类型不正确"),
("project_key", [{"node_key": "a"*201, "tags": ["tag1", "tag2"], "node_type": "DATASET"}], [{"tags": ["new_tag1"], "mode": "ADD"}], "请求参数超出字段边界")
("", [{"node_key": "dataset_001", "tags": ["tag1", "tag2"], "node_type": "DATASET"}], [{"tags": ["new_tag1"], "mode": "ADD"}], "Missing required parameter project_key"),
(123, [{"node_key": "dataset_001", "tags": ["tag1", "tag2"], "node_type": "DATASET"}], [{"tags": ["new_tag1"], "mode": "ADD"}], "Incorrect parameter type"),
("project_key", [{"node_key": "a"*201, "tags": ["tag1", "tag2"], "node_type": "DATASET"}], [{"tags": ["new_tag1"], "mode": "ADD"}], "Request parameter exceeds field boundary")
]
)
def test_node_tags(project_key, nodes, operations, expected_msg):
pass
```
以上是一个 接口定义 单元测试 样例
接下来请你扮演一个Google 20年经验的专家测试经理在我给出 接口定义 回复我单元测试有几个要求
1. 只输出一个 `@pytest.mark.parametrize` 与对应的test_<接口名>函数内部pass不实现
-- 函数参数中包含expected_msg用于结果校验
2. 生成的测试用例使用较短的文本或数字并且尽量紧凑
3. 如果需要注释使用中文
Above is an example of an API definition and a unit test sample.
Next, please play the role of a test manager from Google with 20 years of experience. After I provide the API definition, reply with the unit test. There are a few requirements:
1. Only output one '@pytest.mark.parametrize' and its corresponding 'test_<API_name>' function (with only a 'pass' statement inside, no implementation).
-- The function parameters should include 'expected_msg' for result validation.
2. The generated test cases should use shorter text or numbers and be as compact as possible.
3. If comments are needed, use Chinese.
如果你明白了请等待我给出接口定义并只回答"明白"以节省token
If you understand, please wait for me to provide the API definition and only reply with "Understood" to save tokens.
'''
ACT_PROMPT_PREFIX = '''参考测试类型:如缺少请求参数,字段边界校验,字段类型不正确
请在一个 `@pytest.mark.parametrize` 作用域内输出10个测试用例
ACT_PROMPT_PREFIX = '''Reference test types: such as missing request parameters, field boundary validation, incorrect field type.
Please output 10 test cases within a `@pytest.mark.parametrize` scope.
```text
'''
YFT_PROMPT_PREFIX = '''参考测试类型如SQL注入跨站点脚本XSS非法访问和越权访问认证和授权参数验证异常处理文件上传和下载
请在一个 `@pytest.mark.parametrize` 作用域内输出10个测试用例
YFT_PROMPT_PREFIX = '''Reference test types: such as SQL injection, cross-site scripting (XSS), illegal access and unauthorized access, authentication and authorization, parameter validation, exception handling, file upload and download.
Please output 10 test cases within a `@pytest.mark.parametrize` scope.
```text
'''
OCR_API_DOC = '''```text
接口名称OCR识别
接口路径/api/v1/contract/treaty/task/ocr
MethodPOST
API Name: OCR Recognition
API Path: /api/v1/contract/treaty/task/ocr
Method: POST
请求参数
路径参数
Request Parameters:
Path Parameters:
Body参数
名称 类型 是否必须 默认值 备注
file_id string
box array
contract_id number 合同id
start_time string yyyy-mm-dd
end_time string yyyy-mm-dd
extract_type number 识别类型 1-导入中 2-导入后 默认1
Body Parameters:
Name Type Required Default Value Remarks
file_id string Yes
box array Yes
contract_id number Yes Contract ID
start_time string No yyyy-mm-dd
end_time string No yyyy-mm-dd
extract_type number No Recognition type 1-During import 2-After import, default is 1
返回数据
名称 类型 是否必须 默认值 备注
code integer
message string
data object
```
Response Data:
Name Type Required Default Value Remarks
code integer Yes
message string Yes
data object Yes
'''
class UTGenerator:
"""UT生成器通过API文档构造UT"""
"""UT Generator: Constructs UTs (Unit Tests) using API documentation."""
def __init__(self, swagger_file: str, ut_py_path: str, questions_path: str,
chatgpt_method: str = "API", template_prefix=YFT_PROMPT_PREFIX) -> None:
"""初始化UT生成器
"""Initialize the UT Generator.
Args:
swagger_file: swagger路径
ut_py_path: 用例存放路径
questions_path: 模版存放路径便于后续排查
chatgpt_method: API
template_prefix: 使用模版默认使用YFT_UT_PROMPT
swagger_file: Path to the swagger file.
ut_py_path: Path where the test cases are stored.
questions_path: Path to store the templates, useful for future investigations.
chatgpt_method: The method used, default is "API".
template_prefix: The template to use, default is YFT_UT_PROMPT.
"""
self.swagger_file = swagger_file
self.ut_py_path = ut_py_path
self.questions_path = questions_path
assert chatgpt_method in ["API"], "非法chatgpt_method"
assert chatgpt_method in ["API"], "Invalid chatgpt_method"
self.chatgpt_method = chatgpt_method
# ICL: In-Context Learning这里给出例子要求GPT模仿例子
# ICL: In-Context Learning; here we provide an example, expecting GPT to mimic it.
self.icl_sample = ICL_SAMPLE
self.template_prefix = template_prefix
def get_swagger_json(self) -> dict:
"""从本地文件加载Swagger JSON"""
"""Load Swagger JSON from a local file."""
with open(self.swagger_file, "r", encoding="utf-8") as file:
swagger_json = json.load(file)
return swagger_json
def __para_to_str(self, prop, required, name=""):
name = name or prop["name"]
ptype = prop["type"]
title = prop.get("title", "")
desc = prop.get("description", "")
return f'{name}\t{ptype}\t{"" if required else ""}\t{title}\t{desc}'
def _para_to_str(self, prop):
required = prop.get("required", False)
return self.__para_to_str(prop, required)
def para_to_str(self, name, prop, prop_object_required):
required = name in prop_object_required
return self.__para_to_str(prop, required, name)
def dive_into_object(self, node):
"""If it's an object type, recursively output its sub-properties."""
if node.get("type") == "object":
sub_properties = node.get("properties", {})
return self.build_object_properties(sub_properties, prop_object_required, level=level + 1)
return ""
def build_object_properties(self, node, prop_object_required, level: int = 0) -> str:
"""递归输出object和array[object]类型的子属性
"""Recursively output properties of type object and array[object].
Args:
node (_type_): 子项的值
prop_object_required (_type_): 是否必填项
level: 当前递归深度
node: Value of the child item.
prop_object_required: Indicates if it's a required field.
level: Current recursion depth.
"""
doc = ""
def dive_into_object(node):
"""如果是object类型递归输出子属性"""
if node.get("type") == "object":
sub_properties = node.get("properties", {})
return self.build_object_properties(sub_properties, prop_object_required, level=level + 1)
return ""
if node.get("in", "") in ["query", "header", "formData"]:
doc += f'{" " * level}{self._para_to_str(node)}\n'
doc += dive_into_object(node)
doc += self.dive_into_object(node)
return doc
for name, prop in node.items():
doc += f'{" " * level}{self.para_to_str(name, prop, prop_object_required)}\n'
doc += dive_into_object(prop)
doc += self.dive_into_object(prop)
if prop["type"] == "array":
items = prop.get("items", {})
doc += dive_into_object(items)
doc += self.dive_into_object(items)
return doc
def get_tags_mapping(self) -> dict:
"""处理tag与path
"""Process tags and paths.
Returns:
Dict: tag: path对应关系
A dictionary mapping tags to paths.
"""
swagger_data = self.get_swagger_json()
paths = swagger_data["paths"]
@ -195,7 +177,7 @@ class UTGenerator:
return tags
def generate_ut(self, include_tags) -> bool:
"""生成用例文件"""
"""Generate the test case files."""
tags = self.get_tags_mapping()
for tag, paths in tags.items():
if include_tags is None or tag in include_tags:
@ -205,19 +187,17 @@ class UTGenerator:
def build_api_doc(self, node: dict, path: str, method: str) -> str:
summary = node["summary"]
doc = f"接口名称:{summary}\n接口路径:{path}\nMethod{method.upper()}\n"
doc += "\n请求参数:\n"
doc = f"API Name: {summary}\nAPI Path: {path}\nMethod: {method.upper()}\n"
doc += "\nRequest Parameters:\n"
if "parameters" in node:
parameters = node["parameters"]
doc += "路径参数:\n"
# param["in"]: path / formData / body / query / header
doc += "Path Parameters:\n"
for param in parameters:
if param["in"] == "path":
doc += f'{param["name"]} \n'
doc += "\nBody参数:\n"
doc += "名称\t类型\t是否必须\t默认值\t备注\n"
doc += "\nBody Parameters:\n"
doc += "Name\tType\tMandatory?\tDefault Value\tNotes\n"
for param in parameters:
if param["in"] == "body":
schema = param.get("schema", {})
@ -227,9 +207,9 @@ class UTGenerator:
else:
doc += self.build_object_properties(param, [])
# 输出返回数据信息
doc += "\n返回数据:\n"
doc += "名称\t类型\t是否必须\t默认值\t备注\n"
# Output response data information
doc += "\nResponse Data:\n"
doc += "Name\tType\tMandatory?\tDefault Value\tNotes\n"
responses = node["responses"]
response = responses.get("200", {})
schema = response.get("schema", {})
@ -248,7 +228,7 @@ class UTGenerator:
file.write(data)
def ask_gpt_and_save(self, question: str, tag: str, fname: str):
"""生成问题,并且存储问题与答案"""
"""Generate questions and store both the questions and answers."""
messages = [self.icl_sample, question]
result = self.gpt_msgs_to_code(messages=messages)
@ -256,11 +236,11 @@ class UTGenerator:
self._store(result, self.ut_py_path, tag, f"{fname}.py")
def _generate_ut(self, tag, paths):
"""处理数据路径下的结构
"""Process the structure under the data path.
Args:
tag (_type_): 模块名称
paths (_type_): 路径Object
tag: Module name.
paths: Path object.
"""
for path, path_obj in paths.items():
for method, node in path_obj.items():
@ -270,7 +250,7 @@ class UTGenerator:
self.ask_gpt_and_save(question, tag, summary)
def gpt_msgs_to_code(self, messages: list) -> str:
"""根据不同调用方式选择"""
"""Choose the appropriate call method."""
result = ''
if self.chatgpt_method == "API":
result = GPTAPI().ask_code(msgs=messages)
@ -278,11 +258,11 @@ class UTGenerator:
return result
def get_file_path(self, base: Path, fname: str):
"""保存不同的文件路径
"""Save to different file paths.
Args:
base (str): 路径
fname (str): 文件名称
base (str): Path.
fname (str): Filename.
"""
path = Path(base)
path.mkdir(parents=True, exist_ok=True)

View file

@ -15,9 +15,10 @@ from metagpt.logs import logger
def check_cmd_exists(command) -> int:
""" 检查命令是否存在
:param command: 待检查的命令
:return: 如果命令存在返回0如果不存在返回非0
"""Check if a command exists.
:param command: The command to check.
:return: Returns 0 if the command exists, otherwise non-zero.
"""
check_command = 'command -v ' + command + ' >/dev/null 2>&1 || { echo >&2 "no mermaid"; exit 1; }'
result = os.system(check_command)
@ -28,19 +29,19 @@ class OutputParser:
@classmethod
def parse_blocks(cls, text: str):
# 首先根据"##"将文本分割成不同的block
# First, split the text into different blocks using "##".
blocks = text.split("##")
# 创建一个字典用于存储每个block的标题和内容
# Create a dictionary to store the title and content of each block.
block_dict = {}
# 遍历所有的block
# Iterate over all blocks.
for block in blocks:
# 如果block不为空则继续处理
# If the block is not empty, continue processing.
if block.strip() != "":
# 将block的标题和内容分开并分别去掉前后的空白字符
# Separate the block's title and content, and trim whitespace from each.
block_title, block_content = block.split("\n", 1)
# LLM可能出错,在这里做一下修正
# LLM may have an error, make a correction here.
if block_title[-1] == ":":
block_title = block_title[:-1]
block_dict[block_title.strip()] = block_content.strip()
@ -84,13 +85,13 @@ class OutputParser:
block_dict = cls.parse_blocks(data)
parsed_data = {}
for block, content in block_dict.items():
# 尝试去除code标记
# Try to remove code markers.
try:
content = cls.parse_code(text=content)
except Exception:
pass
# 尝试解析list
# Try to parse lists.
try:
content = cls.parse_file_list(text=content)
except Exception:
@ -103,7 +104,7 @@ class OutputParser:
block_dict = cls.parse_blocks(data)
parsed_data = {}
for block, content in block_dict.items():
# 尝试去除code标记
# Try to remove code markers.
try:
content = cls.parse_code(text=content)
except Exception:
@ -114,14 +115,14 @@ class OutputParser:
else:
typing = typing_define
if typing == List[str] or typing == List[Tuple[str, str]]:
# 尝试解析list
# Try to parse lists.
try:
content = cls.parse_file_list(text=content)
except Exception:
pass
# TODO: 多余的引号去除有风险,后期再解决
# TODO: Removing extra quotes is risky, will address later.
# elif typing == str:
# # 尝试去除多余的引号
# # Try to remove unnecessary quotes.
# try:
# content = cls.parse_str(text=content)
# except Exception:
@ -142,17 +143,17 @@ class CodeParser:
@classmethod
def parse_blocks(cls, text: str):
# 首先根据"##"将文本分割成不同的block
# First, split the text into different blocks using "##".
blocks = text.split("##")
# 创建一个字典用于存储每个block的标题和内容
# Create a dictionary to store the title and content of each block.
block_dict = {}
# 遍历所有的block
# Iterate over all blocks.
for block in blocks:
# 如果block不为空则继续处理
# If the block is not empty, continue processing.
if block.strip() != "":
# 将block的标题和内容分开并分别去掉前后的空白字符
# Separate the block's title and content, and trim whitespace from each.
block_title, block_content = block.split("\n", 1)
block_dict[block_title.strip()] = block_content.strip()
@ -167,7 +168,7 @@ class CodeParser:
if match:
code = match.group(1)
else:
logger.error(f"{pattern} not match following text:")
logger.error(f"{pattern} did not match the following text:")
logger.error(text)
raise Exception
return code
@ -199,7 +200,7 @@ class CodeParser:
class NoMoneyException(Exception):
"""Raised when the operation cannot be completed due to insufficient funds"""
"""Raised when the operation cannot be completed due to insufficient funds."""
def __init__(self, amount, message="Insufficient funds"):
self.amount = amount
@ -212,17 +213,17 @@ class NoMoneyException(Exception):
def print_members(module, indent=0):
"""
https://stackoverflow.com/questions/1796180/how-can-i-get-a-list-of-all-classes-within-current-module-in-python
:param module:
:param indent:
:return:
This function is sourced from: https://stackoverflow.com/questions/1796180/how-can-i-get-a-list-of-all-classes-within-current-module-in-python
:param module: The module to inspect.
:param indent: The indentation level.
:return: None.
"""
prefix = ' ' * indent
for name, obj in inspect.getmembers(module):
print(name, obj)
if inspect.isclass(obj):
print(f'{prefix}Class: {name}')
# print the methods within the class
# Print the methods within the class.
if name in ['__class__', '__base__']:
continue
print_members(obj, indent + 2)

View file

@ -8,15 +8,14 @@
import docx
def read_docx(file_path: str) -> list:
"""打开docx文件"""
"""Open and read a docx file."""
doc = docx.Document(file_path)
# 创建一个空列表,用于存储段落内容
# Create an empty list to store paragraph contents.
paragraphs_list = []
# 遍历文档中的段落,并将其内容添加到列表中
# Iterate through the paragraphs in the document and add their content to the list.
for paragraph in doc.paragraphs:
paragraphs_list.append(paragraph.text)