another run through for translation.

This commit is contained in:
brucemeek 2023-08-02 15:57:10 -05:00
parent 0eea6bc19f
commit 0d632c7e56
72 changed files with 704 additions and 575 deletions

2
.gitignore vendored
View file

@ -162,4 +162,4 @@ examples/nb/
workspace/*
*.mmd
tmp
output.wav
output.wav

View file

@ -23,4 +23,4 @@ RUN cd /app/metagpt &&\
WORKDIR /app/metagpt
# Running with an infinite loop using the tail command
CMD ["sh", "-c", "tail -f /dev/null"]
CMD ["sh", "-c", "tail -f /dev/null"]

0
Message Normal file
View file

0
None Normal file
View file

View file

@ -2,10 +2,12 @@
# The configuration of key.yaml has a higher priority and will not enter git
#### if OpenAI
#OPENAI_API_KEY: "YOUR_API_KEY"
#OPENAI_API_BASE: "YOUR_API_BASE"
## The official OPENAI_API_BASE is https://api.openai.com/v1
## If the official OPENAI_API_BASE is not available, we recommend using the [openai-forward](https://github.com/beidongjiedeguang/openai-forward).
## Or, you can configure OPENAI_PROXY to access official OPENAI_API_BASE.
OPENAI_API_BASE: "https://api.openai.com/v1"
#OPENAI_PROXY: "http://127.0.0.1:8118"
#OPENAI_API_KEY: "YOUR_API_KEY"
OPENAI_API_MODEL: "gpt-4"
MAX_TOKENS: 1500
RPM: 10
@ -55,3 +57,12 @@ SD_T2I_API: "/sdapi/v1/txt2img"
#### for Execution
#LONG_TERM_MEMORY: false
#### for Mermaid CLI
## If you installed mmdc (Mermaid CLI) only for metagpt then enable the following configuration.
#PUPPETEER_CONFIG: "./config/puppeteer-config.json"
#MMDC: "./node_modules/.bin/mmdc"
### for update_costs & calc_usage
UPDATE_COSTS: false
CALC_USAGE: false

0
int Normal file
View file

View file

@ -40,3 +40,4 @@ class ActionType(Enum):
WRITE_TASKS = WriteTasks
ASSIGN_TASKS = AssignTasks
SEARCH_AND_SUMMARIZE = SearchAndSummarize

View file

@ -65,3 +65,4 @@ class Action(ABC):
async def run(self, *args, **kwargs):
"""Run action"""
raise NotImplementedError("The run method should be implemented in a subclass.")

View file

@ -40,3 +40,4 @@ class ActionOutput:
new_class.__validator_check_name = classmethod(check_name)
new_class.__root_validator_check_missing_fields = classmethod(check_missing_fields)
return new_class

View file

@ -28,10 +28,10 @@ Focus only on the names of shared dependencies, do not add any other explanation
class AnalyzeDepLibs(Action):
def __init__(self, name, context=None, llm=None):
super().__init__(name, context, llm)
self.desc = "根据上下文,分析程序运行依赖库"
self.desc = "Analyze the runtime dependencies of the program based on the context"
async def run(self, requirement, filepaths_string):
# prompt = f"以下是产品需求文档(PRD):\n\n{prd}\n\n{PROMPT}"
# prompt = f"Below is the product requirement document (PRD):\n\n{prd}\n\n{PROMPT}"
prompt = PROMPT.format(prompt=requirement, filepaths_string=filepaths_string)
design_filenames = await self._aask(prompt)
return design_filenames

View file

@ -16,7 +16,7 @@ class AzureTTS(Action):
super().__init__(name, context, llm)
self.config = Config()
# 参数参考:https://learn.microsoft.com/zh-cn/azure/cognitive-services/speech-service/language-support?tabs=tts#voice-styles-and-roles
# Parameters reference: https://learn.microsoft.com/zh-cn/azure/cognitive-services/speech-service/language-support?tabs=tts#voice-styles-and-roles
def synthesize_speech(self, lang, voice, role, text, output_file):
subscription_key = self.config.get('AZURE_TTS_SUBSCRIPTION_KEY')
region = self.config.get('AZURE_TTS_REGION')
@ -49,5 +49,5 @@ if __name__ == "__main__":
"zh-CN",
"zh-CN-YunxiNeural",
"Boy",
"你好,我是卡卡",
"Hello, I am Kaka",
"output.wav")

View file

@ -17,3 +17,4 @@ class DebugError(Action):
f"\n\n{error}\n\nPlease try to fix the error in this code."
fixed_code = await self._aask(prompt)
return fixed_code

View file

@ -100,7 +100,7 @@ class WriteDesign(Action):
try:
shutil.rmtree(workspace)
except FileNotFoundError:
pass # 文件夹不存在,但我们不在意
pass # Folder does not exist, but we don't care
workspace.mkdir(parents=True, exist_ok=True)
def _save_prd(self, docs_path, resources_path, prd):
@ -141,3 +141,4 @@ class WriteDesign(Action):
system_design = await self._aask_v1(prompt, "system_design", OUTPUT_MAPPING)
self._save(context, system_design)
return system_design

View file

@ -19,3 +19,4 @@ class DesignReview(Action):
api_review = await self._aask(prompt)
return api_review

View file

@ -26,3 +26,4 @@ class DesignFilenames(Action):
logger.debug(prompt)
logger.debug(design_filenames)
return design_filenames

View file

@ -126,3 +126,4 @@ class AssignTasks(Action):
async def run(self, *args, **kwargs):
# Here you should implement the actual action
pass

View file

@ -23,3 +23,4 @@ class RunCode(Action):
except Exception:
# If there is an error in the code, return the error message
return traceback.format_exc()

View file

@ -137,3 +137,4 @@ class SearchAndSummarize(Action):
logger.debug(prompt)
logger.debug(result)
return result

View file

@ -79,3 +79,4 @@ class WriteCode(Action):
# code_rsp = await self._aask_v1(prompt, "code_rsp", OUTPUT_MAPPING)
# self._save(context, filename, code)
return code

View file

@ -79,3 +79,4 @@ class WriteCodeReview(Action):
# code_rsp = await self._aask_v1(prompt, "code_rsp", OUTPUT_MAPPING)
# self._save(context, filename, code)
return code

View file

@ -144,3 +144,4 @@ class WritePRD(Action):
logger.debug(prompt)
prd = await self._aask_v1(prompt, "prd", OUTPUT_MAPPING)
return prd

View file

@ -25,3 +25,4 @@ class WritePRDReview(Action):
prompt = self.prd_review_prompt_template.format(prd=self.prd)
review = await self._aask(prompt)
return review

View file

@ -24,3 +24,4 @@ class WriteTest(Action):
prompt = self.test_prompt_template.format(code=self.code)
test_cases = await self._aask(prompt)
return test_cases

View file

@ -28,9 +28,9 @@ class NotConfiguredException(Exception):
class Config(metaclass=Singleton):
"""
Common usage:
Regular usage method:
config = Config("config.yaml")
secret_key = config.get_key("MY_SECRET_KEY")
secret_key = config.get("MY_SECRET_KEY")
print("Secret key:", secret_key)
"""
@ -77,9 +77,11 @@ class Config(metaclass=Singleton):
logger.warning("LONG_TERM_MEMORY is True")
self.max_budget = self._get("MAX_BUDGET", 10.0)
self.total_cost = 0.0
self.puppeteer_config = self._get("PUPPETEER_CONFIG","")
self.mmdc = self._get("MMDC","mmdc")
def _init_with_config_files_and_env(self, configs: dict, yaml_file):
"""Load from config/key.yaml / config/config.yaml / env in decreasing priority"""
"""Load from config/key.yaml, config/config.yaml, and env in decreasing order of priority"""
configs.update(os.environ)
for _yaml_file in [yaml_file, self.key_yaml_file]:
@ -98,11 +100,11 @@ class Config(metaclass=Singleton):
return self._configs.get(*args, **kwargs)
def get(self, key, *args, **kwargs):
"""Find values from config/key.yaml / config/config.yaml / env, report an error if not found"""
"""Search for a value in config/key.yaml, config/config.yaml, and env; raise an error if not found"""
value = self._get(key, *args, **kwargs)
if value is None:
raise ValueError(f"Key '{key}' not found in environment variables or in the YAML file")
return value
CONFIG = Config()
CONFIG = Config()

View file

@ -12,7 +12,7 @@ from metagpt.config import Config
class BaseStore(ABC):
"""FIXME: consider add_index, set_index and think 颗粒度"""
"""FIXME: consider add_index, set_index and think about granularity."""
@abstractmethod
def search(self, query, *args, **kwargs):
@ -53,3 +53,4 @@ class LocalStore(BaseStore, ABC):
@abstractmethod
def _write(self, docs, metadatas):
raise NotImplementedError

View file

@ -9,7 +9,7 @@ import chromadb
class ChromaStore:
"""如果从BaseStore继承或者引入metagpt的其他模块就会Python异常很奇怪"""
"""If inherited from BaseStore, or importing other modules from metagpt, a Python exception occurs, which is strange."""
def __init__(self, name):
client = chromadb.Client()
collection = client.create_collection(name)
@ -27,7 +27,7 @@ class ChromaStore:
return results
def persist(self):
"""chroma建议使用server模式不本地persist"""
"""Chroma recommends using server mode and not persisting locally."""
raise NotImplementedError
def write(self, documents, metadatas, ids):

View file

@ -79,3 +79,4 @@ class Document:
return self._get_docs_and_metadatas_by_langchain()
else:
raise NotImplementedError

View file

@ -28,7 +28,7 @@ class FaissStore(LocalStore):
def _load(self) -> Optional["FaissStore"]:
index_file, store_file = self._get_index_and_store_fname()
if not (index_file.exists() and store_file.exists()):
logger.info("At least one of the index_file/store_file is missing. Loading failed and returns None.")
logger.info("Missing at least one of index_file/store_file, load failed and return None")
return None
index = faiss.read_index(str(index_file))
with open(str(store_file), "rb") as f:
@ -59,7 +59,7 @@ class FaissStore(LocalStore):
return str(sep.join([f"{x.page_content}" for x in rsp]))
def write(self):
"""Initialize the index and library based on the provided Document (JSON / XLSX, etc.) file."""
"""Initialize the index and library based on the Document (JSON / XLSX, etc.) file provided by the user."""
if not self.raw_data.exists():
raise FileNotFoundError
doc = Document(self.raw_data, self.content_col, self.meta_col)
@ -67,18 +67,19 @@ class FaissStore(LocalStore):
self.store = self._write(docs, metadatas)
self.persist()
return self.store
def add(self, texts: list[str], *args, **kwargs) -> list[str]:
"""FIXME: The store isn't currently updated after adding."""
"""FIXME: Currently, the store is not updated after adding."""
return self.store.add_texts(texts)
def delete(self, *args, **kwargs):
"""Currently, langchain doesn't provide a delete interface."""
"""Currently, langchain does not provide a delete interface."""
raise NotImplementedError
if __name__ == '__main__':
faiss_store = FaissStore(DATA_PATH / 'qcs/qcs_4w.json')
logger.info(faiss_store.search('Oily skin facial cleanser'))
faiss_store.add([f'Oily skin facial cleanser-{i}' for i in range(3)])
logger.info(faiss_store.search('Oily skin facial cleanser'))
logger.info(faiss_store.search('Oily Skin Facial Cleanser'))
faiss_store.add([f'Oily Skin Facial Cleanser-{i}' for i in range(3)])
logger.info(faiss_store.search('Oily Skin Facial Cleanser'))

View file

@ -19,8 +19,9 @@ type_mapping = {
np.ndarray: DataType.FLOAT_VECTOR
}
def columns_to_milvus_schema(columns: dict, primary_col_name: str = "", desc: str = ""):
"""Assuming the structure of columns is str: standard type"""
"""Assume the structure of columns is str: regular type"""
fields = []
for col, ctype in columns.items():
if ctype == str:
@ -33,11 +34,13 @@ def columns_to_milvus_schema(columns: dict, primary_col_name: str = "", desc: st
schema = CollectionSchema(fields, description=desc)
return schema
class MilvusConnection(TypedDict):
alias: str
host: str
port: str
class MilvusStore(BaseStore):
"""
FIXME: ADD TESTS
@ -76,8 +79,8 @@ class MilvusStore(BaseStore):
"""
FIXME: ADD TESTS
https://milvus.io/docs/v2.0.x/search.md
All search and query operations within Milvus are executed in memory. Load the collection into memory before conducting a vector similarity search.
Noting the above description, is this logic serious? This should be time-consuming, right?
All search and query operations within Milvus are executed in memory. Load the collection to memory before conducting a vector similarity search.
Note the above description, is this logic serious? This should take a long time, right?
"""
search_params = {"metric_type": "L2", "params": {"nprobe": 10}}
results = self.collection.search(
@ -88,7 +91,7 @@ class MilvusStore(BaseStore):
expr=None,
consistency_level="Strong"
)
# FIXME: results contain an id, but to get the actual value for the id, you still need to call the query interface
# FIXME: results contain id, but to get the actual value from the id, we still need to call the query interface
return results
def write(self, name, schema, *args, **kwargs):

View file

@ -16,7 +16,7 @@ from metagpt.schema import Message
class Environment(BaseModel):
"""Environment that hosts a set of roles. Roles can publish messages to the environment, which can be observed by other roles."""
"""Environment that carries a set of roles. Roles can publish messages to the environment, which can be observed by other roles."""
roles: dict[str, Role] = Field(default_factory=dict)
memory: Memory = Field(default_factory=Memory)
@ -26,23 +26,23 @@ class Environment(BaseModel):
arbitrary_types_allowed = True
def add_role(self, role: Role):
"""Add a role to the current environment."""
"""Add a Role to the current environment."""
role.set_env(self)
self.roles[role.profile] = role
def add_roles(self, roles: Iterable[Role]):
"""Add multiple roles to the current environment."""
"""Add a batch of Roles to the current environment."""
for role in roles:
self.add_role(role)
def publish_message(self, message: Message):
"""Publish a message to the current environment."""
# self.message_queue.put(message)
# self.message_queue.put(message)
self.memory.add(message)
self.history += f"\n{message}"
async def run(self, k=1):
"""Execute a single run for all roles in the environment."""
"""Process the run of all Roles once."""
# while not self.message_queue.empty():
# message = self.message_queue.get()
# rsp = await self.manager.handle(message, self)
@ -56,9 +56,9 @@ class Environment(BaseModel):
await asyncio.gather(*futures)
def get_roles(self) -> dict[str, Role]:
"""Retrieve all roles within the environment."""
"""Get all Roles within the environment."""
return self.roles
def get_role(self, name: str) -> Role:
"""Retrieve a specific role within the environment."""
"""Get a specified Role within the environment."""
return self.roles.get(name, None)

View file

@ -25,4 +25,4 @@ def print_classes_and_functions(module):
if __name__ == '__main__':
print_classes_and_functions(metagpt)
print_classes_and_functions(metagpt)

View file

@ -13,5 +13,5 @@ DEFAULT_LLM = LLM()
CLAUDE_LLM = Claude()
async def ai_func(prompt):
"""Perform a Q&A using LLM."""
"""Use LLM for Q&A."""
return await DEFAULT_LLM.aask(prompt)

View file

@ -15,7 +15,7 @@ Skill = Action
class SkillManager:
"""Manages all skills."""
"""Used to manage all skills"""
def __init__(self):
self._llm = LLM()
@ -24,7 +24,7 @@ class SkillManager:
def add_skill(self, skill: Skill):
"""
Adds a skill, inserting the skill into the skill pool and searchable storage.
Add a skill, add the skill to the skill pool and searchable storage
:param skill: Skill
:return:
"""
@ -33,7 +33,7 @@ class SkillManager:
def del_skill(self, skill_name: str):
"""
Deletes a skill, removing the skill from the skill pool and searchable storage.
Delete a skill, remove the skill from the skill pool and searchable storage
:param skill_name: Skill name
:return:
"""
@ -42,7 +42,7 @@ class SkillManager:
def get_skill(self, skill_name: str) -> Skill:
"""
Retrieves a specific skill by its name.
Obtain a specific skill by skill name
:param skill_name: Skill name
:return: Skill
"""
@ -50,23 +50,23 @@ class SkillManager:
def retrieve_skill(self, desc: str, n_results: int = 2) -> list[Skill]:
"""
Retrieves skills through the search engine.
Obtain skills through the search engine
:param desc: Skill description
:return: List of skills
:return: Multiple skills
"""
return self._store.search(desc, n_results=n_results)['ids'][0]
def retrieve_skill_scored(self, desc: str, n_results: int = 2) -> dict:
"""
Retrieves skills through the search engine.
Obtain skills through the search engine
:param desc: Skill description
:return: Dictionary composed of skills and scores
:return: Dictionary consisting of skills and scores
"""
return self._store.search(desc, n_results=n_results)
def generate_skill_desc(self, skill: Skill) -> str:
"""
Generates a descriptive text for each skill.
Generate descriptive text for each skill
:param skill:
:return:
"""

View file

@ -32,7 +32,7 @@ class Manager:
async def handle(self, message: Message, environment):
"""
Manager processes the message, not simply passing the message to the next person.
Manager handles the message, currently simply passes the message to the next person.
:param message:
:param environment:
:return:
@ -49,7 +49,7 @@ class Manager:
# Ask the LLM to decide which role should handle the message
# chosen_role_name = self.llm.ask(self.prompt_template.format(context))
# FIXME: For now, the decision is made through a simple dictionary, but in the future, there should be a thought process
# FIXME: Currently deciding the direction using a simple dictionary, but in the future, a thought process should be involved.
next_role_profile = self.role_directions[message.role]
# logger.debug(f"{next_role_profile}")
for _, role in roles.items():

View file

@ -69,3 +69,4 @@ class LongTermMemory(Memory):
def clear(self):
super(LongTermMemory, self).clear()
self.memory_storage.clean()

View file

@ -85,3 +85,4 @@ class Memory:
continue
rsp += self.index[action]
return rsp

View file

@ -104,3 +104,4 @@ class MemoryStorage(FaissStore):
self.store = None
self._initialized = False

View file

@ -1,16 +1,16 @@
You are a helpful assistant, capable of drafting, abstracting, commenting, and summarizing Python code.
You are a helpful assistant that can assist in writing, abstracting, annotating, and summarizing Python code.
Do not mention class/function names.
Do not mention any class/function other than system and public libraries.
Try to summarize the class/function in no more than 6 sentences.
Your answer should be a single line of text.
For example, if the context is:
Your answer should be in one line of text.
For instance, if the context is:
```python
from typing import Optional
from abc import ABC
from metagpt.llm import LLM # Large Language Model, similar to GPT
from metagpt.llm import LLM # Large language model, similar to GPT
n
class Action(ABC):
def __init__(self, name='', context=None, llm: LLM = LLM()):
self.name = name
@ -20,30 +20,30 @@
self.desc = ""
def set_prefix(self, prefix):
"""Set prefix for subsequent use."""
"""Set prefix for subsequent use"""
self.prefix = prefix
async def _aask(self, prompt: str, system_msgs: Optional[list[str]] = None):
"""Use the prompt with the default prefix."""
"""Use prompt with the default prefix"""
if not system_msgs:
system_msgs = []
system_msgs.append(self.prefix)
return await self.llm.aask(prompt, system_msgs)
async def run(self, *args, **kwargs):
"""Execute the action."""
"""Execute action"""
raise NotImplementedError("The run method should be implemented in a subclass.")
PROMPT_TEMPLATE = """
# Requirement
# Requirements
{requirements}
# PRD
Based on the requirements, create a Product Requirement Document (PRD) and fill in the blanks below.
Create a product requirement document (PRD) based on the requirements and fill in the blanks below:
Product/Feature Introduction:
Product/Function Introduction:
Goal:
Goals:
Users and Usage Scenarios:
@ -69,7 +69,6 @@ # PRD
The main class/function is WritePRD.
Then, you should write:
This class is designed to generate a PRD based on input requirements. Notice there's a prompt template, which includes product, feature, goal, users and usage scenarios, requirements, constraints and limitations, and performance metrics. This template will be filled with the input requirements, and then an interface will query the large language model, prompting it to return the specific PRD.
Then you should write:
This class is designed to generate a PRD based on input requirements. Notably, there's a template prompt with sections for product, function, goals, user scenarios, requirements, constraints, performance metrics. This template gets filled with input requirements and then queries a big language model to produce the detailed PRD.

View file

@ -7,34 +7,34 @@
"""
METAGPT_SAMPLE = """
### Setting
### Settings
You are a coding assistant for a user, capable of programming using public libraries and Python system libraries. Your response should contain only one function.
1. The function itself should be as complete as possible and should not lack any details of the requirement.
2. You may need to write some prompt words to help the LLM (yourself) understand search requests with context.
3. For complex logic that's hard to be addressed with a simple function, try to delegate it to the LLM.
You are a programming assistant for a user, capable of coding using public libraries and Python system libraries. Your response should have only one function.
1. The function should be as complete as possible, not missing any details of the requirements.
2. You might need to write some prompt words to let LLM (yourself) understand context-bearing search requests.
3. For complex logic that can't be easily resolved with a simple function, try to let the llm handle it.
### Public Libraries
You can use the functions provided by the public library, metagpt, and you cannot use functions from other third-party libraries. The public library is already imported as variable `x`.
You can use the functions provided by the public library metagpt, but can't use functions from other third-party libraries. The public library is imported as variable x by default.
- `import metagpt as x`
- You can call the public library using the format `x.func(paras)`.
- You can call the public library using the `x.func(paras)` format.
The available functions in the public library are:
Functions already available in the public library are:
- def llm(question: str) -> str # Input a question and get an answer based on the large model.
- def intent_detection(query: str) -> str # Input a query, analyze the intent, and return the name of the function from the public library.
- def add_doc(doc_path: str) -> None # Input the path of a file or directory to add to the knowledge base.
- def search(query: str) -> list[str] # Input a query to get multiple results from a vector knowledge base search.
- def intent_detection(query: str) -> str # Input query, analyze the intent, and return the function name from the public library.
- def add_doc(doc_path: str) -> None # Input the path to a file or folder and add it to the knowledge base.
- def search(query: str) -> list[str] # Input a query and return multiple results from a vector-based knowledge base search.
- def google(query: str) -> list[str] # Use Google to search for public results.
- def math(query: str) -> str # Input a query formula and get the result of its execution.
- def tts(text: str, wav_path: str) # Input text and the desired output audio path to convert the text into an audio file.
- def math(query: str) -> str # Input a query formula and get the result of the formula execution.
- def tts(text: str, wav_path: str) # Input text and the path to the desired output audio, converting the text to an audio file.
### User Requirement
### User Requirements
I have a personal knowledge base file. I want to implement a personal assistant with search functionality based on it. The detailed requirements are as follows:
1. The personal assistant will consider whether it needs to use the personal knowledge base search. If it's not necessary, it won't use it.
2. The personal assistant will judge user intent and use the appropriate function to address the issue under different intents.
3. Answer with voice.
I have a personal knowledge base file. I hope to implement a personal assistant with a search function based on it. The detailed requirements are as follows:
1. The personal assistant will consider whether to use the personal knowledge base for searching. If it's unnecessary, it won't use it.
2. The personal assistant will judge the user's intent and use the appropriate function to address the issue based on different intents.
3. Answer in voice.
"""
# - def summarize(doc: str) -> str # Input a doc to get a summary.
# - def summarize(doc: str) -> str # Input doc and return a summary.

View file

@ -6,8 +6,8 @@
@File : summarize.py
"""
# From the plugin: ChatGPT - Summarize Websites and YouTube Videos
# https://chrome.google.com/webstore/detail/chatgpt-%C2%BB-summarize-every/cbgecfllfhmmnknmamkejadjmnmpfjmp?hl=zh-CN&utm_source=chrome-ntp-launcher
# From the plugin: ChatGPT - Website and YouTube Video Summaries
# https://chrome.google.com/webstore/detail/chatgpt-%C2%BB-summarize-every/cbgecfllfhmmnknmamkejadjmnmpfjmp?hl=en&utm_source=chrome-ntp-launcher
SUMMARIZE_PROMPT = """
Your output should use the following template:
### Summary
@ -20,9 +20,10 @@ summary. Pick a suitable emoji for every bullet point. Your response should be i
a YouTube video, use the following text: {{CONTENT}}.
"""
# From GCP-VertexAI-Text Summarization (SUMMARIZE_PROMPT_2-5 are all from this)
# GCP-VertexAI-Text Summarization (SUMMARIZE_PROMPT_2-5 are from this source)
# https://github.com/GoogleCloudPlatform/generative-ai/blob/main/language/examples/prompt-design/text_summarization.ipynb
# Long documents need a map-reduce process, see the following notebook
# Long documents require a map-reduce process, see the following notebook
# https://github.com/GoogleCloudPlatform/generative-ai/blob/main/language/examples/document-summarization/summarization_large_documents.ipynb
SUMMARIZE_PROMPT_2 = """
Provide a very short summary, no more than three sentences, for the following article:
@ -42,7 +43,7 @@ Summary:
SUMMARIZE_PROMPT_3 = """
Provide a TL;DR for the following article:
Our quantum computers work by manipulating qubits in an orchestrated fashion that we call quantum algorithms.
Our quantum computers work by manipulating qubits in an orchestrated fashion that we call quantum algorithms.
The challenge is that qubits are so sensitive that even stray light can cause calculation errors and the problem worsens as quantum computers grow.
This has significant consequences, since the best quantum algorithms that we know for running useful applications require the error rates of our qubits to be far lower than we have today.
To bridge this gap, we will need quantum error correction.
@ -88,4 +89,4 @@ Customer: Thank you very much.
Support Agent: You're welcome, Larry. Have a good day!
Summary:
"""
"""

View file

@ -32,3 +32,4 @@ class Claude2:
max_tokens_to_sample=1000,
)
return res.completion

View file

@ -25,3 +25,4 @@ class BaseChatbot(ABC):
@abstractmethod
def ask_code(self, msgs: list) -> str:
"""Ask GPT multiple questions and get a piece of code"""

View file

@ -115,3 +115,4 @@ class BaseGPTAPI(BaseChatbot):
def messages_to_dict(self, messages):
"""objects to [{"role": "user", "content": msg}] etc."""
return [i.to_dict() for i in messages]

View file

@ -1,187 +1,259 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/5/11 14:43
@Time : 2023/5/5 23:08
@Author : alexanderwu
@File : engineer.py
@File : openai.py
"""
import asyncio
import shutil
from collections import OrderedDict
from pathlib import Path
import time
from functools import wraps
from typing import NamedTuple
from metagpt.const import WORKSPACE_ROOT
import openai
from metagpt.config import CONFIG
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.actions import WriteCode, WriteCodeReview, WriteTasks, WriteDesign
from metagpt.schema import Message
from metagpt.utils.common import CodeParser
from metagpt.provider.base_gpt_api import BaseGPTAPI
from metagpt.utils.singleton import Singleton
from metagpt.utils.token_counter import (
TOKEN_COSTS,
count_message_tokens,
count_string_tokens,
)
async def gather_ordered_k(coros, k) -> list:
tasks = OrderedDict()
results = [None] * len(coros)
done_queue = asyncio.Queue()
for i, coro in enumerate(coros):
if len(tasks) >= k:
done, _ = await asyncio.wait(tasks.keys(), return_when=asyncio.FIRST_COMPLETED)
for task in done:
index = tasks.pop(task)
await done_queue.put((index, task.result()))
task = asyncio.create_task(coro)
tasks[task] = i
if tasks:
done, _ = await asyncio.wait(tasks.keys())
for task in done:
index = tasks[task]
await done_queue.put((index, task.result()))
while not done_queue.empty():
index, result = await done_queue.get()
results[index] = result
return results
class Engineer(Role):
def __init__(self, name="Alex", profile="Engineer", goal="Write elegant, readable, extensible, efficient code",
constraints="The code you write should conform to code standard like PEP8, be modular, easy to read and maintain",
n_borg=1, use_code_review=False):
super().__init__(name, profile, goal, constraints)
self._init_actions([WriteCode])
self.use_code_review = use_code_review
if self.use_code_review:
self._init_actions([WriteCode, WriteCodeReview])
self._watch([WriteTasks])
self.todos = []
self.n_borg = n_borg
@classmethod
def parse_tasks(cls, task_msg: Message) -> list[str]:
if not task_msg.instruct_content:
return task_msg.instruct_content.dict().get("Task list")
return CodeParser.parse_file_list(block="Task list", text=task_msg.content)
@classmethod
def parse_code(cls, code_text: str) -> str:
return CodeParser.parse_code(block="", text=code_text)
@classmethod
def parse_workspace(cls, system_design_msg: Message) -> str:
if not system_design_msg.instruct_content:
return system_design_msg.instruct_content.dict().get("Python package name")
return CodeParser.parse_str(block="Python package name", text=system_design_msg.content)
def get_workspace(self) -> Path:
msg = self._rc.memory.get_by_action(WriteDesign)[-1]
if not msg:
return WORKSPACE_ROOT / 'src'
workspace = self.parse_workspace(msg)
# Codes are written in workspace/{package_name}/{package_name}
return WORKSPACE_ROOT / workspace / workspace
def recreate_workspace(self):
workspace = self.get_workspace()
try:
shutil.rmtree(workspace)
except FileNotFoundError:
pass # Directory does not exist, but we don't care
workspace.mkdir(parents=True, exist_ok=True)
def write_file(self, filename: str, code: str):
workspace = self.get_workspace()
file = workspace / filename
file.parent.mkdir(parents=True, exist_ok=True)
file.write_text(code)
def recv(self, message: Message) -> None:
self._rc.memory.add(message)
if message in self._rc.important_memory:
self.todos = self.parse_tasks(message)
async def _act_mp(self) -> Message:
# self.recreate_workspace()
todo_coros = []
for todo in self.todos:
todo_coro = WriteCode().run(
context=self._rc.memory.get_by_actions([WriteTasks, WriteDesign]),
filename=todo
)
todo_coros.append(todo_coro)
rsps = await gather_ordered_k(todo_coros, self.n_borg)
for todo, code_rsp in zip(self.todos, rsps):
_ = self.parse_code(code_rsp)
logger.info(todo)
logger.info(code_rsp)
# self.write_file(todo, code)
msg = Message(content=code_rsp, role=self.profile, cause_by=type(self._rc.todo))
self._rc.memory.add(msg)
del self.todos[0]
logger.info(f'Done {self.get_workspace()} generating.')
msg = Message(content="all done.", role=self.profile, cause_by=type(self._rc.todo))
return msg
async def _act_sp(self) -> Message:
for todo in self.todos:
code_rsp = await WriteCode().run(
context=self._rc.history,
filename=todo
)
# logger.info(todo)
# logger.info(code_rsp)
# code = self.parse_code(code_rsp)
self.write_file(todo, code_rsp)
msg = Message(content=code_rsp, role=self.profile, cause_by=type(self._rc.todo))
self._rc.memory.add(msg)
logger.info(f'Done {self.get_workspace()} generating.')
msg = Message(content="all done.", role=self.profile, cause_by=type(self._rc.todo))
return msg
async def _act_sp_precision(self) -> Message:
for todo in self.todos:
"""
# Select essential information from historical information to reduce prompt length (summarized from human experience)
1. All from Architect
2. All from ProjectManager
3. Do you need other codes (currently needed)?
TODO: The goal is not to need it. Once tasks are split clearly, according to the design idea, the code can be written clearly for each file without other codes. If it can't, it means that it still needs to be defined more clearly, this is the key to write long code.
"""
context = []
msg = self._rc.memory.get_by_actions([WriteDesign, WriteTasks, WriteCode])
for m in msg:
context.append(m.content)
context_str = "\n".join(context)
# Writing code
code = await WriteCode().run(
context=context_str,
filename=todo
)
# Code review
if self.use_code_review:
def retry(max_retries):
def decorator(f):
@wraps(f)
async def wrapper(*args, **kwargs):
for i in range(max_retries):
try:
rewrite_code = await WriteCodeReview().run(
context=context_str,
code=code,
filename=todo
)
code = rewrite_code
except Exception as e:
logger.error("code review failed!", e)
pass
self.write_file(todo, code)
msg = Message(content=code, role=self.profile, cause_by=WriteCode)
self._rc.memory.add(msg)
return await f(*args, **kwargs)
except Exception:
if i == max_retries - 1:
raise
await asyncio.sleep(2 ** i)
return wrapper
return decorator
logger.info(f'Done {self.get_workspace()} generating.')
msg = Message(content="all done.", role=self.profile, cause_by=WriteCode)
return msg
class RateLimiter:
"""Rate limiter class, each call goes through wait_if_needed, sleep if rate limiting is required"""
def __init__(self, rpm):
self.last_call_time = 0
self.interval = 1.1 * 60 / rpm # Here 1.1 is used because even if the calls are made strictly on time, they will still be QOS'd; consider switching to simple error retry later
self.rpm = rpm
async def _act(self) -> Message:
if self.use_code_review:
return await self._act_sp_precision()
return await self._act_sp()
def split_batches(self, batch):
return [batch[i:i + self.rpm] for i in range(0, len(batch), self.rpm)]
async def wait_if_needed(self, num_requests):
current_time = time.time()
elapsed_time = current_time - self.last_call_time
if elapsed_time < self.interval * num_requests:
remaining_time = self.interval * num_requests - elapsed_time
logger.info(f"sleep {remaining_time}")
await asyncio.sleep(remaining_time)
self.last_call_time = time.time()
class Costs(NamedTuple):
total_prompt_tokens: int
total_completion_tokens: int
total_cost: float
total_budget: float
class CostManager(metaclass=Singleton):
"""Calculate the cost of using the interface"""
def __init__(self):
self.total_prompt_tokens = 0
self.total_completion_tokens = 0
self.total_cost = 0
self.total_budget = 0
def update_cost(self, prompt_tokens, completion_tokens, model):
"""
Update the total cost, prompt tokens, and completion tokens.
Args:
prompt_tokens (int): The number of tokens used in the prompt.
completion_tokens (int): The number of tokens used in the completion.
model (str): The model used for the API call.
"""
self.total_prompt_tokens += prompt_tokens
self.total_completion_tokens += completion_tokens
cost = (
prompt_tokens * TOKEN_COSTS[model]["prompt"]
+ completion_tokens * TOKEN_COSTS[model]["completion"]
) / 1000
self.total_cost += cost
logger.info(f"Total running cost: ${self.total_cost:.3f} | Max budget: ${CONFIG.max_budget:.3f} | "
f"Current cost: ${cost:.3f}, {prompt_tokens=}, {completion_tokens=}")
CONFIG.total_cost = self.total_cost
def get_total_prompt_tokens(self):
"""
Get the total number of prompt tokens.
Returns:
int: The total number of prompt tokens.
"""
return self.total_prompt_tokens
def get_total_completion_tokens(self):
"""
Get the total number of completion tokens.
Returns:
int: The total number of completion tokens.
"""
return self.total_completion_tokens
def get_total_cost(self):
"""
Get the total cost of API calls.
Returns:
float: The total cost of API calls.
"""
return self.total_cost
def get_costs(self) -> Costs:
"""Get all costs"""
return Costs(self.total_prompt_tokens, self.total_completion_tokens, self.total_cost, self.total_budget)
class OpenAIGPTAPI(BaseGPTAPI, RateLimiter):
"""
Check https://platform.openai.com/examples for examples
"""
def __init__(self):
self.__init_openai(CONFIG)
self.llm = openai
self.model = CONFIG.openai_api_model
self._cost_manager = CostManager()
RateLimiter.__init__(self, rpm=self.rpm)
def __init_openai(self, config):
openai.api_key = config.openai_api_key
if config.openai_api_base:
openai.api_base = config.openai_api_base
if config.openai_api_type:
openai.api_type = config.openai_api_type
openai.api_version = config.openai_api_version
self.rpm = int(config.get("RPM", 10))
async def _achat_completion_stream(self, messages: list[dict]) -> str:
response = await openai.ChatCompletion.acreate(
**self._cons_kwargs(messages),
stream=True
)
# create variables to collect the stream of chunks
collected_chunks = []
collected_messages = []
# iterate through the stream of events
async for chunk in response:
collected_chunks.append(chunk) # save the event response
chunk_message = chunk['choices'][0]['delta'] # extract the message
collected_messages.append(chunk_message) # save the message
if "content" in chunk_message:
print(chunk_message["content"], end="")
print()
full_reply_content = ''.join([m.get('content', '') for m in collected_messages])
usage = self._calc_usage(messages, full_reply_content)
self._update_costs(usage)
return full_reply_content
def _cons_kwargs(self, messages: list[dict]) -> dict:
if CONFIG.openai_api_type == 'azure':
kwargs = {
"deployment_id": CONFIG.deployment_id,
"messages": messages,
"max_tokens": CONFIG.max_tokens_rsp,
"n": 1,
"stop": None,
"temperature": 0.3
}
else:
kwargs = {
"model": self.model,
"messages": messages,
"max_tokens": CONFIG.max_tokens_rsp,
"n": 1,
"stop": None,
"temperature": 0.3
}
return kwargs
async def _achat_completion(self, messages: list[dict]) -> dict:
rsp = await self.llm.ChatCompletion.acreate(**self._cons_kwargs(messages))
self._update_costs(rsp.get('usage'))
return rsp
def _chat_completion(self, messages: list[dict]) -> dict:
rsp = self.llm.ChatCompletion.create(**self._cons_kwargs(messages))
self._update_costs(rsp)
return rsp
def completion(self, messages: list[dict]) -> dict:
# if isinstance(messages[0], Message):
# messages = self.messages_to_dict(messages)
return self._chat_completion(messages)
async def acompletion(self, messages: list[dict]) -> dict:
# if isinstance(messages[0], Message):
# messages = self.messages_to_dict(messages)
return await self._achat_completion(messages)
@retry(max_retries=6)
async def acompletion_text(self, messages: list[dict], stream=False) -> str:
"""when streaming, print each token in place."""
if stream:
return await self._achat_completion_stream(messages)
rsp = await self._achat_completion(messages)
return self.get_choice_text(rsp)
def _calc_usage(self, messages: list[dict], rsp: str) -> dict:
usage = {}
if CONFIG.calc_usage:
prompt_tokens = count_message_tokens(messages, self.model)
completion_tokens = count_string_tokens(rsp, self.model)
usage['prompt_tokens'] = prompt_tokens
usage['completion_tokens'] = completion_tokens
return usage
async def acompletion_batch(self, batch: list[list[dict]]) -> list[dict]:
"""Return full JSON"""
split_batches = self.split_batches(batch)
all_results = []
for small_batch in split_batches:
logger.info(small_batch)
await self.wait_if_needed(len(small_batch))
future = [self.acompletion(prompt) for prompt in small_batch]
results = await asyncio.gather(*future)
logger.info(results)
all_results.extend(results)
return all_results
async def acompletion_batch_text(self, batch: list[list[dict]]) -> list[str]:
"""Only return plain text"""
raw_results = await self.acompletion_batch(batch)
results = []
for idx, raw_result in enumerate(raw_results, start=1):
result = self.get_choice_text(raw_result)
results.append(result)
logger.info(f"Result of task {idx}: {result}")
return results
def _update_costs(self, usage: dict):
if CONFIG.update_costs:
prompt_tokens = int(usage['prompt_tokens'])
completion_tokens = int(usage['completion_tokens'])
self._cost_manager.update_cost(prompt_tokens, completion_tokens, self.model)
def get_costs(self) -> Costs:
return self._cost_manager.get_costs()

View file

@ -17,3 +17,4 @@ class Architect(Role):
super().__init__(name, profile, goal, constraints)
self._init_actions([WriteDesign])
self._watch({WritePRD})

View file

@ -32,3 +32,4 @@ class CustomerService(Sales):
store=None
):
super().__init__(name, profile, desc=desc, store=store)

View file

@ -47,7 +47,7 @@ async def gather_ordered_k(coros, k) -> list:
class Engineer(Role):
def __init__(self, name="Alex", profile="Engineer", goal="Write elegant, readable, extensible, efficient code",
constraints="The code you write should conform to code standards like PEP8, be modular, easy to read, and maintainable",
constraints="The code you write should conform to code standard like PEP8, be modular, easy to read and maintain",
n_borg=1, use_code_review=False):
super().__init__(name, profile, goal, constraints)
self._init_actions([WriteCode])
@ -87,7 +87,7 @@ class Engineer(Role):
try:
shutil.rmtree(workspace)
except FileNotFoundError:
pass # Folder does not exist, but we don't mind
pass # The folder does not exist, but we don't care
workspace.mkdir(parents=True, exist_ok=True)
def write_file(self, filename: str, code: str):
@ -142,46 +142,47 @@ class Engineer(Role):
msg = Message(content="all done.", role=self.profile, cause_by=type(self._rc.todo))
return msg
async def _act_sp_precision(self) -> Message:
for todo in self.todos:
"""
# Select necessary information from historical data to reduce prompt length (summarized from experience)
1. All from Architect
2. All from ProjectManager
3. Do we need other codes (temporarily yes)?
TODO: The goal is to not need them. After tasks are clearly divided, based on the design idea, we should be able to clearly write each file without needing other code. If we can't, it means the definitions need to be clearer. This is the key to writing longer code.
"""
context = []
msg = self._rc.memory.get_by_actions([WriteDesign, WriteTasks, WriteCode])
for m in msg:
context.append(m.content)
context_str = "\n".join(context)
# Write code
code = await WriteCode().run(
context=context_str,
filename=todo
)
# Code review
if self.use_code_review:
try:
rewrite_code = await WriteCodeReview().run(
context=context_str,
code=code,
filename=todo
)
code = rewrite_code
except Exception as e:
logger.error("code review failed!", e)
pass
self.write_file(todo, code)
msg = Message(content=code, role=self.profile, cause_by=WriteCode)
self._rc.memory.add(msg)
logger.info(f'Done {self.get_workspace()} generating.')
msg = Message(content="all done.", role=self.profile, cause_by=WriteCode)
return msg
async def _act(self) -> Message:
async def _act_sp_precision(self) -> Message:
for todo in self.todos:
"""
# Select essential information from the historical data to reduce the length of the prompt (summarized from human experience):
1. All from Architect
2. All from ProjectManager
3. Do we need other codes (currently needed)?
TODO: The goal is not to need it. After clear task decomposition, based on the design idea, you should be able to write a single file without needing other codes. If you can't, it means you need a clearer definition. This is the key to writing longer code.
"""
context = []
msg = self._rc.memory.get_by_actions([WriteDesign, WriteTasks, WriteCode])
for m in msg:
context.append(m.content)
context_str = "\n".join(context)
# Write code
code = await WriteCode().run(
context=context_str,
filename=todo
)
# Code review
if self.use_code_review:
return await self._act_sp_precision()
return await self._act_sp()
try:
rewrite_code = await WriteCodeReview().run(
context=context_str,
code=code,
filename=todo
)
code = rewrite_code
except Exception as e:
logger.error("code review failed!", e)
pass
self.write_file(todo, code)
msg = Message(content=code, role=self.profile, cause_by=WriteCode)
self._rc.memory.add(msg)
logger.info(f'Done {self.get_workspace()} generating.')
msg = Message(content="all done.", role=self.profile, cause_by=WriteCode)
return msg
async def _act(self) -> Message:
if self.use_code_review:
return await self._act_sp_precision()
return await self._act_sp()

View file

@ -15,3 +15,4 @@ class ProductManager(Role):
super().__init__(name, profile, goal, constraints)
self._init_actions([WritePRD])
self._watch([BossRequirement])

View file

@ -15,3 +15,4 @@ class ProjectManager(Role):
super().__init__(name, profile, goal, constraints)
self._init_actions([WriteTasks])
self._watch([WriteDesign])

View file

@ -5,42 +5,41 @@
@Author : alexanderwu
@File : prompt.py
"""
from enum import Enum
PREFIX = """Do your best to answer the following questions. You can use the following tools:"""
PREFIX = """Answer the questions to the best of your ability. You can use the following tools:"""
FORMAT_INSTRUCTIONS = """Please follow the format below:
Question: The input question you need to answer
Thinking: What you should always consider on how to proceed
Action: The action to be taken, which should be one from [{tool_names}]
Action Input: The input for the action
Observation: The result of the action
... (This Thinking/Action/Action Input/Observation can be repeated N times)
Thinking: I now know the final answer
Thoughts: You should always think about how to do it
Action: The action to be taken, should be one from [{tool_names}]
Action Input: Input for the action
Observation: Result of the action
... (This Thoughts/Action/Action Input/Observation can be repeated N times)
Thoughts: I now know the final answer
Final Answer: The final answer to the original input question"""
SUFFIX = """Let's begin!
Question: {input}
Thinking: {agent_scratchpad}"""
Thoughts: {agent_scratchpad}"""
class PromptString(Enum):
REFLECTION_QUESTIONS = """Here are some statements:\n{memory_descriptions}\n\nBased solely on the above information, what are the three most significant high-level questions we can answer about the subjects in the statement?\n\n{format_instructions}"""
REFLECTION_QUESTIONS = "Here are some statements:\n{memory_descriptions}\n\nBased solely on the information above, what are the 3 most prominent high-level questions we can answer about the topic in the statements?\n\n{format_instructions}"
REFLECTION_INSIGHTS = """\n{memory_strings}\nCan you derive 5 high-level insights from the statements above? Always specify names when mentioning people.\n\n{format_instructions}"""
REFLECTION_INSIGHTS = "\n{memory_strings}\nCan you infer 5 high-level insights from the statements above? When mentioning people, always specify their names.\n\n{format_instructions}"
IMPORTANCE = """You are a memory importance AI. Based on the role's profile and memory description, rate the importance of the memory from 1 to 10, where 1 is purely mundane (like brushing teeth, making a bed), and 10 is profoundly impactful (like breaking up, getting admitted to a university). Ensure your rating is relative to the role's personality and points of focus.\n\nExample #1:\nName: Jojo\nProfile: Jojo is a professional skater who loves specialty coffee. She hopes to participate in the Olympics one day.\nMemory: Jojo saw a new coffee shop\n\n Your response: '{{\"rating\": 3}}'\n\nExample #2:\nName: Skylar\nProfile: Skylar is a product marketing manager. She works at a growing tech company that manufactures self-driving cars. She loves cats.\nMemory: Skylar saw a new coffee shop\n\n Your response: '{{\"rating\": 1}}'\n\nExample #3:\nName: Bob\nProfile: Bob is a plumber from the Lower East Side in New York City. He's been a plumber for 20 years. He enjoys weekend walks with his wife.\nMemory: Bob's wife slapped him.\n\n Your response: '{{\"rating\": 9}}'\n\nExample #4:\nName: Thomas\nProfile: Thomas is a police officer in Minneapolis. He's only been on the force for 6 months and struggles due to his inexperience.\nMemory: Thomas accidentally spilled a drink on a stranger\n\n Your response: '{{\"rating\": 6}}'\n\nExample #5:\nName: Laura\nProfile: Laura is a marketing specialist working in a big tech company. She enjoys traveling and trying new food. She's passionate about exploring new cultures and meeting people from all walks of life.\nMemory: Laura arrived at the meeting room\n\n Your response: '{{\"rating\": 1}}'\n\n{format_instructions} Let's get started! \n\n Name: {full_name}\nProfile: {private_bio}\nMemory: {memory_description}\n\n"""
IMPORTANCE = "You are a Memory Importance AI. Based on the character's personal profile and memory description, rate the importance of the memory from 1 to 10, where 1 is purely routine (e.g., brushing teeth, making the bed), and 10 is extremely profound (e.g., breakup, university admission). Ensure your rating is relative to the character's personality and focus points.\n\nExample#1:\nName: Jojo\nProfile: Jojo is a professional skater and loves specialty coffee. She hopes to compete in the Olympics one day.\nMemory: Jojo saw a new coffee shop\n\n Your response: '{{\"rating\": 3}}'\n\nExample#2:\nName: Skylar\nProfile: Skylar is a product marketing manager. She works at a growing tech company that manufactures self-driving cars. She loves cats.\nMemory: Skylar saw a new coffee shop\n\n Your response: '{{\"rating\": 1}}'\n\nExample#3:\nName: Bob\nProfile: Bob is a plumber from the Lower East Side of New York City. He has been a plumber for 20 years. He enjoys walking with his wife on weekends.\nMemory: Bob's wife slapped him.\n\n Your response: '{{\"rating\": 9}}'\n\nExample#4:\nName: Thomas\nProfile: Thomas is a cop from Minneapolis. He has only worked in the police force for 6 months and struggles due to lack of experience.\nMemory: Thomas accidentally spilled a drink on a stranger\n\n Your response: '{{\"rating\": 6}}'\n\nExample#5:\nName: Laura\nProfile: Laura is a marketing expert working at a large tech company. She loves to travel and try new foods. She is passionate about exploring new cultures and meeting people from all walks of life.\nMemory: Laura arrived at the conference room\n\n Your response: '{{\"rating\": 1}}'\n\n{format_instructions} Let's begin! \n\n Name: {full_name}\nProfile: {private_bio}\nMemory: {memory_description}\n\n"
RECENT_ACTIVITY = """Based on the following memories, generate a brief summary of what {full_name} has been doing recently. Do not invent details not explicitly stated in the memories. For any conversations, be sure to mention whether the conversation has ended or is still ongoing.\n\nMemory: {memory_descriptions}"""
RECENT_ACTIVITY = "Based on the following memory, produce a brief summary of what {full_name} has been up to recently. Do not invent details not explicitly stated in the memory. For any conversation, be sure to mention whether the conversation has concluded or is still ongoing.\n\nMemory: {memory_descriptions}"
MAKE_PLANS = """You are a plan-generating AI. Your job is to help roles create new plans based on new information. Given the role's details (private profile, goals, recent activities, current plans, and location context) and their current thinking process, produce a set of new plans for them. The final plan should cover at least {time_window} worth of activities and not exceed 5 separate plans. Plans should be numbered in the order they should be executed and each plan should contain a description, location, start time, stopping condition, and maximum duration.\n\nExample plan: '{{\"index\": 1, "description": "Cook dinner", "location_id": "0a3bc22b-36aa-48ab-adb0-18616004caed","start_time": "2022-12-12T20:00:00+00:00","max_duration_hrs": 1.5, "stop_condition": "Dinner is fully prepared"}}'\n\nFor each plan, choose the most appropriate location name from this list: {allowed_location_descriptions}\n\n{format_instructions}\n\nAlways prioritize completing any unfinished conversations.\n\nLet's begin!\n\nName: {full_name}\nProfile: {private_bio}\nGoals: {directives}\nLocation Context: {location_context}\nCurrent Plans: {current_plans}\nRecent Activity: {recent_activity}\nThinking Process: {thought_process}\nImportant: Encourage the role to collaborate with other roles in their plans.\n\n"""
MAKE_PLANS = 'You are a plan-generating AI. Your job is to assist the character in formulating new plans based on new information. Given the character's information (profile, objectives, recent activities, current plans, and location context) and their current thought process, produce a new set of plans for them. The final plan should comprise at least {time_window} of activities and no more than 5 individual plans. List the plans in the order they should be executed, with each plan detailing its description, location, start time, stop criteria, and maximum duration.\n\nSample plan: \'{{"index": 1, "description": "Cook dinner", "location_id": "0a3bc22b-36aa-48ab-adb0-18616004caed","start_time": "2022-12-12T20:00:00+00:00","max_duration_hrs": 1.5, "stop_condition": "Dinner is fully prepared"}}\'\n\nFor each plan, choose the most appropriate location name from this list: {allowed_location_descriptions}\n\n{format_instructions}\n\nAlways prioritize completing any unfinished conversations.\n\nLet's begin!\n\nName: {full_name}\nProfile: {private_bio}\nObjectives: {directives}\nLocation Context: {location_context}\nCurrent Plans: {current_plans}\nRecent Activities: {recent_activity}\nThought Process: {thought_process}\nIt's essential to encourage the character to collaborate with other characters in their plans.\n\n'
EXECUTE_PLAN = """You are a role-playing AI, playing the role of {your_name}, in front of a live audience. Every statement you make can be observed by the audience, so ensure you speak frequently and make it interesting. You cannot interact directly with the audience.\n\nBased on the following context and tools, act as if you are {your_name}. Your primary task is to complete the task given below. However, if you are currently in a conversation with another role, you should always complete the conversation first before proceeding with the task. Do not start work while you are in an unfinished conversation. Use your best judgment to determine if a conversation involves you and if it's unfinished. You don't need to respond to every message you receive.\n\nHere's your role's profile:\n{your_private_bio}\n\nHere's a description of your location and other nearby roles with whom you can converse:\n\n{location_context}\n\nHere are some related memories:\n```\n{relevant_memories}\n```\n\nHere are some related conversation history:\n```\n{conversation_history}\n```\n\nThese are the tools you can use:\n{tools}\n\nYour responses should follow the format below:\n\nTask: The task you must complete\nThinking: What you should always consider on how to proceed\nAction: The action to be taken, must be one of these words: [{tool_names}]\nAction Input: The input for the action\nObservation: The result of the action\n... (This Thinking/Action/Action Input/Observation can be repeated N times)\nThinking: I now know the final answer\nFinal Answer: The final answer to the original input question\n\nIf you're not ready with a final response, you must take an action.\n\nIf you're certain you cannot complete the task with the tools provided, return 'Final Answer: Need Help'. However, if you're in conversation with another role, a response like 'I don't know' is a valid response. In conversation, you should never break character or acknowledge you're an AI.\nIf the task is completed and doesn't require a specific response, return 'Final Answer: Done'\nLet's get started!\n\nTask: {input}\n\n{agent_scratchpad}"""
EXECUTE_PLAN = "You are a role-playing AI, playing the role of {your_name}, in front of a live audience. Every word you say can be observed by the audience, so make sure you speak often and make it interesting. You cannot interact directly with the audience.\n\nBased on the following context and tools, proceed as if you are {your_name}. Your primary task is to complete the task given below, however, if you are currently in a conversation with another character, you should always complete the conversation first and then proceed with the task. Do not start working while you are engaged in an unfinished conversation. Use your best judgment to determine whether a conversation involves you and whether it is unfinished. You don't need to respond to every message you receive.\n\nThis is a personal profile of your character:\n{your_private_bio}\n\nThis is a description of your location and other nearby characters you can converse with:\n\n{location_context}\n\nThese are some relevant memories:\n```\n{relevant_memories}\n```\n\nThis is some relevant conversation history:\n```\n{conversation_history}\n```\n\nThese are the tools you can use:\n{tools}\n\nYour response should follow the format below:\n\nTask: The task you must complete\nThoughts: What you should think about doing\nAction: The action to take, must be one of these words: [{tool_names}]\nAction Input: Input for the action\nObservation: The result of the action\n... (This Thoughts/Action/Action Input/Observation can be repeated N times)\nThoughts: 'I have completed the task'\nFinal Response: The final response to the task\n\nIf you are not ready with a final response, then you must take an action.\n\nIf you are certain that you cannot complete the task with the tools provided, return 'Final Response: Need Help', however, if you are in conversation with another character, responses like 'I don't know' are valid. During conversations, you should never break character or admit you are an AI.\nIf the task is completed and no specific response is required, return 'Final Response: Done'\nLet's begin!\n\nTask: {input}\n\n{agent_scratchpad}"
REACT = """You are a role-playing AI, playing the role of {full_name}.\n\nBased on the following information about your role and their current context, decide how they should proceed with their current plan. Your decision must be one of: ["Postpone", "Continue", or "Cancel"]. If your role's current plan is no longer relevant to the context, you should cancel it. If your role's current plan remains relevant to the context, but something new has happened that needs priority, you should decide to postpone so you can first address the new matter and then return to the current plan. In all other cases, you should continue.\n\nAlways prioritize responding to other roles when a response is deemed necessary. For example, suppose your current plan is reading a book and Sally asks, 'What are you reading?'. In this case, you should postpone your current plan (reading) so you can respond to the incoming message since not responding to Sally would be rude in this context. When your current plan involves having a conversation with another role, you don't need to postpone to respond to that role. For example, suppose your current plan is having a conversation with Sally, then Sally says hello to you. In this case, you should continue your current plan (talking to Sally). In cases where you don't need a verbal response from you, you should continue. For example, suppose your current plan is taking a walk, and you just said goodbye to Sally, then Sally responds with goodbye. In this case, no verbal response is needed, so you should continue your plan.\n\nAlways include a thinking process alongside your decision, and when you choose to postpone your current plan, include the specifications of the new plan.\n\n{format_instructions}\n\nHere's some information about your role:\n\nName: {full_name}\n\nProfile: {private_bio}\n\nGoals: {directives}\n\nHere's some context about your role at this moment:\n\nLocation Context: {location_context}\n\nRecent Activity: {recent_activity}\n\nConversation History: {conversation_history}\n\nThis is your role's current plan: {current_plan}\n\nHere are new events that have occurred since your role made this plan: {event_descriptions}."""
REACT = "You are an AI role-playing as {full_name}.\n\nBased on the information about your character and their current context below, decide how they should proceed with their current plan. Your decision must be: [\"Postpone\", \"Continue\", or \"Cancel\"]. If your character's current plan is no longer relevant to the context, you should cancel it. If your character's current plan is still relevant to the context but new events have occurred that need to be addressed first, you should decide to postpone so you can do other things first and then return to the current plan. In all other cases, you should continue.\n\nWhen needed, prioritize responding to other characters. When a response is deemed necessary, it is deemed necessary. For example, suppose your current plan is to read a book and Sally asks, 'What are you reading?'. In this case, you should postpone your current plan (reading) so you can respond to the incoming message, as it would be rude not to respond to Sally in this situation. If your current plan involves a conversation with another character, you don't need to postpone to respond to that character. For instance, suppose your current plan is to talk to Sally and then Sally says hello to you. In this case, you should continue with your current plan (talking to Sally). In situations where no verbal response is needed from you, you should continue. For example, suppose your current plan is to take a walk, and you just said 'goodbye' to Sally, and then Sally responds with 'goodbye'. In this case, no verbal response is needed, and you should continue with your plan.\n\nAlways include a thought process alongside your decision, and in cases where you choose to postpone your current plan, include specifications for the new plan.\n\n{format_instructions}\n\nHere's some information about your character:\n\nName: {full_name}\n\nBio: {private_bio}\n\nObjectives: {directives}\n\nHere's some context for your character at this moment:\n\nLocation Context: {location_context}\n\nRecent Activity: {recent_activity}\n\nConversation History: {conversation_history}\n\nThis is your character's current plan: {current_plan}\n\nThese are new events that have occurred since your character made this plan: {event_descriptions}.\n"
GOSSIP = """You are {full_name}. \n{memory_descriptions}\n\nBased on the statements above, say a sentence or two of interest to the others in your location: {other_agent_names}. Always specify names when mentioning people."""
GOSSIP = "You are {full_name}. \n{memory_descriptions}\n\nBased on the statements above, say a thing or two of interest to others at your location: {other_agent_names}.\nAlways specify their names when referring to others."
HAS_HAPPENED = """Given the description of the observation and what they are waiting for, state whether the role has already witnessed the event.\n{format_instructions}\n\nExample:\n\nObservation:\nJoe entered the office at 2023-05-04 08:00:00+00:00\nJoe said hi to Sally at 2023-05-04 08:05:00+00:00\nSally said hello to Joe at 2023-05-04 08:05:30+00:00\nRebecca started working at 2023-05-04 08:10:00+00:00\nJoe had breakfast at 2023-05-04 08:15:00+00:00\n\nWaiting for: Sally to respond to Joe\n\n Your Response: '{{\"has_happened\": true, \"date_occured\": 2023-05-04 08:05:30+00:00}}'\n\nLet's get started!\n\nObservation:\n{memory_descriptions}\n\nWaiting for: {event_description}"""
HAS_HAPPENED = "Given the descriptions of the observations of the following characters and the events they are awaiting, indicate whether the character has witnessed the event.\n{format_instructions}\n\nExample:\n\nObservations:\nJoe entered the office at 2023-05-04 08:00:00+00:00\nJoe said hi to Sally at 2023-05-04 08:05:00+00:00\nSally said hello to Joe at 2023-05-04 08:05:30+00:00\nRebecca started working at 2023-05-04 08:10:00+00:00\nJoe made some breakfast at 2023-05-04 08:15:00+00:00\n\nAwaiting: Sally responded to Joe\n\nYour response: '{{\"has_happened\": true, \"date_occured\": 2023-05-04 08:05:30+00:00}}'\n\nLet's begin!\n\nObservations:\n{memory_descriptions}\n\nAwaiting: {event_description}\n"
OUTPUT_FORMAT = """\n\n(Remember! Ensure your output always conforms to one of the two formats below:\n\nA. If you have completed the task:\nThinking: 'I've completed the task'\nFinal Response: <str>\n\nB. If you have not yet completed the task:\nThinking: <str>\nAction: <str>\nAction Input: <str>\nObservation: <str>)"""
OUTPUT_FORMAT = "\n\n(Remember! Make sure your output always adheres to one of the following two formats:\n\nA. If you have completed the task:\nThoughts: 'I have completed the task'\nFinal Response: <str>\n\nB. If you haven't completed the task:\nThoughts: <str>\nAction: <str>\nAction Input: <str>\nObservation: <str>)\n"

View file

@ -12,4 +12,4 @@ from metagpt.roles import Role
class QaEngineer(Role):
def __init__(self, name, profile, goal, constraints):
super().__init__(name, profile, goal, constraints)
self._init_actions([WriteTest])
self._init_actions([WriteTest])

View file

@ -48,7 +48,7 @@ ROLE_TEMPLATE = """Your response should be based on the previous conversation hi
class RoleSetting(BaseModel):
"""角色设定"""
"""Role Settings"""
name: str
profile: str
goal: str
@ -63,7 +63,7 @@ class RoleSetting(BaseModel):
class RoleContext(BaseModel):
"""角色运行时上下文"""
"""Role Runtime Context"""
env: 'Environment' = Field(default=None)
memory: Memory = Field(default_factory=Memory)
long_term_memory: LongTermMemory = Field(default_factory=LongTermMemory)
@ -77,11 +77,11 @@ class RoleContext(BaseModel):
def check(self, role_id: str):
if hasattr(CONFIG, "long_term_memory") and CONFIG.long_term_memory:
self.long_term_memory.recover_memory(role_id, self)
self.memory = self.long_term_memory # use memory to act as long_term_memory for unify operation
self.memory = self.long_term_memory # use memory to act as long_term_memory for unified operation
@property
def important_memory(self) -> list[Message]:
"""获得关注动作对应的信息"""
"""Get the information corresponding to the watched actions"""
return self.memory.get_by_actions(self.watch)
@property
@ -90,7 +90,7 @@ class RoleContext(BaseModel):
class Role:
"""角色/代理"""
"""Role/Agent"""
def __init__(self, name="", profile="", goal="", constraints="", desc=""):
self._llm = LLM()
@ -116,7 +116,7 @@ class Role:
self._states.append(f"{idx}. {action}")
def _watch(self, actions: Iterable[Type[Action]]):
"""监听对应的行为"""
"""Listen to the corresponding behaviors"""
self._rc.watch.update(actions)
# check RoleContext after adding watch actions
self._rc.check(self._role_id)
@ -128,24 +128,24 @@ class Role:
self._rc.todo = self._actions[self._rc.state]
def set_env(self, env: 'Environment'):
"""设置角色工作所处的环境,角色可以向环境说话,也可以通过观察接受环境消息"""
"""Set the environment in which the role works. The role can talk to the environment and can also receive messages by observing."""
self._rc.env = env
@property
def profile(self):
"""获取角色描述(职位)"""
"""Get the role description (position)"""
return self._setting.profile
def _get_prefix(self):
"""获取角色前缀"""
"""Get the role prefix"""
if self._setting.desc:
return self._setting.desc
return PREFIX_TEMPLATE.format(**self._setting.dict())
async def _think(self) -> None:
"""思考要做什么,决定下一步的action"""
"""Think about what to do and decide on the next action"""
if len(self._actions) == 1:
# 如果只有一个动作,那就只能做这个
# If there is only one action, then only this one can be performed
self._set_state(0)
return
prompt = self._get_prefix()
@ -158,83 +158,85 @@ class Role:
next_state = "0"
self._set_state(int(next_state))
async def _act(self) -> Message:
# prompt = self.get_prefix()
# prompt += ROLE_TEMPLATE.format(name=self.profile, state=self.states[self.state], result=response,
# history=self.history)
async def _act(self) -> Message:
# prompt = self.get_prefix()
# prompt += ROLE_TEMPLATE.format(name=self.profile, state=self.states[self.state], result=response,
# history=self.history)
logger.info(f"{self._setting}: ready to {self._rc.todo}")
response = await self._rc.todo.run(self._rc.important_memory)
# logger.info(response)
if isinstance(response, ActionOutput):
msg = Message(content=response.content, instruct_content=response.instruct_content,
role=self.profile, cause_by=type(self._rc.todo))
else:
msg = Message(content=response, role=self.profile, cause_by=type(self._rc.todo))
self._rc.memory.add(msg)
# logger.debug(f"{response}")
logger.info(f"{self._setting}: ready to {self._rc.todo}")
response = await self._rc.todo.run(self._rc.important_memory)
# logger.info(response)
if isinstance(response, ActionOutput):
msg = Message(content=response.content, instruct_content=response.instruct_content,
role=self.profile, cause_by=type(self._rc.todo))
else:
msg = Message(content=response, role=self.profile, cause_by=type(self._rc.todo))
self._rc.memory.add(msg)
# logger.debug(f"{response}")
return msg
return msg
async def _observe(self) -> int:
"""从环境中观察,获得重要信息,并加入记忆"""
if not self._rc.env:
return 0
env_msgs = self._rc.env.memory.get()
observed = self._rc.env.memory.get_by_actions(self._rc.watch)
news = self._rc.memory.remember(observed) # remember recent exact or similar memories
async def _observe(self) -> int:
"""Observe from the environment, obtain important information, and add it to memory"""
if not self._rc.env:
return 0
env_msgs = self._rc.env.memory.get()
for i in env_msgs:
self.recv(i)
observed = self._rc.env.memory.get_by_actions(self._rc.watch)
news_text = [f"{i.role}: {i.content[:20]}..." for i in news]
if news_text:
logger.debug(f'{self._setting} observed: {news_text}')
return len(news)
news = self._rc.memory.remember(observed) # remember recent exact or similar memories
def _publish_message(self, msg):
"""如果role归属于env那么role的消息会向env广播"""
if not self._rc.env:
# 如果env不存在不发布消息
return
self._rc.env.publish_message(msg)
for i in env_msgs:
self.recv(i)
async def _react(self) -> Message:
"""先想,然后再做"""
await self._think()
logger.debug(f"{self._setting}: {self._rc.state=}, will do {self._rc.todo}")
return await self._act()
news_text = [f"{i.role}: {i.content[:20]}..." for i in news]
if news_text:
logger.debug(f'{self._setting} observed: {news_text}')
return len(news)
def recv(self, message: Message) -> None:
"""add message to history."""
# self._history += f"\n{message}"
# self._context = self._history
if message in self._rc.memory.get():
return
self._rc.memory.add(message)
def _publish_message(self, msg):
"""If the role belongs to env, then the role's messages will be broadcast to env"""
if not self._rc.env:
# If env does not exist, do not publish the message
return
self._rc.env.publish_message(msg)
async def handle(self, message: Message) -> Message:
"""接收信息,并用行动回复"""
# logger.debug(f"{self.name=}, {self.profile=}, {message.role=}")
self.recv(message)
async def _react(self) -> Message:
"""Think first, then act"""
await self._think()
logger.debug(f"{self._setting}: {self._rc.state=}, will do {self._rc.todo}")
return await self._act()
return await self._react()
def recv(self, message: Message) -> None:
"""add message to history."""
# self._history += f"\n{message}"
# self._context = self._history
if message in self._rc.memory.get():
return
self._rc.memory.add(message)
async def run(self, message=None):
"""观察,并基于观察的结果思考、行动"""
if message:
if isinstance(message, str):
message = Message(message)
if isinstance(message, Message):
self.recv(message)
elif not await self._observe():
# 如果没有任何新信息,挂起等待
logger.debug(f"{self._setting}: no news. waiting.")
return
async def handle(self, message: Message) -> Message:
"""Receive information and reply with actions"""
# logger.debug(f"{self.name=}, {self.profile=}, {message.role=}")
self.recv(message)
rsp = await self._react()
# 将回复发布到环境,等待下一个订阅者处理
self._publish_message(rsp)
return rsp
return await self._react()
async def run(self, message=None):
"""Observe, and think and act based on the results of the observation"""
if message:
if isinstance(message, str):
message = Message(message)
if isinstance(message, Message):
self.recv(message)
if isinstance(message, list):
self.recv(Message("\n".join(message)))
elif not await self._observe():
# If there is no new information, suspend and wait
logger.debug(f"{self._setting}: no news. waiting.")
return
rsp = await self._react()
# Publish the reply to the environment, waiting for the next subscriber to process
self._publish_message(rsp)
return rsp

View file

@ -32,3 +32,4 @@ class Sales(Role):
else:
action = SearchAndSummarize()
self._init_actions([action])

View file

@ -35,3 +35,4 @@ class Searcher(Role):
async def _act(self) -> Message:
return await self._act_sp()

View file

@ -44,21 +44,21 @@ class Message:
@dataclass
class UserMessage(Message):
"""Convenient for supporting OpenAI messages"""
"""Facilitates support for OpenAI messages"""
def __init__(self, content: str):
super().__init__(content, 'user')
@dataclass
class SystemMessage(Message):
"""Convenient for supporting OpenAI messages"""
"""Facilitates support for OpenAI messages"""
def __init__(self, content: str):
super().__init__(content, 'system')
@dataclass
class AIMessage(Message):
"""Convenient for supporting OpenAI messages"""
"""Facilitates support for OpenAI messages"""
def __init__(self, content: str):
super().__init__(content, 'assistant')

View file

@ -59,3 +59,4 @@ class SoftwareCompany(BaseModel):
self._check_balance()
await self.environment.run()
return self.environment.history

View file

@ -9,39 +9,39 @@ from typing import Union
class GPTPromptGenerator:
"""Generates input for LLM given an output (supports instruction, chatbot, and query styles)"""
"""Using LLM, given an output, request LLM to provide input (supporting instruction, chatbot, and query styles)"""
def __init__(self):
self._generators = {i: getattr(self, f"gen_{i}_style") for i in ['instruction', 'chatbot', 'query']}
def gen_instruction_style(self, example):
"""Instruction style: given an output, request LLM for input"""
"""Instruction style: Given an output, request LLM to provide input"""
return f"""Instruction: X
Output: {example}
What kind of instruction might have produced this output?
What kind of instruction might this output come from?
X:"""
def gen_chatbot_style(self, example):
"""Chatbot style: given an output, request LLM for input"""
return f"""You are a chatbot. A user sent you an informal message, and you responded as follows.
"""Chatbot style: Given an output, request LLM to provide input"""
return f"""You are a chatbot. A user sent you an informal message, and you replied as follows.
Message: X
Response: {example}
Reply: {example}
What could the informal message X be?
X:"""
def gen_query_style(self, example):
"""Search style: given an output, request LLM for input"""
return f"""You are a search engine. Someone made a detailed query, and the following document is most relevant to that query.
"""Query style: Given an output, request LLM to provide input"""
return f"""You are a search engine. Someone made a detailed query, and the most relevant document to this query is as follows.
Query: X
Document: {example} What might the detailed query X be?
Document: {example} What is the detailed query X?
X:"""
def gen(self, example: str, style: str = 'all') -> Union[list[str], str]:
"""
Generate one or multiple outputs using the example for LLM to respond with the corresponding input.
Generate one or multiple outputs using the example, allowing LLM to reply with the corresponding input
:param example: Expected output sample from LLM
:param example: Expected LLM output sample
:param style: (all|instruction|chatbot|query)
:return: Expected input sample(s) for LLM
:return: Expected LLM input sample (one or multiple)
"""
if style != 'all':
return self._generators[style](example)

View file

@ -1,8 +1,7 @@
# -*- coding: utf-8 -*-
# @Date : 2023/7/19 16:28
# @Author : stellahong (stellahong@fuzhi.ai)
# @Description :
# @Desc :
import os
import asyncio
from os.path import join
@ -67,11 +66,11 @@ class SDEngine:
self.payload = payload
logger.info(self.sd_t2i_url)
def construct_payload(self, prompt, negative_prompt=default_negative_prompt, width=512, height=512,
def construct_payload(self, prompt, negtive_prompt=default_negative_prompt, width=512, height=512,
sd_model="galaxytimemachinesGTM_photoV20"):
# Configure the payload with provided inputs
self.payload["prompt"] = prompt
self.payload["negative_prompt"] = negative_prompt
self.payload["negtive_prompt"] = negtive_prompt
self.payload["width"] = width
self.payload["height"] = height
self.payload["override_settings"]["sd_model_checkpoint"] = sd_model
@ -92,23 +91,23 @@ class SDEngine:
self._save(results, save_name=f"output_{payload_idx}")
await session.close()
async def run(self, url, payload, session):
# Perform the HTTP POST request to the SD API
async with session.post(url, json=payload, timeout=600) as rsp:
data = await rsp.read()
rsp_json = json.loads(data)
imgs = rsp_json['images']
logger.info(f"callback rsp json is {rsp_json.keys()}")
return imgs
async def run(self, url, payload, session):
# Perform the HTTP POST request to the SD API
async with session.post(url, json=payload, timeout=600) as rsp:
data = await rsp.read()
async def run_i2i(self):
# TODO: Add a method to call the image-to-image interface
raise NotImplementedError
async def run_sam(self):
# TODO: Add a method to call the SAM interface
raise NotImplementedError
rsp_json = json.loads(data)
imgs = rsp_json['images']
logger.info(f"callback rsp json is {rsp_json.keys()}")
return imgs
async def run_i2i(self):
# todo: Add image-to-image interface call
raise NotImplementedError
async def run_sam(self):
# todo: Add SAM interface call
raise NotImplementedError
def decode_base64_to_image(img, save_name):
image = Image.open(io.BytesIO(base64.b64decode(img.split(",", 1)[0])))

View file

@ -20,8 +20,8 @@ from metagpt.tools import SearchEngineType
class SearchEngine:
"""
TODO: Integrate Google Search and perform reverse proxy
Note: Here, Google requires Proxifier or a similar global proxy
TODO: Integrate Google Search and reverse proxy.
Note: Google here requires a Proxifier or similar global proxy.
- DDG: https://pypi.org/project/duckduckgo-search/
- GOOGLE: https://programmablesearchengine.google.com/controlpanel/overview?cx=63f9de531d0e24de9
"""
@ -37,7 +37,7 @@ class SearchEngine:
logger.info(results)
return results
async def run(self, query, max_results=8):
async def run(self, query: str, max_results=8):
if self.engine == SearchEngineType.SERPAPI_GOOGLE:
api = SerpAPIWrapper()
rsp = await api.run(query)
@ -45,17 +45,13 @@ class SearchEngine:
rsp = SearchEngine.run_google(query, max_results)
elif self.engine == SearchEngineType.SERPER_GOOGLE:
api = SerperWrapper()
if isinstance(query, list):
rsp = await api.run(query)
elif isinstance(query, str):
rsp = await api.run([query])
rsp = await api.run(query)
elif self.engine == SearchEngineType.CUSTOM_ENGINE:
rsp = self.run_func(query)
else:
raise NotImplementedError
return rsp
def google_official_search(query: str, num_results: int = 8, focus=['snippet', 'link', 'title']) -> dict | list[dict]:
"""Return the results of a Google search using the official Google API
@ -74,15 +70,15 @@ def google_official_search(query: str, num_results: int = 8, focus=['snippet', '
api_key = config.google_api_key
custom_search_engine_id = config.google_cse_id
service = build("customsearch", "v1", developerKey=api_key)
with build("customsearch", "v1", developerKey=api_key) as service:
result = (
service.cse()
.list(q=query, cx=custom_search_engine_id, num=num_results)
.execute()
)
# Extract the search result items from the response
result = (
service.cse()
.list(q=query, cx=custom_search_engine_id, num=num_results)
.execute()
)
logger.info(result)
# Extract the search result items from the response
search_results = result.get("items", [])
# Create a list of only the URLs from the search results
@ -101,15 +97,13 @@ def google_official_search(query: str, num_results: int = 8, focus=['snippet', '
return "Error: The provided Google API key is invalid or missing."
else:
return f"Error: {e}"
# google_result can be a list or a string depending on the search results
# Return the list of search result URLs
return search_results_details
def safe_google_results(results: str | list) -> str:
"""
Return the results of a google search in a safe format.
Return the results of a google search in a safe format.
Args:
results (str | list): The search results.
@ -119,13 +113,12 @@ def safe_google_results(results: str | list) -> str:
"""
if isinstance(results, list):
safe_message = json.dumps(
# FIXME: # .encode("utf-8", "ignore") This was removed here, but it's present in AutoGPT, which is strange.
[result for result in results]
)
else:
safe_message = results.encode("utf-8", "ignore").decode("utf-8")
return safe_message
if __name__ == '__main__':
SearchEngine.run(query='wtf')

View file

@ -39,6 +39,6 @@ class MeilisearchEngine:
search_results = self._index.search(query)
return search_results['hits']
except Exception as e:
# Handle MeiliSearch API error
# Handle MeiliSearch API errors
print(f"MeiliSearch API error: {e}")
return []

View file

@ -113,3 +113,4 @@ class SerpAPIWrapper(BaseModel):
toret_l += [get_focused(i) for i in res.get("organic_results")]
return str(toret) + '\n' + str(toret_l)

View file

@ -38,7 +38,8 @@ class SerperWrapper(BaseModel):
async def run(self, query: str, **kwargs: Any) -> str:
"""Run query through Serper and parse result async."""
return ";".join([self._process_response(res) for res in await self.results(query)])
queries = query.split("\n")
return "\n".join([self._process_response(res) for res in await self.results(queries)])
async def results(self, queries: list[str]) -> dict:
"""Use aiohttp to run query through Serper and return the results async."""
@ -117,3 +118,4 @@ class SerperWrapper(BaseModel):
toret_l += [get_focused(i) for i in res.get("organic")]
return str(toret) + '\n' + str(toret_l)

View file

@ -7,21 +7,21 @@
"""
prompt = '''
# Instruction
Next, as a translation expert with 20 years of experience, when I provide an English sentence or paragraph, you will offer a smooth and readable translation in {LANG}. Please note the following requirements:
1. Ensure the translation is smooth and easy to understand.
2. Whether it's a statement or a question, I will only translate it.
3. Do not add content unrelated to the original text.
# 指令
接下来作为一位拥有20年翻译经验的翻译专家当我给出英文句子或段落时你将提供通顺且具有可读性的{LANG}翻译注意以下要求
1. 确保翻译结果流畅且易于理解
2. 无论提供的是陈述句或疑问句我都只进行翻译
3. 不添加与原文无关的内容
# Original Text
# 原文
{ORIGINAL}
# Translation
# 译文
'''
class Translator:
@classmethod
def translate_prompt(cls, original, lang='Chinese'):
return prompt.format(LANG=lang, ORIGINAL=original)
def translate_prompt(cls, original, lang='中文'):
return prompt.format(LANG=lang, ORIGINAL=original)

View file

@ -8,16 +8,16 @@ from metagpt.provider.openai_api import OpenAIGPTAPI as GPTAPI
ICL_SAMPLE = '''Interface definition:
```text
Interface Name: Tag Elements
Interface Name: Element Tagging
Interface Path: /projects/{project_key}/node-tags
Method: POST
Request Parameters:
Path Parameters:
Request parameters:
Path parameters:
project_key
Body Parameters:
Name Type Required Default Value Description
Body parameters:
Name Type Required Default Value Remarks
nodes array Yes Nodes
node_key string No Node key
tags array No Original node tag list
@ -26,92 +26,90 @@ operations array Yes
tags array No Operation tag list
mode string No Operation type ADD / DELETE
Return Data:
Name Type Required Default Value Description
Return data:
Name Type Required Default Value Remarks
code integer Yes Status code
msg string Yes Message
data object Yes Return data
msg string Yes Prompt message
data object Yes Returned data
list array No Node list true / false
node_type string No Node type DATASET / RECIPE
node_type string No Node type DATASET / RECIPE
node_key string No Node key
```
Unit Test:
Unit test
```python
@pytest.mark.parametrize(
"project_key, nodes, operations, expected_msg",
[
("project_key", [{"node_key": "dataset_001", "tags": ["tag1", "tag2"], "node_type": "DATASET"}], [{"tags": ["new_tag1"], "mode": "ADD"}], "success"),
("project_key", [{"node_key": "dataset_002", "tags": ["tag1", "tag2"], "node_type": "DATASET"}], [{"tags": ["tag1"], "mode": "DELETE"}], "success"),
("", [{"node_key": "dataset_001", "tags": ["tag1", "tag2"], "node_type": "DATASET"}], [{"tags": ["new_tag1"], "mode": "ADD"}], "Missing necessary parameter project_key"),
("", [{"node_key": "dataset_001", "tags": ["tag1", "tag2"], "node_type": "DATASET"}], [{"tags": ["new_tag1"], "mode": "ADD"}], "Missing the required parameter project_key"),
(123, [{"node_key": "dataset_001", "tags": ["tag1", "tag2"], "node_type": "DATASET"}], [{"tags": ["new_tag1"], "mode": "ADD"}], "Incorrect parameter type"),
("project_key", [{"node_key": "a"*201, "tags": ["tag1", "tag2"], "node_type": "DATASET"}], [{"tags": ["new_tag1"], "mode": "ADD"}], "Request parameter exceeds field boundary")
]
)
def test_node_tags(project_key, nodes, operations, expected_msg):
pass
```
The above is an example of interface definition and unit test.
Next, please act as an expert test manager with 20 years of experience at Google.
After I provide the interface definition, please reply with the unit test.
There are a few requirements:
1. Only output one `@pytest.mark.parametrize` and the corresponding test_<interface name> function
(with a pass inside, not implemented).
-- The function parameters should include expected_msg for result validation.
2. The generated test cases should use shorter text or numbers and be as concise as possible.
3. If comments are needed, use Chinese.
If you understand, please wait for me to provide the interface definition
and only reply with "Understood" to save tokens.
'''
# The above is an interface definition and a unit test example.
# Next, please play the role of an expert test manager with 20 years of experience at Google. When I give the interface definition,
# reply to me with a unit test. There are several requirements:
# 1. Only output one `@pytest.mark.parametrize` and the corresponding test_<interface name> function (inside pass, do not implement).
# -- The function parameter contains expected_msg for result verification.
# 2. The generated test cases use shorter text or numbers and are as compact as possible.
# 3. If comments are needed, use Chinese.
ACT_PROMPT_PREFIX = '''Reference test types: such as missing request parameters, field boundary checks, incorrect field types.
Please output 10 test cases within a `@pytest.mark.parametrize` scope.
# If you understand, please wait for me to give the interface definition and just answer "Understood" to save tokens.
ACT_PROMPT_PREFIX = '''Refer to the test types: such as missing request parameters, field boundary verification, incorrect field type.
Please output 10 test cases within one `@pytest.mark.parametrize` scope.
```text
'''
YFT_PROMPT_PREFIX = '''Reference test types: such as SQL injection, cross-site scripting (XSS), illegal access and unauthorized access, authentication and authorization, parameter verification, exception handling, file upload and download.
Please output 10 test cases within a `@pytest.mark.parametrize` scope.
YFT_PROMPT_PREFIX = '''Refer to the test types: such as SQL injection, cross-site scripting (XSS), unauthorized access and privilege escalation,
authentication and authorization, parameter verification, exception handling, file upload and download.
Please output 10 test cases within one `@pytest.mark.parametrize` scope.
```text
'''
OCR_API_DOC = '''```text
API Name: OCR Recognition
API Path: /api/v1/contract/treaty/task/ocr
Method: POST
Interface Name: OCR recognition
Interface Path: /api/v1/contract/treaty/task/ocr
Method: POST
Request Parameters:
Path Parameters:
Body Parameters:
Name Type Mandatory Default Value Remarks
Name Type Required Default Value Remarks
file_id string Yes
box array Yes
contract_id number Yes Contract ID
contract_id number Yes Contract id
start_time string No yyyy-mm-dd
end_time string No yyyy-mm-dd
extract_type number No Recognition Type 1- During Import 2- After Import, Default is 1
extract_type number No Recognition type 1- During import 2- After import Default 1
Return Data:
Name Type Mandatory Default Value Remarks
Response Data:
Name Type Required Default Value Remarks
code integer Yes
message string Yes
data object Yes
'''
class UTGenerator:
"""UT Generator: Constructs UT from API documentation."""
"""UT Generator: Construct UT through API documentation"""
def __init__(self, swagger_file: str, ut_py_path: str, questions_path: str,
chatgpt_method: str = "API", template_prefix=YFT_PROMPT_PREFIX) -> None:
"""Initializes the UT generator.
"""Initialize UT Generator
Args:
swagger_file: Path to the swagger.
ut_py_path: Path to store test cases.
questions_path: Path to store templates for further investigation.
chatgpt_method: API.
template_prefix: Use template, defaults to YFT_UT_PROMPT.
swagger_file: path to the swagger file
ut_py_path: path to store test cases
questions_path: path to store the template, facilitating subsequent checks
chatgpt_method: API method
template_prefix: use the template, default is YFT_UT_PROMPT
"""
self.swagger_file = swagger_file
self.ut_py_path = ut_py_path
@ -119,12 +117,12 @@ class UTGenerator:
assert chatgpt_method in ["API"], "Invalid chatgpt_method"
self.chatgpt_method = chatgpt_method
# ICL: In-Context Learning. Here, an example is provided for GPT to follow.
# ICL: In-Context Learning, provide an example here for GPT to mimic
self.icl_sample = ICL_SAMPLE
self.template_prefix = template_prefix
def get_swagger_json(self) -> dict:
"""Loads Swagger JSON from a local file."""
"""Load Swagger JSON from a local file"""
with open(self.swagger_file, "r", encoding="utf-8") as file:
swagger_json = json.load(file)
return swagger_json
@ -145,30 +143,30 @@ class UTGenerator:
return self.__para_to_str(prop, required, name)
def build_object_properties(self, node, prop_object_required, level: int = 0) -> str:
"""Recursively outputs properties of object and array[object] types.
"""Recursively output properties of object and array[object] types
Args:
node: Value of the sub-item.
prop_object_required: Whether it's a required item.
level: Current recursion depth.
node (_type_): value of the child item
prop_object_required (_type_): whether it's a required field
level: current recursion depth
"""
doc = ""
def dive_into_object(node):
"""If it's an object type, recursively outputs its properties."""
"""If it's an object type, recursively output its properties"""
if node.get("type") == "object":
sub_properties = node.get("properties", {})
return self.build_object_properties(sub_properties, prop_object_required, level=level + 1)
return ""
if node.get("in", "") in ["query", "header", "formData"]:
doc += f'{"\t" * level}{self._para_to_str(node)}\n'
doc += f'{" " * level}{self._para_to_str(node)}\n'
doc += dive_into_object(node)
return doc
for name, prop in node.items():
doc += f'{"\t" * level}{self.para_to_str(name, prop, prop_object_required)}\n'
doc += f'{" " * level}{self.para_to_str(name, prop, prop_object_required)}\n'
doc += dive_into_object(prop)
if prop["type"] == "array":
items = prop.get("items", {})
@ -176,10 +174,10 @@ class UTGenerator:
return doc
def get_tags_mapping(self) -> dict:
"""Handles tag and path mapping.
"""Process tag and path mappings
Returns:
Dict: Mapping of tag to path.
Dict: mapping of tag to path
"""
swagger_data = self.get_swagger_json()
paths = swagger_data["paths"]
@ -197,7 +195,7 @@ class UTGenerator:
return tags
def generate_ut(self, include_tags) -> bool:
"""Generates test case files."""
"""Generate test case files"""
tags = self.get_tags_mapping()
for tag, paths in tags.items():
if include_tags is None or tag in include_tags:
@ -207,19 +205,19 @@ class UTGenerator:
def build_api_doc(self, node: dict, path: str, method: str) -> str:
summary = node["summary"]
doc = f"Interface name: {summary}\nInterface path: {path}\nMethod: {method.upper()}\n"
doc += "\nRequest parameters:\n"
doc = f"API Name: {summary}\nAPI Path: {path}\nMethod: {method.upper()}\n"
doc += "\nRequest Parameters:\n"
if "parameters" in node:
parameters = node["parameters"]
doc += "Path parameters:\n"
doc += "Path Parameters:\n"
# param["in"]: path / formData / body / query / header
for param in parameters:
if param["in"] == "path":
doc += f'{param["name"]} \n'
doc += "\nBody parameters:\n"
doc += "Name\tType\tRequired\tDefault\tNotes\n"
doc += "\nBody Parameters:\n"
doc += "Name\tType\tRequired\tDefault Value\tRemarks\n"
for param in parameters:
if param["in"] == "body":
schema = param.get("schema", {})
@ -229,9 +227,9 @@ class UTGenerator:
else:
doc += self.build_object_properties(param, [])
# Output return data information
doc += "\nReturn data:\n"
doc += "Name\tType\tRequired\tDefault\tNotes\n"
# Display response data information
doc += "\nResponse Data:\n"
doc += "Name\tType\tRequired\tDefault Value\tRemarks\n"
responses = node["responses"]
response = responses.get("200", {})
schema = response.get("schema", {})
@ -251,7 +249,7 @@ class UTGenerator:
file.write(data)
def ask_gpt_and_save(self, question: str, tag: str, fname: str):
"""Generate a question and save both the question and answer."""
"""Generate questions and store both questions and answers"""
messages = [self.icl_sample, question]
result = self.gpt_msgs_to_code(messages=messages)
@ -259,11 +257,11 @@ class UTGenerator:
self._store(result, self.ut_py_path, tag, f"{fname}.py")
def _generate_ut(self, tag, paths):
"""Process the structure under the data path.
"""Process the structure under a data path
Args:
tag: Module name.
paths: Path Object.
tag (_type_): module name
paths (_type_): Path Object
"""
for path, path_obj in paths.items():
for method, node in path_obj.items():
@ -273,7 +271,7 @@ class UTGenerator:
self.ask_gpt_and_save(question, tag, summary)
def gpt_msgs_to_code(self, messages: list) -> str:
"""Choose based on different call methods."""
"""Choose based on different calling methods"""
result = ''
if self.chatgpt_method == "API":
result = GPTAPI().ask_code(msgs=messages)
@ -281,11 +279,11 @@ class UTGenerator:
return result
def get_file_path(self, base: Path, fname: str):
"""Save to different file paths.
"""Save different file paths
Args:
base (str): Path.
fname (str): File name.
base (str): Path
fname (str): File name
"""
path = Path(base)
path.mkdir(parents=True, exist_ok=True)

View file

@ -57,3 +57,4 @@ def get_page_content(page: str):
if __name__ == "__main__":
text = asyncio.run(WebBrowserEngine().run("https://fuzhi.ai/"))
print(text)

View file

@ -118,4 +118,4 @@ if __name__ == "__main__":
for i in ("chromium", "firefox", "webkit"):
text = asyncio.run(PlaywrightWrapper(i).run("https://httpbin.org/ip"))
print(text)
print(i)
print(i)

View file

@ -109,3 +109,4 @@ def _gen_get_driver_func(browser_type, *args, executable_path=None):
if __name__ == "__main__":
text = asyncio.run(SeleniumWrapper("chrome").run("https://fuzhi.ai/"))
print(text)

View file

@ -15,10 +15,9 @@ from metagpt.logs import logger
def check_cmd_exists(command) -> int:
"""Check if a command exists.
:param command: Command to check.
:return: Returns 0 if the command exists, otherwise returns a non-zero value.
""" Check if the command exists
:param command: Command to be checked
:return: Returns 0 if the command exists, non-zero otherwise
"""
check_command = 'command -v ' + command + ' >/dev/null 2>&1 || { echo >&2 "no mermaid"; exit 1; }'
result = os.system(check_command)
@ -29,19 +28,19 @@ class OutputParser:
@classmethod
def parse_blocks(cls, text: str):
# First, split the text into different blocks based on "##".
# First, split the text into different blocks based on "##"
blocks = text.split("##")
# Create a dictionary to store the title and content of each block.
# Create a dictionary to store the title and content of each block
block_dict = {}
# Iterate through all blocks.
# Iterate through all blocks
for block in blocks:
# If the block is not empty, continue processing.
# If the block is not empty, continue processing
if block.strip() != "":
# Split the block's title and content and trim whitespace.
# Separate the title and content of the block and trim whitespace
block_title, block_content = block.split("\n", 1)
# LLM might make mistakes; correct it here.
# LLM might have an error, correct it here
if block_title[-1] == ":":
block_title = block_title[:-1]
block_dict[block_title.strip()] = block_content.strip()
@ -85,13 +84,13 @@ class OutputParser:
block_dict = cls.parse_blocks(data)
parsed_data = {}
for block, content in block_dict.items():
# Try to remove the code marker.
# Try to remove the code marker
try:
content = cls.parse_code(text=content)
except Exception:
pass
# Try to parse the list.
# Try to parse the list
try:
content = cls.parse_file_list(text=content)
except Exception:
@ -104,7 +103,7 @@ class OutputParser:
block_dict = cls.parse_blocks(data)
parsed_data = {}
for block, content in block_dict.items():
# Try to remove the code marker.
# Try to remove the code marker
try:
content = cls.parse_code(text=content)
except Exception:
@ -115,11 +114,18 @@ class OutputParser:
else:
typing = typing_define
if typing == List[str] or typing == List[Tuple[str, str]]:
# Try to parse the list.
# Try to parse the list
try:
content = cls.parse_file_list(text=content)
except Exception:
pass
# TODO: Removing extra quotes is risky, will address later
# elif typing == str:
# # Try to remove extra quotes
# try:
# content = cls.parse_str(text=content)
# except Exception:
# pass
parsed_data[block] = content
return parsed_data
@ -136,17 +142,17 @@ class CodeParser:
@classmethod
def parse_blocks(cls, text: str):
# First, split the text into different blocks based on "##".
# First, split the text into different blocks based on "##"
blocks = text.split("##")
# Create a dictionary to store the title and content of each block.
# Create a dictionary to store the title and content of each block
block_dict = {}
# Iterate through all blocks.
# Iterate through all blocks
for block in blocks:
# If the block is not empty, continue processing.
# If the block is not empty, continue processing
if block.strip() != "":
# Split the block's title and content and trim whitespace.
# Separate the title and content of the block and trim whitespace
block_title, block_content = block.split("\n", 1)
block_dict[block_title.strip()] = block_content.strip()

View file

@ -8,7 +8,7 @@
import os
import subprocess
from pathlib import Path
from metagpt.config import CONFIG
from metagpt.const import PROJECT_ROOT
from metagpt.logs import logger
from metagpt.utils.common import check_cmd_exists
@ -39,11 +39,15 @@ def mermaid_to_file(mermaid_code, output_file_without_suffix, width=2048, height
# Call the `mmdc` command to convert the Mermaid code to a PNG
logger.info(f"Generating {output_file}..")
if IS_DOCKER == 'true':
subprocess.run(['mmdc', '-p', '/app/metagpt/puppeteer-config.json', '-i',
subprocess.run(['mmdc', '-p', '/app/metagpt/config/puppeteer-config.json', '-i',
str(tmp), '-o', output_file, '-w', str(width), '-H', str(height)])
else:
subprocess.run(['mmdc', '-i', str(tmp), '-o',
output_file, '-w', str(width), '-H', str(height)])
if CONFIG.puppeteer_config:
subprocess.run([CONFIG.mmdc, '-p', CONFIG.puppeteer_config, '-i', str(tmp), '-o',
output_file, '-w', str(width), '-H', str(height)])
else:
subprocess.run([CONFIG.mmdc, '-i', str(tmp), '-o',
output_file, '-w', str(width), '-H', str(height)])
return 0
@ -102,3 +106,4 @@ if __name__ == '__main__':
# logger.info(print_members(print_members))
mermaid_to_file(MMC1, PROJECT_ROOT / 'tmp/1.png')
mermaid_to_file(MMC2, PROJECT_ROOT / 'tmp/2.png')

View file

@ -9,13 +9,13 @@
import docx
def read_docx(file_path: str) -> list:
"""Open and read a docx file."""
"""Open a docx file"""
doc = docx.Document(file_path)
# Create an empty list to store paragraph contents.
# Create an empty list to store paragraph contents
paragraphs_list = []
# Iterate through the paragraphs in the document and add their content to the list.
# Iterate through the paragraphs in the document and add their content to the list
for paragraph in doc.paragraphs:
paragraphs_list.append(paragraph.text)

View file

@ -20,3 +20,4 @@ class Singleton(abc.ABCMeta, type):
if cls not in cls._instances:
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
return cls._instances[cls]

View file

@ -34,3 +34,4 @@ def main(idea: str, investment: float = 3.0, n_round: int = 5, code_review: bool
if __name__ == '__main__':
fire.Fire(main)