Merge pull request #3 from iorisa/feature/assistant_role

feat: +general agent
This commit is contained in:
send18 2023-08-29 11:08:22 +08:00 committed by GitHub
commit 13672ad0e3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
86 changed files with 3046 additions and 367 deletions

39
.devcontainer/README.md Normal file
View file

@ -0,0 +1,39 @@
# Dev container
This project includes a [dev container](https://containers.dev/), which lets you use a container as a full-featured dev environment.
You can use the dev container configuration in this folder to build and start running MetaGPT locally! For more, refer to the main README under the home directory.
You can use it in [GitHub Codespaces](https://github.com/features/codespaces) or the [VS Code Dev Containers extension](https://marketplace.visualstudio.com/items?itemName=ms-vscode-remote.remote-containers).
## GitHub Codespaces
<a href="https://codespaces.new/geekan/MetaGPT"><img src="https://github.com/codespaces/badge.svg" alt="Open in GitHub Codespaces"></a>
You may use the button above to open this repo in a Codespace
For more info, check out the [GitHub documentation](https://docs.github.com/en/free-pro-team@latest/github/developing-online-with-codespaces/creating-a-codespace#creating-a-codespace).
## VS Code Dev Containers
<a href="https://vscode.dev/redirect?url=vscode://ms-vscode-remote.remote-containers/cloneInVolume?url=https://github.com/geekan/MetaGPT"><img src="https://img.shields.io/static/v1?label=Dev%20Containers&message=Open&color=blue&logo=visualstudiocode" alt="Open in Dev Containers"></a>
Note: If you click this link you will open the main repo and not your local cloned repo, you can use this link and replace with your username and cloned repo name:
https://vscode.dev/redirect?url=vscode://ms-vscode-remote.remote-containers/cloneInVolume?url=https://github.com/geekan/MetaGPT
If you already have VS Code and Docker installed, you can use the button above to get started. This will cause VS Code to automatically install the Dev Containers extension if needed, clone the source code into a container volume, and spin up a dev container for use.
You can also follow these steps to open this repo in a container using the VS Code Dev Containers extension:
1. If this is your first time using a development container, please ensure your system meets the pre-reqs (i.e. have Docker installed) in the [getting started steps](https://aka.ms/vscode-remote/containers/getting-started).
2. Open a locally cloned copy of the code:
- Fork and Clone this repository to your local filesystem.
- Press <kbd>F1</kbd> and select the **Dev Containers: Open Folder in Container...** command.
- Select the cloned copy of this folder, wait for the container to start, and try things out!
You can learn more in the [Dev Containers documentation](https://code.visualstudio.com/docs/devcontainers/containers).
## Tips and tricks
* If you are working with the same repository folder in a container and Windows, you'll want consistent line endings (otherwise you may see hundreds of changes in the SCM view). The `.gitattributes` file in the root of this repo will disable line ending conversion and should prevent this. See [tips and tricks](https://code.visualstudio.com/docs/devcontainers/tips-and-tricks#_resolving-git-line-ending-issues-in-containers-resulting-in-many-modified-files) for more info.
* If you'd like to review the contents of the image used in this dev container, you can check it out in the [devcontainers/images](https://github.com/devcontainers/images/tree/main/src/python) repo.

View file

@ -0,0 +1,27 @@
// For format details, see https://aka.ms/devcontainer.json. For config options, see the
// README at: https://github.com/devcontainers/templates/tree/main/src/python
{
"name": "Python 3",
// Or use a Dockerfile or Docker Compose file. More info: https://containers.dev/guide/dockerfile
"image": "mcr.microsoft.com/devcontainers/python:0-3.11",
// Features to add to the dev container. More info: https://containers.dev/features.
// "features": {},
// Configure tool-specific properties.
"customizations": {
// Configure properties specific to VS Code.
"vscode": {
"settings": {},
"extensions": [
"streetsidesoftware.code-spell-checker"
]
}
},
// Use 'postCreateCommand' to run commands after the container is created.
"postCreateCommand": "./.devcontainer/postCreateCommand.sh"
// Uncomment to connect as root instead. More info: https://aka.ms/dev-containers-non-root.
// "remoteUser": "root"
}

View file

@ -0,0 +1,31 @@
version: '3'
services:
metagpt:
build:
dockerfile: Dockerfile
context: ..
volumes:
# Update this to wherever you want VS Code to mount the folder of your project
- ..:/workspaces:cached
networks:
- metagpt-network
# environment:
# MONGO_ROOT_USERNAME: root
# MONGO_ROOT_PASSWORD: example123
# depends_on:
# - mongo
# mongo:
# image: mongo
# restart: unless-stopped
# environment:
# MONGO_INITDB_ROOT_USERNAME: root
# MONGO_INITDB_ROOT_PASSWORD: example123
# ports:
# - "27017:27017"
# networks:
# - metagpt-network
networks:
metagpt-network:
driver: bridge

View file

@ -0,0 +1,7 @@
# Step 1: Ensure that NPM is installed on your system. Then install mermaid-js.
npm --version
sudo npm install -g @mermaid-js/mermaid-cli
# Step 2: Ensure that Python 3.9+ is installed on your system. You can check this by using:
python --version
python setup.py install

6
.gitignore vendored
View file

@ -163,3 +163,9 @@ workspace/*
*.mmd
tmp
output.wav
*.bak
# output folder
output
tmp.png

View file

@ -0,0 +1,18 @@
{
"schema_version": "v1",
"name_for_model": "text processing tools",
"name_for_human": "MetaGPT Text Plugin",
"description_for_model": "Plugins for text processing, including text-to-speech, text-to-image, text-to-embedding, text summarization, text-to-code, vector similarity calculation, web content crawling, and more.",
"description_for_human": "Plugins for text processing, including text-to-speech, text-to-image, text-to-embedding, text summarization, text-to-code, vector similarity calculation, web content crawling, and more.",
"auth": {
"type": "none"
},
"api": {
"type": "openapi",
"url": "https://github.com/iorisa/MetaGPT/blob/feature/assistant_role/.well-known/metagpt_oas3_api.yaml",
"has_user_authentication": false
},
"logo_url": "https://github.com/geekan/MetaGPT/blob/main/docs/resources/MetaGPT-logo.png",
"contact_email": "mashenquan@fuzhi.cn",
"legal_info_url": "https://github.com/geekan/MetaGPT/blob/main/docs/README_CN.md"
}

View file

@ -0,0 +1,250 @@
openapi: "3.0.0"
info:
title: "MetaGPT Export OpenAPIs"
version: "1.0"
servers:
- url: "/oas3"
variables:
port:
default: '8080'
description: HTTP service port
paths:
/tts/azsure:
x-prerequisite:
- name: AZURE_TTS_SUBSCRIPTION_KEY
description: "For more details, check out: [Azure Text-to_Speech](https://learn.microsoft.com/en-us/azure/ai-services/speech-service/language-support?tabs=tts)"
- name: AZURE_TTS_REGION
description: "For more details, check out: [Azure Text-to_Speech](https://learn.microsoft.com/en-us/azure/ai-services/speech-service/language-support?tabs=tts)"
post:
summary: "Convert Text to Base64-encoded .wav File Stream"
description: "For more details, check out: [Azure Text-to_Speech](https://learn.microsoft.com/en-us/azure/ai-services/speech-service/language-support?tabs=tts)"
operationId: azure_tts.oas3_azsure_tts
requestBody:
required: true
content:
application/json:
schema:
type: object
required:
- text
properties:
text:
type: string
description: Text to convert
lang:
type: string
description: The language code or locale, e.g., en-US (English - United States)
default: "zh-CN"
voice:
type: string
description: "Voice style, see: [Azure Text-to_Speech](https://learn.microsoft.com/en-us/azure/ai-services/speech-service/language-support?tabs=tts), [Voice Gallery](https://speech.microsoft.com/portal/voicegallery)"
default: "zh-CN-XiaomoNeural"
style:
type: string
description: "Speaking style to express different emotions. For more details, checkout: [Azure Text-to_Speech](https://learn.microsoft.com/en-us/azure/ai-services/speech-service/language-support?tabs=tts)"
default: "affectionate"
role:
type: string
description: "Role to specify age and gender. For more details, checkout: [Azure Text-to_Speech](https://learn.microsoft.com/en-us/azure/ai-services/speech-service/language-support?tabs=tts)"
default: "Girl"
subscription_key:
type: string
description: "Key used to access Azure AI service API, see: [Azure Portal](https://portal.azure.com/) > `Resource Management` > `Keys and Endpoint`"
default: ""
region:
type: string
description: "Location (or region) of your resource, see: [Azure Portal](https://portal.azure.com/) > `Resource Management` > `Keys and Endpoint`"
default: ""
responses:
'200':
description: "Base64-encoded .wav file data if successful, otherwise an empty string."
content:
application/json:
schema:
type: object
properties:
wav_data:
type: string
format: base64
'400':
description: "Bad Request"
'500':
description: "Internal Server Error"
/txt2img/openai:
x-prerequisite:
- name: OPENAI_API_KEY
description: "OpenAI API key, For more details, checkout: `https://platform.openai.com/account/api-keys`"
post:
summary: "Convert Text to Base64-encoded Image Data Stream"
operationId: openai_text_to_image.oas3_openai_text_to_image
requestBody:
required: true
content:
application/json:
schema:
type: object
properties:
text:
type: string
description: "The text used for image conversion."
size_type:
type: string
enum: ["256x256", "512x512", "1024x1024"]
default: "1024x1024"
description: "Size of the generated image."
openai_api_key:
type: string
default: ""
description: "OpenAI API key, For more details, checkout: `https://platform.openai.com/account/api-keys`"
responses:
'200':
description: "Base64-encoded image data."
content:
application/json:
schema:
type: object
properties:
image_data:
type: string
format: base64
'400':
description: "Bad Request"
'500':
description: "Internal Server Error"
/txt2embedding/openai:
x-prerequisite:
- name: OPENAI_API_KEY
description: "OpenAI API key, For more details, checkout: `https://platform.openai.com/account/api-keys`"
post:
summary: Text to embedding
operationId: openai_text_to_embedding.oas3_openai_text_to_embedding
description: Retrieve an embedding for the provided text using the OpenAI API.
requestBody:
content:
application/json:
schema:
type: object
properties:
input:
type: string
description: The text used for embedding.
model:
type: string
description: "ID of the model to use. For more details, checkout: [models](https://api.openai.com/v1/models)"
enum:
- text-embedding-ada-002
responses:
"200":
description: Successful response
content:
application/json:
schema:
$ref: "#/components/schemas/ResultEmbedding"
"4XX":
description: Client error
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
"5XX":
description: Server error
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
/txt2image/metagpt:
x-prerequisite:
- name: METAGPT_TEXT_TO_IMAGE_MODEL_URL
description: "Model url."
post:
summary: "Text to Image"
description: "Generate an image from the provided text using the MetaGPT Text-to-Image API."
operationId: metagpt_text_to_image.oas3_metagpt_text_to_image
requestBody:
required: true
content:
application/json:
schema:
type: object
required:
- text
properties:
text:
type: string
description: "The text used for image conversion."
size_type:
type: string
enum: ["512x512", "512x768"]
default: "512x512"
description: "Size of the generated image."
model_url:
type: string
description: "Model reset API URL for text-to-image."
default: ""
responses:
'200':
description: "Base64-encoded image data."
content:
application/json:
schema:
type: object
properties:
image_data:
type: string
format: base64
'400':
description: "Bad Request"
'500':
description: "Internal Server Error"
components:
schemas:
Embedding:
type: object
description: Represents an embedding vector returned by the embedding endpoint.
properties:
object:
type: string
example: embedding
embedding:
type: array
items:
type: number
example: [0.0023064255, -0.009327292, ...]
index:
type: integer
example: 0
Usage:
type: object
properties:
prompt_tokens:
type: integer
example: 8
total_tokens:
type: integer
example: 8
ResultEmbedding:
type: object
properties:
object:
type: string
example: result_embedding
data:
type: array
items:
$ref: "#/components/schemas/Embedding"
model:
type: string
example: text-embedding-ada-002
usage:
$ref: "#/components/schemas/Usage"
Error:
type: object
properties:
error:
type: string
example: An error occurred

35
.well-known/openapi.yaml Normal file
View file

@ -0,0 +1,35 @@
openapi: "3.0.0"
info:
title: Hello World
version: "1.0"
servers:
- url: /openapi
paths:
/greeting/{name}:
post:
summary: Generate greeting
description: Generates a greeting message.
operationId: hello.post_greeting
responses:
200:
description: greeting response
content:
text/plain:
schema:
type: string
example: "hello dave!"
parameters:
- name: name
in: path
description: Name of the person to greet.
required: true
schema:
type: string
example: "dave"
requestBody:
content:
application/json:
schema:
type: object

47
.well-known/skills.yaml Normal file
View file

@ -0,0 +1,47 @@
entities:
Assistant:
skills:
- name: text_to_speech
description: Text-to-speech
id: text_to_speech.text_to_speech
x-prerequisite:
- name: AZURE_TTS_SUBSCRIPTION_KEY
description: "For more details, check out: [Azure Text-to_Speech](https://learn.microsoft.com/en-us/azure/ai-services/speech-service/language-support?tabs=tts)"
- name: AZURE_TTS_REGION
description: "For more details, check out: [Azure Text-to_Speech](https://learn.microsoft.com/en-us/azure/ai-services/speech-service/language-support?tabs=tts)"
arguments:
text: 'The text used for voice conversion. Required.'
lang: 'The value can contain a language code such as en (English), or a locale such as en-US (English - United States). The optional parameter are "English", "Chinese". Default value: "Chinese".'
voice: 'Default value: "zh-CN-XiaomoNeural".'
style: 'Speaking style to express different emotions like cheerfulness, empathy, and calm. The optional parameter values are "affectionate", "angry", "calm", "cheerful", "depressed", "disgruntled", "embarrassed", "envious", "fearful", "gentle", "sad", "serious". Default value: "affectionate".'
role: 'With roles, the same voice can act as a different age and gender. The optional parameter values are "Girl", "Boy", "OlderAdultFemale", "OlderAdultMale", "SeniorFemale", "SeniorMale", "YoungAdultFemale", "YoungAdultMale". Default value: "Girl".'
examples:
- ask: 'A girl says "hello world"'
answer: 'text_to_speech(text="hello world", role="Girl")'
- ask: 'A boy affectionate says "hello world"'
answer: 'text_to_speech(text="hello world", role="Boy", style="affectionate")'
- ask: 'A boy says "你好"'
answer: 'text_to_speech(text="hello world", role="Boy", lang="Chinese")'
returns:
type: string
format: base64
- name: text_to_image
description: Create a drawing based on the text.
id: text_to_image.text_to_image
x-prerequisite:
- name: OPENAI_API_KEY
description: "OpenAI API key, For more details, checkout: `https://platform.openai.com/account/api-keys`"
- name: METAGPT_TEXT_TO_IMAGE_MODEL_URL
description: "Model url."
arguments:
text: 'The text used for image conversion. Required.'
size_type: 'Default value: "512x512".'
examples:
- ask: 'Draw a girl'
answer: 'text_to_image(text="Draw a girl", size_type="512x512")'
- ask: 'Draw an apple'
answer: 'text_to_image(text="Draw an apple", size_type="512x512")'
returns:
type: string
format: base64

View file

@ -19,6 +19,11 @@ # MetaGPT: The Multi-Agent Framework
<a href="https://twitter.com/DeepWisdom2019"><img src="https://img.shields.io/twitter/follow/MetaGPT?style=social" alt="Twitter Follow"></a>
</p>
<p align="center">
<a href="https://vscode.dev/redirect?url=vscode://ms-vscode-remote.remote-containers/cloneInVolume?url=https://github.com/geekan/MetaGPT"><img src="https://img.shields.io/static/v1?label=Dev%20Containers&message=Open&color=blue&logo=visualstudiocode" alt="Open in Dev Containers"></a>
<a href="https://codespaces.new/geekan/MetaGPT"><img src="https://img.shields.io/badge/Github_Codespace-Open-blue?logo=github" alt="Open in GitHub Codespaces"></a>
</p>
1. MetaGPT takes a **one line requirement** as input and outputs **user stories / competitive analysis / requirements / data structures / APIs / documents, etc.**
2. Internally, MetaGPT includes **product managers / architects / project managers / engineers.** It provides the entire process of a **software company along with carefully orchestrated SOPs.**
1. `Code = SOP(Team)` is the core philosophy. We materialize SOP and apply it to teams composed of LLMs.
@ -224,3 +229,9 @@ ## Contact Information
## Demo
https://github.com/geekan/MetaGPT/assets/2707039/5e8c1062-8c35-440f-bb20-2b0320f8d27d
## Join us
📢 Join Our Discord Channel!
https://discord.gg/4WdszVjv
Looking forward to seeing you there! 🎉

View file

@ -75,3 +75,6 @@ SD_T2I_API: "/sdapi/v1/txt2img"
### for Research
MODEL_FOR_RESEARCHER_SUMMARY: gpt-3.5-turbo
MODEL_FOR_RESEARCHER_REPORT: gpt-3.5-turbo-16k
### Meta Models
#METAGPT_TEXT_TO_IMAGE_MODEL: MODEL_URL

View file

@ -193,8 +193,8 @@ ## 演示
https://github.com/geekan/MetaGPT/assets/2707039/5e8c1062-8c35-440f-bb20-2b0320f8d27d
## 加入微信讨论群
## 加入我们
添加运营小姐姐,拉你入群
📢 加入我们的Discord频道https://discord.gg/4WdszVjv
<img src="resources/20230811-214014.jpg" width = "30%" height = "30%" alt="MetaGPT WeChat Discuss Group" align=center />
期待在那里与您相见!🎉

View file

@ -4,9 +4,12 @@
@Time : 2023/5/6 14:13
@Author : alexanderwu
@File : llm_hello_world.py
@Modified By: mashenquan, 2023-8-9, fix-bug: cannot find metagpt module.
"""
import asyncio
from pathlib import Path
import sys
sys.path.append(str(Path(__file__).resolve().parent.parent))
from metagpt.llm import LLM, Claude
from metagpt.logs import logger

View file

@ -4,10 +4,13 @@
@Time : 2023/5/7 18:32
@Author : alexanderwu
@File : search_google.py
@Modified By: mashenquan, 2023-8-9, fix-bug: cannot find metagpt module.
"""
import asyncio
from pathlib import Path
import sys
sys.path.append(str(Path(__file__).resolve().parent.parent))
from metagpt.roles import Searcher

View file

@ -2,9 +2,12 @@
# -*- coding: utf-8 -*-
"""
@File : search_kb.py
@Modified By: mashenquan, 2023-8-9, fix-bug: cannot find metagpt module.
"""
import asyncio
from pathlib import Path
import sys
sys.path.append(str(Path(__file__).resolve().parent.parent))
from metagpt.const import DATA_PATH
from metagpt.document_store import FaissStore
from metagpt.logs import logger

View file

@ -1,5 +1,12 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Modified By: mashenquan, 2023-8-9, fix-bug: cannot find metagpt module.
"""
import asyncio
from pathlib import Path
import sys
sys.path.append(str(Path(__file__).resolve().parent.parent))
from metagpt.roles import Searcher
from metagpt.tools import SearchEngineType

View file

@ -0,0 +1,110 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023-07-27
@Author : mashenquan
@File : write_teaching_plan.py
@Desc: Write teaching plan demo
"""
import asyncio
from pathlib import Path
import sys
from metagpt.config import CONFIG
sys.path.append(str(Path(__file__).resolve().parent.parent))
import aiofiles
import fire
from metagpt.logs import logger
from metagpt.actions.write_teaching_plan import TeachingPlanRequirement
from metagpt.roles.teacher import Teacher
from metagpt.software_company import SoftwareCompany
async def startup(lesson_file: str, investment: float = 3.0, n_round: int = 1, *args, **kwargs):
"""Run a startup. Be a teacher in education industry."""
demo_lesson = """
UNIT 1 Making New Friends
TOPIC 1 Welcome to China!
Section A
1a Listen and number the following names.
Jane Mari Kangkang Michael
Look, listen and understand. Then practice the conversation.
Work in groups. Introduce yourself using
I m ... Then practice 1a
with your own hometown or the following places.
1b Listen and number the following names
Jane Michael Maria Kangkang
1c Work in groups. Introduce yourself using I m ... Then practice 1a with your own hometown or the following places.
China the USA the UK Hong Kong Beijing
2a Look, listen and understand. Then practice the conversation
Hello!
Hello!
Hello!
Hello! Are you Maria?
No, Im not. Im Jane.
Oh, nice to meet you, Jane
Nice to meet you, too.
Hi, Maria!
Hi, Kangkang!
Welcome to China!
Thanks.
2b Work in groups. Make up a conversation with your own name and the
following structures.
A: Hello! / Good morning! / Hi! Im ... Are you ... ?
B: ...
3a Listen, say and trace
Aa Bb Cc Dd Ee Ff Gg
3b Listen and number the following letters. Then circle the letters with the same sound as Bb.
Aa Bb Cc Dd Ee Ff Gg
3c Match the big letters with the small ones. Then write them on the lines.
"""
CONFIG.set_context(kwargs)
lesson = ""
if lesson_file and Path(lesson_file).exists():
async with aiofiles.open(lesson_file, mode="r", encoding="utf-8") as reader:
lesson = await reader.read()
logger.info(f"Course content: {lesson}")
if not lesson:
logger.info("No course content provided, using the demo course.")
lesson = demo_lesson
company = SoftwareCompany()
company.hire([Teacher(*args, **kwargs)])
company.invest(investment)
company.start_project(lesson, cause_by=TeachingPlanRequirement, role="Teacher", **kwargs)
await company.run(n_round=1)
def main(idea: str, investment: float = 3.0, n_round: int = 5, *args, **kwargs):
"""
We are a software startup comprised of AI. By investing in us, you are empowering a future filled with limitless possibilities.
:param idea: lesson filename.
:param investment: As an investor, you have the opportunity to contribute a certain dollar amount to this AI company.
:param n_round: Reserved.
:param args: Parameters passed in format: `python your_script.py arg1 arg2 arg3`
:param kwargs: Parameters passed in format: `python your_script.py --param1=value1 --param2=value2`
:return:
"""
asyncio.run(startup(idea, investment, n_round, *args, **kwargs))
if __name__ == '__main__':
"""
Formats:
```
python write_teaching_plan.py lesson_filename --teaching_language=<the language you are teaching> --language=<your native language>
```
If `lesson_filename` is not available, a demo lesson content will be used.
"""
fire.Fire(main)

View file

@ -1,5 +1,14 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/4/24 22:26
# @Author : alexanderwu
# @File : __init__.py
"""
@Time : 2023/4/24 22:26
@Author : alexanderwu
@File : __init__.py
@Desc : mashenquan, 2023/8/22. Add `Message` for importing by external projects.
"""
from metagpt.schema import Message
__all__ = [
"Message",
]

View file

@ -4,6 +4,7 @@
@Time : 2023/5/11 14:43
@Author : alexanderwu
@File : action.py
@Modified By: mashenquan, 2023/8/20. Add function return annotations.
"""
from abc import ABC
from typing import Optional
@ -15,6 +16,7 @@ from metagpt.llm import LLM
from metagpt.utils.common import OutputParser
from metagpt.logs import logger
class Action(ABC):
def __init__(self, name: str = '', context=None, llm: LLM = None):
self.name: str = name
@ -62,6 +64,6 @@ class Action(ABC):
instruct_content = output_class(**parsed_data)
return ActionOutput(content, instruct_content)
async def run(self, *args, **kwargs):
async def run(self, *args, **kwargs) -> str | ActionOutput | None:
"""Run action"""
raise NotImplementedError("The run method should be implemented in a subclass.")

View file

@ -4,18 +4,19 @@
@Time : 2023/7/11 10:03
@Author : chengmaoyu
@File : action_output
@Modified By: mashenquan, 2023/8/20. Allow 'instruct_content' to be blank.
"""
from typing import Dict, Type
from typing import Dict, Type, Optional
from pydantic import BaseModel, create_model, root_validator, validator
class ActionOutput:
content: str
instruct_content: BaseModel
instruct_content: Optional[BaseModel] = None
def __init__(self, content: str, instruct_content: BaseModel):
def __init__(self, content: str, instruct_content: BaseModel=None):
self.content = content
self.instruct_content = instruct_content

View file

@ -1,53 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/6/9 22:22
@Author : Leo Xiao
@File : azure_tts.py
"""
from azure.cognitiveservices.speech import AudioConfig, SpeechConfig, SpeechSynthesizer
from metagpt.actions.action import Action
from metagpt.config import Config
class AzureTTS(Action):
def __init__(self, name, context=None, llm=None):
super().__init__(name, context, llm)
self.config = Config()
# 参数参考https://learn.microsoft.com/zh-cn/azure/cognitive-services/speech-service/language-support?tabs=tts#voice-styles-and-roles
def synthesize_speech(self, lang, voice, role, text, output_file):
subscription_key = self.config.get('AZURE_TTS_SUBSCRIPTION_KEY')
region = self.config.get('AZURE_TTS_REGION')
speech_config = SpeechConfig(
subscription=subscription_key, region=region)
speech_config.speech_synthesis_voice_name = voice
audio_config = AudioConfig(filename=output_file)
synthesizer = SpeechSynthesizer(
speech_config=speech_config,
audio_config=audio_config)
# if voice=="zh-CN-YunxiNeural":
ssml_string = f"""
<speak version='1.0' xmlns='http://www.w3.org/2001/10/synthesis' xml:lang='{lang}' xmlns:mstts='http://www.w3.org/2001/mstts'>
<voice name='{voice}'>
<mstts:express-as style='affectionate' role='{role}'>
{text}
</mstts:express-as>
</voice>
</speak>
"""
synthesizer.speak_ssml_async(ssml_string).get()
if __name__ == "__main__":
azure_tts = AzureTTS("azure_tts")
azure_tts.synthesize_speech(
"zh-CN",
"zh-CN-YunxiNeural",
"Boy",
"你好,我是卡卡",
"output.wav")

View file

@ -4,6 +4,7 @@
@Time : 2023/5/11 19:26
@Author : alexanderwu
@File : design_api.py
@Modified By: mashenquan, 2023-8-9, align `run` parameters with the parent :class:`Action` class.
"""
import shutil
from pathlib import Path
@ -135,7 +136,7 @@ class WriteDesign(Action):
self._save_prd(docs_path, resources_path, context[-1].content)
self._save_system_design(docs_path, resources_path, content)
async def run(self, context):
async def run(self, context, **kwargs):
prompt = PROMPT_TEMPLATE.format(context=context, format_example=FORMAT_EXAMPLE)
# system_design = await self._aask(prompt)
system_design = await self._aask_v1(prompt, "system_design", OUTPUT_MAPPING)

View file

@ -4,6 +4,7 @@
@Time : 2023/5/11 19:12
@Author : alexanderwu
@File : project_management.py
@Modified By: mashenquan, 2023-8-9, align `run` parameters with the parent :class:`Action` class.
"""
from typing import List, Tuple
@ -115,7 +116,7 @@ class WriteTasks(Action):
requirements_path = WORKSPACE_ROOT / ws_name / 'requirements.txt'
requirements_path.write_text(rsp.instruct_content.dict().get("Required Python third-party packages").strip('"\n'))
async def run(self, context):
async def run(self, context, **kwargs):
prompt = PROMPT_TEMPLATE.format(context=context, format_example=FORMAT_EXAMPLE)
rsp = await self._aask_v1(prompt, "task", OUTPUT_MAPPING)
self._save(context, rsp)

View file

@ -4,11 +4,12 @@
@Time : 2023/5/23 17:26
@Author : alexanderwu
@File : search_google.py
@Modified By: mashenquan, 2023/8/20. Remove global configuration `CONFIG`, enable configuration support for business isolation.
"""
import pydantic
from metagpt.actions import Action
from metagpt.config import Config
from metagpt.config import CONFIG
from metagpt.logs import logger
from metagpt.schema import Message
from metagpt.tools.search_engine import SearchEngine
@ -102,8 +103,7 @@ You are a member of a professional butler team and will provide helpful suggesti
class SearchAndSummarize(Action):
def __init__(self, name="", context=None, llm=None, engine=None, search_func=None):
self.config = Config()
self.engine = engine or self.config.search_engine
self.engine = engine or CONFIG.search_engine
try:
self.search_engine = SearchEngine(self.engine, run_func=search_func)

View file

@ -0,0 +1,102 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/8/28
@Author : mashenquan
@File : skill_action.py
@Desc : Call learned skill
"""
import ast
import importlib
import traceback
from metagpt.actions import Action, ActionOutput
from metagpt.learn.skill_loader import Skill
from metagpt.logs import logger
class ArgumentsParingAction(Action):
def __init__(self, last_talk: str, skill: Skill, context=None, llm=None, **kwargs):
super(ArgumentsParingAction, self).__init__(name='', context=context, llm=llm)
self.skill = skill
self.ask = last_talk
self.rsp = None
self.args = None
@property
def prompt(self):
prompt = f"{self.skill.name} function parameters description:\n"
for k, v in self.skill.arguments.items():
prompt += f"parameter `{k}`: {v}\n"
prompt += "\n"
prompt += "Examples:\n"
for e in self.skill.examples:
prompt += f"If want you to do `{e.ask}`, return `{e.answer}` brief and clear.\n"
prompt += f"\nNow I want you to do `{self.ask}`, return in examples format above, brief and clear."
return prompt
async def run(self, *args, **kwargs) -> ActionOutput:
prompt = self.prompt
logger.info(prompt)
rsp = await self.llm.aask(msg=prompt, system_msgs=[])
logger.info(rsp)
self.args = ArgumentsParingAction.parse_arguments(skill_name=self.skill.name, txt=rsp)
self.rsp = ActionOutput(content=rsp)
return self.rsp
@staticmethod
def parse_arguments(skill_name, txt) -> dict:
prefix = skill_name + "("
if prefix not in txt:
logger.error(f"{skill_name} not in {txt}")
return None
if ")" not in txt:
logger.error(f"')' not in {txt}")
return None
begin_ix = txt.find(prefix)
end_ix = txt.rfind(")")
args_txt = txt[begin_ix + len(prefix): end_ix]
logger.info(args_txt)
fake_expression = f"dict({args_txt})"
parsed_expression = ast.parse(fake_expression, mode='eval')
args = {}
for keyword in parsed_expression.body.keywords:
key = keyword.arg
value = ast.literal_eval(keyword.value)
args[key] = value
return args
class SkillAction(Action):
def __init__(self, skill: Skill, args: dict, context=None, llm=None, **kwargs):
super(SkillAction, self).__init__(name='', context=context, llm=llm)
self._skill = skill
self._args = args
self.rsp = None
async def run(self, *args, **kwargs) -> str | ActionOutput | None:
"""Run action"""
try:
self.rsp = await self.find_and_call_function(self._skill.name, args=self._args, **kwargs)
except Exception as e:
logger.exception(f"{e}, traceback:{traceback.format_exc()}")
self.rsp = f"Error: {e}"
return ActionOutput(content=self.rsp, instruct_content=self._skill.json())
@staticmethod
async def find_and_call_function(function_name, args, **kwargs):
try:
module = importlib.import_module("metagpt.learn")
function = getattr(module, function_name)
# 调用函数并返回结果
result = await function(**args, **kwargs)
return result
except (ModuleNotFoundError, AttributeError):
logger.error(f"{function_name} not found")
return None
if __name__ == '__main__':
ArgumentsParingAction.parse_arguments(skill_name="text_to_image",
txt='`text_to_image(text="Draw an apple", size_type="512x512")`')

View file

@ -0,0 +1,44 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/8/28
@Author : mashenquan
@File : talk_action.py
@Desc : Act as its a talk
"""
from metagpt.actions import Action, ActionOutput
from metagpt.config import CONFIG
from metagpt.const import DEFAULT_LANGUAGE
from metagpt.logs import logger
class TalkAction(Action):
def __init__(self, name: str = '', talk='', history_summary='', knowledge='', context=None, llm=None, **kwargs):
context = context or {}
context["talk"] = talk
context["history_summery"] = history_summary
context["knowledge"] = knowledge
super(TalkAction, self).__init__(name=name, context=context, llm=llm)
self._talk = talk
self._history_summary = history_summary
self._knowledge = knowledge
self._rsp = None
@property
def prompt(self):
prompt = f"Background knowledge:\n{self._knowledge}\n\n" if self._knowledge else ""
prompt += f"{self._history_summary}\n\n"
if self._history_summary != "":
prompt += "According to the historical conversation above, "
language = CONFIG.language or DEFAULT_LANGUAGE
prompt += f"Answer in {language}:\n {self._talk}"
return prompt
async def run(self, *args, **kwargs) -> ActionOutput:
prompt = self.prompt
logger.info(prompt)
rsp = await self.llm.aask(msg=prompt, system_msgs=[])
logger.info(rsp)
self._rsp = ActionOutput(content=rsp)
return self._rsp

View file

@ -0,0 +1,159 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/7/27
@Author : mashenquan
@File : write_teaching_plan.py
"""
from metagpt.logs import logger
from metagpt.actions import Action
from metagpt.schema import Message
class TeachingPlanRequirement(Action):
"""Teaching Plan Requirement without any implementation details"""
async def run(self, *args, **kwargs):
raise NotImplementedError
class WriteTeachingPlanPart(Action):
"""Write Teaching Plan Part"""
def __init__(self, name: str = "", context=None, llm=None, topic: str = "", language: str = "Chinese"):
"""
:param name: action name
:param context: context
:param llm: object of :class:`LLM`
:param topic: topic part of teaching plan
:param language: A human language, such as Chinese, English, French, etc.
"""
super().__init__(name, context, llm)
self.topic = topic
self.language = language
self.rsp = None
async def run(self, messages, *args, **kwargs):
if len(messages) < 1 or not isinstance(messages[0], Message):
raise ValueError("Invalid args, a tuple of List[Message] is expected")
statement_patterns = self.TOPIC_STATEMENTS.get(self.topic, [])
statements = []
from metagpt.roles import Role
for p in statement_patterns:
s = Role.format_value(p)
statements.append(s)
formatter = self.PROMPT_TITLE_TEMPLATE if self.topic == self.COURSE_TITLE else self.PROMPT_TEMPLATE
prompt = formatter.format(formation=self.FORMATION,
role=self.prefix,
statements="\n".join(statements),
lesson=messages[0].content,
topic=self.topic,
language=self.language)
logger.debug(prompt)
rsp = await self._aask(prompt=prompt)
logger.debug(rsp)
self._set_result(rsp)
return self.rsp
def _set_result(self, rsp):
if self.DATA_BEGIN_TAG in rsp:
ix = rsp.index(self.DATA_BEGIN_TAG)
rsp = rsp[ix + len(self.DATA_BEGIN_TAG):]
if self.DATA_END_TAG in rsp:
ix = rsp.index(self.DATA_END_TAG)
rsp = rsp[0:ix]
self.rsp = rsp.strip()
if self.topic != self.COURSE_TITLE:
return
if '#' not in self.rsp or self.rsp.index('#') != 0:
self.rsp = "# " + self.rsp
def __str__(self):
"""Return `topic` value when str()"""
return self.topic
def __repr__(self):
"""Show `topic` value when debug"""
return self.topic
FORMATION = "\"Capacity and role\" defines the role you are currently playing;\n" \
"\t\"[LESSON_BEGIN]\" and \"[LESSON_END]\" tags enclose the content of textbook;\n" \
"\t\"Statement\" defines the work detail you need to complete at this stage;\n" \
"\t\"Answer options\" defines the format requirements for your responses;\n" \
"\t\"Constraint\" defines the conditions that your responses must comply with."
COURSE_TITLE = "Title"
TOPICS = [
COURSE_TITLE, "Teaching Hours", "Teaching Objectives", "Teaching Content",
"Teaching Methods and Strategies", "Learning Activities",
"Teaching Time Allocation", "Assessment and Feedback", "Teaching Summary and Improvement",
"Vocabulary Cloze", "Choice Questions", "Grammar Questions", "Translation Questions"
]
TOPIC_STATEMENTS = {
COURSE_TITLE: ["Statement: Find and return the title of the lesson only in markdown first-level header format, "
"without anything else."],
"Teaching Content": [
"Statement: \"Teaching Content\" must include vocabulary, analysis, and examples of various grammar "
"structures that appear in the textbook, as well as the listening materials and key points.",
"Statement: \"Teaching Content\" must include more examples."],
"Teaching Time Allocation": [
"Statement: \"Teaching Time Allocation\" must include how much time is allocated to each "
"part of the textbook content."],
"Teaching Methods and Strategies": [
"Statement: \"Teaching Methods and Strategies\" must include teaching focus, difficulties, materials, "
"procedures, in detail."
],
"Vocabulary Cloze": [
"Statement: Based on the content of the textbook enclosed by \"[LESSON_BEGIN]\" and \"[LESSON_END]\", "
"create vocabulary cloze. The cloze should include 10 {language} questions with {teaching_language} "
"answers, and it should also include 10 {teaching_language} questions with {language} answers. "
"The key-related vocabulary and phrases in the textbook content must all be included in the exercises.",
],
"Grammar Questions": [
"Statement: Based on the content of the textbook enclosed by \"[LESSON_BEGIN]\" and \"[LESSON_END]\", "
"create grammar questions. 10 questions."],
"Choice Questions": [
"Statement: Based on the content of the textbook enclosed by \"[LESSON_BEGIN]\" and \"[LESSON_END]\", "
"create choice questions. 10 questions."],
"Translation Questions": [
"Statement: Based on the content of the textbook enclosed by \"[LESSON_BEGIN]\" and \"[LESSON_END]\", "
"create translation questions. The translation should include 10 {language} questions with "
"{teaching_language} answers, and it should also include 10 {teaching_language} questions with "
"{language} answers."
]
}
# Teaching plan title
PROMPT_TITLE_TEMPLATE = "Do not refer to the context of the previous conversation records, " \
"start the conversation anew.\n\n" \
"Formation: {formation}\n\n" \
"{statements}\n" \
"Constraint: Writing in {language}.\n" \
"Answer options: Encloses the lesson title with \"[TEACHING_PLAN_BEGIN]\" " \
"and \"[TEACHING_PLAN_END]\" tags.\n" \
"[LESSON_BEGIN]\n" \
"{lesson}\n" \
"[LESSON_END]"
# Teaching plan parts:
PROMPT_TEMPLATE = "Do not refer to the context of the previous conversation records, " \
"start the conversation anew.\n\n" \
"Formation: {formation}\n\n" \
"Capacity and role: {role}\n" \
"Statement: Write the \"{topic}\" part of teaching plan, " \
"WITHOUT ANY content unrelated to \"{topic}\"!!\n" \
"{statements}\n" \
"Answer options: Enclose the teaching plan content with \"[TEACHING_PLAN_BEGIN]\" " \
"and \"[TEACHING_PLAN_END]\" tags.\n" \
"Answer options: Using proper markdown format from second-level header format.\n" \
"Constraint: Writing in {language}.\n" \
"[LESSON_BEGIN]\n" \
"{lesson}\n" \
"[LESSON_END]"
DATA_BEGIN_TAG = "[TEACHING_PLAN_BEGIN]"
DATA_END_TAG = "[TEACHING_PLAN_END]"

View file

@ -1,16 +1,21 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
提供配置单例
Provide configuration, singleton.
@Modified BY: mashenquan, 2023/8/28. Replace the global variable `CONFIG` with `ContextVar`.
"""
import json
import os
from copy import deepcopy
from typing import Any
import openai
import yaml
from metagpt.const import PROJECT_ROOT
from metagpt.const import PROJECT_ROOT, OPTIONS
from metagpt.logs import logger
from metagpt.tools import SearchEngineType, WebBrowserEngineType
from metagpt.utils.cost_manager import CostManager
from metagpt.utils.singleton import Singleton
@ -28,7 +33,7 @@ class NotConfiguredException(Exception):
class Config(metaclass=Singleton):
"""
常规使用方法
Usual Usage:
config = Config("config.yaml")
secret_key = config.get_key("MY_SECRET_KEY")
print("Secret key:", secret_key)
@ -39,16 +44,20 @@ class Config(metaclass=Singleton):
default_yaml_file = PROJECT_ROOT / "config/config.yaml"
def __init__(self, yaml_file=default_yaml_file):
self._configs = {}
self._init_with_config_files_and_env(self._configs, yaml_file)
self._init_with_config_files_and_env(yaml_file)
self.cost_manager = CostManager(**json.loads(self.COST_MANAGER)) if self.COST_MANAGER else CostManager()
logger.info("Config loading done.")
self._update()
def _update(self):
self.global_proxy = self._get("GLOBAL_PROXY")
self.openai_api_key = self._get("OPENAI_API_KEY")
self.anthropic_api_key = self._get("Anthropic_API_KEY")
if (not self.openai_api_key or "YOUR_API_KEY" == self.openai_api_key) and (
not self.anthropic_api_key or "YOUR_API_KEY" == self.anthropic_api_key
not self.anthropic_api_key or "YOUR_API_KEY" == self.anthropic_api_key
):
raise NotConfiguredException("Set OPENAI_API_KEY or Anthropic_API_KEY first")
logger.warning("Set OPENAI_API_KEY or Anthropic_API_KEY first")
self.openai_api_base = self._get("OPENAI_API_BASE")
if not self.openai_api_base or "YOUR_API_BASE" == self.openai_api_base:
openai_proxy = self._get("OPENAI_PROXY") or self.global_proxy
@ -76,8 +85,7 @@ class Config(metaclass=Singleton):
self.long_term_memory = self._get("LONG_TERM_MEMORY", False)
if self.long_term_memory:
logger.warning("LONG_TERM_MEMORY is True")
self.max_budget = self._get("MAX_BUDGET", 10.0)
self.total_cost = 0.0
self.cost_manager.max_budget = self._get("MAX_BUDGET", 10.0)
self.puppeteer_config = self._get("PUPPETEER_CONFIG", "")
self.mmdc = self._get("MMDC", "mmdc")
@ -85,9 +93,9 @@ class Config(metaclass=Singleton):
self.model_for_researcher_summary = self._get("MODEL_FOR_RESEARCHER_SUMMARY")
self.model_for_researcher_report = self._get("MODEL_FOR_RESEARCHER_REPORT")
def _init_with_config_files_and_env(self, configs: dict, yaml_file):
def _init_with_config_files_and_env(self, yaml_file):
"""从config/key.yaml / config/config.yaml / env三处按优先级递减加载"""
configs.update(os.environ)
configs = dict(os.environ)
for _yaml_file in [yaml_file, self.key_yaml_file]:
if not _yaml_file.exists():
@ -98,18 +106,40 @@ class Config(metaclass=Singleton):
yaml_data = yaml.safe_load(file)
if not yaml_data:
continue
os.environ.update({k: v for k, v in yaml_data.items() if isinstance(v, str)})
configs.update(yaml_data)
OPTIONS.set(configs)
def _get(self, *args, **kwargs):
return self._configs.get(*args, **kwargs)
@staticmethod
def _get(*args, **kwargs):
m = OPTIONS.get()
return m.get(*args, **kwargs)
def get(self, key, *args, **kwargs):
"""从config/key.yaml / config/config.yaml / env三处找值找不到报错"""
"""Retrieve values from config/key.yaml, config/config.yaml, and environment variables.
Throw an error if not found."""
value = self._get(key, *args, **kwargs)
if value is None:
raise ValueError(f"Key '{key}' not found in environment variables or in the YAML file")
return value
def __setattr__(self, name: str, value: Any) -> None:
OPTIONS.get()[name] = value
def __getattr__(self, name: str) -> Any:
m = OPTIONS.get()
return m.get(name)
def set_context(self, options: dict):
"""Update current config"""
opts = deepcopy(OPTIONS.get())
opts.update(options)
OPTIONS.set(opts)
self._update()
@property
def options(self):
"""Return all key-values"""
return OPTIONS.get()
CONFIG = Config()

View file

@ -3,8 +3,10 @@
"""
@Time : 2023/5/1 11:59
@Author : alexanderwu
@File : const.py
@File : const.py'
@Modified By: mashenquan, 2023/8/28. Add 'OPTIONS', 'DEFAULT_LANGUAGE', 'DEFAULT_MAX_TOKENS'...
"""
import contextvars
from pathlib import Path
@ -35,3 +37,10 @@ TMP = PROJECT_ROOT / 'tmp'
RESEARCH_PATH = DATA_PATH / "research"
MEM_TTL = 24 * 30 * 3600
OPTIONS = contextvars.ContextVar("OPTIONS")
DEFAULT_LANGUAGE = "English"
DEFAULT_MAX_TOKENS = 1500
COMMAND_TOKENS = 500
BRAIN_MEMORY = "BRAIN_MEMORY"
SKILL_PATH = "SKILL_PATH"

View file

@ -4,6 +4,7 @@
@Time : 2023/5/25 10:20
@Author : alexanderwu
@File : faiss_store.py
@Modified By: mashenquan, 2023/8/20. Remove global configuration `CONFIG`, enable configuration support for business isolation.
"""
import pickle
from pathlib import Path
@ -36,8 +37,11 @@ class FaissStore(LocalStore):
store.index = index
return store
def _write(self, docs, metadatas):
store = FAISS.from_texts(docs, OpenAIEmbeddings(openai_api_version="2020-11-07"), metadatas=metadatas)
def _write(self, docs, metadatas, **kwargs):
store = FAISS.from_texts(docs,
OpenAIEmbeddings(openai_api_version="2020-11-07",
openai_api_key=kwargs.get("OPENAI_API_KEY")),
metadatas=metadatas)
return store
def persist(self):

View file

@ -5,3 +5,11 @@
@Author : alexanderwu
@File : __init__.py
"""
from metagpt.learn.text_to_image import text_to_image
from metagpt.learn.text_to_speech import text_to_speech
__all__ = [
"text_to_image",
"text_to_speech",
]

View file

@ -0,0 +1,84 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/8/18
@Author : mashenquan
@File : skill_loader.py
@Desc : Skill YAML Configuration Loader.
"""
from pathlib import Path
from typing import List, Dict, Optional
import yaml
from pydantic import BaseModel, Field
class Example(BaseModel):
ask: str
answer: str
class Returns(BaseModel):
type: str
format: Optional[str] = None
class Prerequisite(BaseModel):
name: str
type: Optional[str] = None
description: Optional[str] = None
default: Optional[str] = None
class Skill(BaseModel):
name: str
description: str
id: str
x_prerequisite: Optional[List[Prerequisite]] = Field(default=None, alias="x-prerequisite")
arguments: Dict
examples: List[Example]
returns: Returns
class EntitySkills(BaseModel):
skills: List[Skill]
class SkillsDeclaration(BaseModel):
entities: Dict[str, EntitySkills]
class SkillLoader:
def __init__(self, skill_yaml_file_name: Path = None):
if not skill_yaml_file_name:
skill_yaml_file_name = Path(__file__).parent.parent.parent / ".well-known/skills.yaml"
with open(str(skill_yaml_file_name), 'r') as file:
skills = yaml.safe_load(file)
self._skills = SkillsDeclaration(**skills)
def get_skill_list(self, entity_name: str = "Assistant") -> Dict:
"""Return the skill name based on the skill description."""
entity_skills = self.get_entity(entity_name)
if not entity_skills:
return {}
description_to_name_mappings = {}
for s in entity_skills.skills:
description_to_name_mappings[s.description] = s.name
return description_to_name_mappings
def get_skill(self, name, entity_name: str = "Assistant") -> Skill:
"""Return a skill by name."""
entity = self.get_entity(entity_name)
if not entity:
return None
for sk in entity.skills:
if sk.name == name:
return sk
def get_entity(self, name) -> EntitySkills:
"""Return a list of skills for the entity."""
if not self._skills:
return None
return self._skills.entities.get(name)

View file

@ -0,0 +1,24 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/8/18
@Author : mashenquan
@File : text_to_embedding.py
@Desc : Text-to-Embedding skill, which provides text-to-embedding functionality.
"""
from metagpt.config import CONFIG
from metagpt.tools.openai_text_to_embedding import oas3_openai_text_to_embedding
async def text_to_embedding(text, model="text-embedding-ada-002", openai_api_key="", **kwargs):
"""Text to embedding
:param text: The text used for embedding.
:param model: One of ['text-embedding-ada-002'], ID of the model to use. For more details, checkout: `https://api.openai.com/v1/models`.
:param openai_api_key: OpenAI API key, For more details, checkout: `https://platform.openai.com/account/api-keys`
:return: A json object of :class:`ResultEmbedding` class if successful, otherwise `{}`.
"""
if CONFIG.OPENAI_API_KEY or openai_api_key:
return await oas3_openai_text_to_embedding(text, model=model, openai_api_key=openai_api_key)
raise EnvironmentError

View file

@ -0,0 +1,35 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/8/18
@Author : mashenquan
@File : text_to_image.py
@Desc : Text-to-Image skill, which provides text-to-image functionality.
"""
from metagpt.config import CONFIG
from metagpt.tools.metagpt_text_to_image import oas3_metagpt_text_to_image
from metagpt.tools.openai_text_to_image import oas3_openai_text_to_image
async def text_to_image(text, size_type: str = "512x512", openai_api_key="", model_url="", **kwargs):
"""Text to image
:param text: The text used for image conversion.
:param openai_api_key: OpenAI API key, For more details, checkout: `https://platform.openai.com/account/api-keys`
:param size_type: If using OPENAI, the available size options are ['256x256', '512x512', '1024x1024'], while for MetaGPT, the options are ['512x512', '512x768'].
:param model_url: MetaGPT model url
:return: The image data is returned in Base64 encoding.
"""
image_declaration = "data:image/png;base64,"
if CONFIG.METAGPT_TEXT_TO_IMAGE_MODEL_URL or model_url:
data = await oas3_metagpt_text_to_image(text, size_type, model_url)
return image_declaration + data if data else ""
if CONFIG.OPENAI_API_KEY or openai_api_key:
data = await oas3_openai_text_to_image(text, size_type, openai_api_key)
return image_declaration + data if data else ""
raise EnvironmentError

View file

@ -0,0 +1,36 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/8/17
@Author : mashenquan
@File : text_to_speech.py
@Desc : Text-to-Speech skill, which provides text-to-speech functionality
"""
from metagpt.config import CONFIG
from metagpt.tools.azure_tts import oas3_azsure_tts
async def text_to_speech(text, lang="zh-CN", voice="zh-CN-XiaomoNeural", style="affectionate", role="Girl",
subscription_key="", region="", **kwargs):
"""Text to speech
For more details, check out:`https://learn.microsoft.com/en-us/azure/ai-services/speech-service/language-support?tabs=tts`
:param lang: The value can contain a language code such as en (English), or a locale such as en-US (English - United States). For more details, checkout: `https://learn.microsoft.com/en-us/azure/ai-services/speech-service/language-support?tabs=tts`
:param voice: For more details, checkout: `https://learn.microsoft.com/en-us/azure/ai-services/speech-service/language-support?tabs=tts`, `https://speech.microsoft.com/portal/voicegallery`
:param style: Speaking style to express different emotions like cheerfulness, empathy, and calm. For more details, checkout: `https://learn.microsoft.com/en-us/azure/ai-services/speech-service/language-support?tabs=tts`
:param role: With roles, the same voice can act as a different age and gender. For more details, checkout: `https://learn.microsoft.com/en-us/azure/ai-services/speech-service/language-support?tabs=tts`
:param text: The text used for voice conversion.
:param subscription_key: key is used to access your Azure AI service API, see: `https://portal.azure.com/` > `Resource Management` > `Keys and Endpoint`
:param region: This is the location (or region) of your resource. You may need to use this field when making calls to this API.
:return: Returns the Base64-encoded .wav file data if successful, otherwise an empty string.
"""
audio_declaration = "data:audio/wav;base64,"
if (CONFIG.AZURE_TTS_SUBSCRIPTION_KEY and CONFIG.AZURE_TTS_REGION) or \
(subscription_key and region):
data = await oas3_azsure_tts(text, lang, voice, style, role, subscription_key, region)
return audio_declaration + data if data else data
raise EnvironmentError

View file

@ -4,11 +4,11 @@
@Time : 2023/6/5 01:44
@Author : alexanderwu
@File : skill_manager.py
@Modified By: mashenquan, 2023/8/20. Remove useless `_llm`
"""
from metagpt.actions import Action
from metagpt.const import PROMPT_PATH
from metagpt.document_store.chromadb_store import ChromaStore
from metagpt.llm import LLM
from metagpt.logs import logger
Skill = Action
@ -18,7 +18,6 @@ class SkillManager:
"""用来管理所有技能"""
def __init__(self):
self._llm = LLM()
self._store = ChromaStore('skill_manager')
self._skills: dict[str: Skill] = {}

View file

@ -0,0 +1,69 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/8/18
@Author : mashenquan
@File : brain_memory.py
@Desc : Support memory for multiple tasks and multiple mainlines.
"""
from enum import Enum
from typing import List, Dict
import pydantic
from metagpt import Message
class MessageType(Enum):
Talk = "TALK"
Solution = "SOLUTION"
Problem = "PROBLEM"
Skill = "SKILL"
Answer = "ANSWER"
class BrainMemory(pydantic.BaseModel):
history: List[Dict] = []
stack: List[Dict] = []
solution: List[Dict] = []
knowledge: List[Dict] = []
def add_talk(self, msg: Message):
msg.add_tag(MessageType.Talk.value)
self.history.append(msg.dict())
def add_answer(self, msg: Message):
msg.add_tag(MessageType.Answer.value)
self.history.append(msg.dict())
def get_knowledge(self) -> str:
texts = [Message(**m).content for m in self.knowledge]
return "\n".join(texts)
@property
def history_text(self):
if len(self.history) == 0:
return ""
texts = [Message(**m).content for m in self.history[:-1]]
return "\n".join(texts)
def move_to_solution(self):
if len(self.history) < 2:
return
msgs = self.history[:-1]
self.solution.extend(msgs)
if not Message(**self.history[-1]).is_contain(MessageType.Talk.value):
self.solution.append(self.history[-1])
self.history = []
else:
self.history = self.history[-1:]
@property
def last_talk(self):
if len(self.history) == 0:
return None
last_msg = Message(**self.history[-1])
if not last_msg.is_contain(MessageType.Talk.value):
return None
return last_msg.content

View file

@ -1,6 +1,9 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : the implement of Long-term memory
"""
@Desc : the implement of Long-term memory
@Modified By: mashenquan, 2023/8/20. Remove global configuration `CONFIG`, enable configuration support for business isolation.
"""
from metagpt.logs import logger
from metagpt.memory import Memory
@ -34,13 +37,13 @@ class LongTermMemory(Memory):
self.add_batch(messages)
self.msg_from_recover = False
def add(self, message: Message):
def add(self, message: Message, **kwargs):
super(LongTermMemory, self).add(message)
for action in self.rc.watch:
if message.cause_by == action and not self.msg_from_recover:
# currently, only add role's watching messages to its memory_storage
# and ignore adding messages from recover repeatedly
self.memory_storage.add(message)
self.memory_storage.add(message, **kwargs)
def remember(self, observed: list[Message], k=0) -> list[Message]:
"""

View file

@ -1,6 +1,9 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : the implement of memory storage
"""
@Desc : the implement of memory storage
@Modified By: mashenquan, 2023/8/20. Remove global configuration `CONFIG`, enable configuration support for business isolation.
"""
from typing import List
from pathlib import Path
@ -61,13 +64,13 @@ class MemoryStorage(FaissStore):
super(MemoryStorage, self).persist()
logger.debug(f'Agent {self.role_id} persist memory into local')
def add(self, message: Message) -> bool:
def add(self, message: Message, **kwargs) -> bool:
""" add message into memory storage"""
docs = [message.content]
metadatas = [{"message_ser": serialize_message(message)}]
if not self.store:
# init Faiss
self.store = self._write(docs, metadatas)
self.store = self._write(docs, metadatas, **kwargs)
self._initialized = True
else:
self.store.add_texts(texts=docs, metadatas=metadatas)

View file

@ -4,6 +4,7 @@
@Time : 2023/5/5 23:04
@Author : alexanderwu
@File : base_gpt_api.py
@Desc : mashenquan, 2023/8/22. + try catch
"""
from abc import abstractmethod
from typing import Optional
@ -41,7 +42,11 @@ class BaseGPTAPI(BaseChatbot):
message = self._system_msgs(system_msgs) + [self._user_msg(msg)]
else:
message = [self._default_system_msg(), self._user_msg(msg)]
rsp = await self.acompletion_text(message, stream=True)
try:
rsp = await self.acompletion_text(message, stream=True)
except Exception as e:
logger.exception(f"{e}")
raise e
logger.debug(message)
# logger.debug(rsp)
return rsp

View file

@ -3,21 +3,26 @@
@Time : 2023/5/5 23:08
@Author : alexanderwu
@File : openai.py
@Modified By: mashenquan, 2023/8/20. Remove global configuration `CONFIG`, enable configuration support for business isolation;
Change cost control from global to company level.
"""
import asyncio
import re
import time
from typing import NamedTuple
import random
from typing import List
import traceback
import openai
from openai.error import APIConnectionError
from tenacity import retry, stop_after_attempt, after_log, wait_fixed, retry_if_exception_type
from metagpt.config import CONFIG
from metagpt.const import DEFAULT_LANGUAGE, DEFAULT_MAX_TOKENS
from metagpt.logs import logger
from metagpt.provider.base_gpt_api import BaseGPTAPI
from metagpt.utils.singleton import Singleton
from metagpt.utils.cost_manager import Costs
from metagpt.utils.token_counter import (
TOKEN_COSTS,
count_message_tokens,
count_string_tokens,
get_max_completion_tokens,
@ -35,7 +40,7 @@ class RateLimiter:
self.rpm = rpm
def split_batches(self, batch):
return [batch[i : i + self.rpm] for i in range(0, len(batch), self.rpm)]
return [batch[i: i + self.rpm] for i in range(0, len(batch), self.rpm)]
async def wait_if_needed(self, num_requests):
current_time = time.time()
@ -49,73 +54,6 @@ class RateLimiter:
self.last_call_time = time.time()
class Costs(NamedTuple):
total_prompt_tokens: int
total_completion_tokens: int
total_cost: float
total_budget: float
class CostManager(metaclass=Singleton):
"""计算使用接口的开销"""
def __init__(self):
self.total_prompt_tokens = 0
self.total_completion_tokens = 0
self.total_cost = 0
self.total_budget = 0
def update_cost(self, prompt_tokens, completion_tokens, model):
"""
Update the total cost, prompt tokens, and completion tokens.
Args:
prompt_tokens (int): The number of tokens used in the prompt.
completion_tokens (int): The number of tokens used in the completion.
model (str): The model used for the API call.
"""
self.total_prompt_tokens += prompt_tokens
self.total_completion_tokens += completion_tokens
cost = (prompt_tokens * TOKEN_COSTS[model]["prompt"] + completion_tokens * TOKEN_COSTS[model]["completion"]) / 1000
self.total_cost += cost
logger.info(
f"Total running cost: ${self.total_cost:.3f} | Max budget: ${CONFIG.max_budget:.3f} | "
f"Current cost: ${cost:.3f}, prompt_tokens: {prompt_tokens}, completion_tokens: {completion_tokens}"
)
CONFIG.total_cost = self.total_cost
def get_total_prompt_tokens(self):
"""
Get the total number of prompt tokens.
Returns:
int: The total number of prompt tokens.
"""
return self.total_prompt_tokens
def get_total_completion_tokens(self):
"""
Get the total number of completion tokens.
Returns:
int: The total number of completion tokens.
"""
return self.total_completion_tokens
def get_total_cost(self):
"""
Get the total cost of API calls.
Returns:
float: The total cost of API calls.
"""
return self.total_cost
def get_costs(self) -> Costs:
"""获得所有开销"""
return Costs(self.total_prompt_tokens, self.total_completion_tokens, self.total_cost, self.total_budget)
def log_and_reraise(retry_state):
logger.error(f"Retry attempts exhausted. Last exception: {retry_state.outcome.exception()}")
logger.warning("""
@ -135,7 +73,6 @@ class OpenAIGPTAPI(BaseGPTAPI, RateLimiter):
self.llm = openai
self.model = CONFIG.openai_api_model
self.auto_max_tokens = False
self._cost_manager = CostManager()
RateLimiter.__init__(self, rpm=self.rpm)
def __init_openai(self, config):
@ -148,8 +85,10 @@ class OpenAIGPTAPI(BaseGPTAPI, RateLimiter):
self.rpm = int(config.get("RPM", 10))
async def _achat_completion_stream(self, messages: list[dict]) -> str:
response = await openai.ChatCompletion.acreate(**self._cons_kwargs(messages), stream=True)
response = await self.async_retry_call(openai.ChatCompletion.acreate,
**self._cons_kwargs(messages),
stream=True
)
# create variables to collect the stream of chunks
collected_chunks = []
collected_messages = []
@ -190,12 +129,12 @@ class OpenAIGPTAPI(BaseGPTAPI, RateLimiter):
return kwargs
async def _achat_completion(self, messages: list[dict]) -> dict:
rsp = await self.llm.ChatCompletion.acreate(**self._cons_kwargs(messages))
rsp = await self.async_retry_call(self.llm.ChatCompletion.acreate, **self._cons_kwargs(messages))
self._update_costs(rsp.get("usage"))
return rsp
def _chat_completion(self, messages: list[dict]) -> dict:
rsp = self.llm.ChatCompletion.create(**self._cons_kwargs(messages))
rsp = self.retry_call(self.llm.ChatCompletion.create, **self._cons_kwargs(messages))
self._update_costs(rsp)
return rsp
@ -268,14 +207,132 @@ class OpenAIGPTAPI(BaseGPTAPI, RateLimiter):
try:
prompt_tokens = int(usage['prompt_tokens'])
completion_tokens = int(usage['completion_tokens'])
self._cost_manager.update_cost(prompt_tokens, completion_tokens, self.model)
CONFIG.cost_manager.update_cost(prompt_tokens, completion_tokens, self.model)
except Exception as e:
logger.error("updating costs failed!", e)
def get_costs(self) -> Costs:
return self._cost_manager.get_costs()
return CONFIG.cost_manager.get_costs()
def get_max_tokens(self, messages: list[dict]):
if not self.auto_max_tokens:
return CONFIG.max_tokens_rsp
return get_max_completion_tokens(messages, self.model, CONFIG.max_tokens_rsp)
async def get_summary(self, text: str, max_words=20):
"""Generate text summary"""
if len(text) < max_words:
return text
language = CONFIG.language or DEFAULT_LANGUAGE
command = f"Translate the above content into a {language} summary of less than {max_words} words."
msg = text + "\n\n" + command
logger.info(f"summary ask:{msg}")
response = await self.aask(msg=msg, system_msgs=[])
logger.info(f"summary rsp: {response}")
return response
async def get_context_title(self, text: str, max_token_count_per_ask=None, max_words=5) -> str:
"""Generate text title"""
max_response_token_count = 50
max_token_count = max_token_count_per_ask or CONFIG.MAX_TOKENS or DEFAULT_MAX_TOKENS
text_windows = self.split_texts(text, window_size=max_token_count - max_response_token_count)
summaries = []
for ws in text_windows:
response = await self.get_summary(ws)
summaries.append(response)
if len(summaries) == 1:
return summaries[0]
language = CONFIG.language or DEFAULT_LANGUAGE
command = f"Translate the above summary into a {language} title of less than {max_words} words."
summaries.append(command)
msg = "\n".join(summaries)
logger.info(f"title ask:{msg}")
response = await self.aask(msg=msg, system_msgs=[])
logger.info(f"title rsp: {response}")
return response
async def is_related(self, text1, text2):
command = f"{text1}\n{text2}\n\nIf the two sentences above are related, return [TRUE] brief and clear. Otherwise, return [FALSE]."
rsp = await self.aask(msg=command, system_msgs=[])
result, _ = self.extract_info(rsp)
return result == "TRUE"
async def rewrite(self, sentence: str, context: str):
command = f"{context}\n\nConsidering the content above, rewrite and return this sentence brief and clear:\n{sentence}"
rsp = await self.aask(msg=command, system_msgs=[])
return rsp
@staticmethod
def split_texts(text: str, window_size) -> List[str]:
"""Splitting long text into sliding windows text"""
total_len = len(text)
if total_len <= window_size:
return [text]
padding_size = 20 if window_size > 20 else 0
windows = []
idx = 0
while idx < total_len:
data_len = window_size - padding_size
if data_len + idx > total_len:
windows.append(text[idx:])
break
w = text[idx:data_len]
windows.append(w)
for i in range(len(windows)):
if i + 1 == len(windows):
break
windows[i] += windows[i + 1][0:padding_size]
return windows
@staticmethod
def extract_info(input_string):
pattern = r'\[([A-Z]+)\]:\s*(.+)'
match = re.match(pattern, input_string)
if match:
return match.group(1), match.group(2)
else:
return None, input_string
@staticmethod
async def async_retry_call(func, *args, **kwargs):
for i in range(OpenAIGPTAPI.MAX_TRY):
try:
rsp = await func(*args, **kwargs)
return rsp
except openai.error.RateLimitError as e:
random_time = random.uniform(0, 3) # 生成0到5秒之间的随机时间
rounded_time = round(random_time, 1) # 保留一位小数以实现0.1秒的精度
logger.warning(f"Exception:{e}, sleeping for {rounded_time} seconds")
await asyncio.sleep(rounded_time)
continue
except Exception as e:
error_str = traceback.format_exc()
logger.error(f"Exception:{e}, stack:{error_str}")
raise e
raise openai.error.OpenAIError("Exceeds the maximum retries")
@staticmethod
def retry_call(func, *args, **kwargs):
for i in range(OpenAIGPTAPI.MAX_TRY):
try:
rsp = func(*args, **kwargs)
return rsp
except openai.error.RateLimitError as e:
logger.warning(f"Exception:{e}")
continue
except (openai.error.AuthenticationError,
openai.error.PermissionError,
openai.error.InvalidAPIType,
openai.error.SignatureVerificationError) as e:
logger.warning(f"Exception:{e}")
raise e
except Exception as e:
error_str = traceback.format_exc()
logger.error(f"Exception:{e}, stack:{error_str}")
raise e
raise openai.error.OpenAIError("Exceeds the maximum retries")
MAX_TRY = 5

155
metagpt/roles/assistant.py Normal file
View file

@ -0,0 +1,155 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/8/7
@Author : mashenquan
@File : assistant.py
@Desc : I am attempting to incorporate certain symbol concepts from UML into MetaGPT, enabling it to have the
ability to freely construct flows through symbol concatenation. Simultaneously, I am also striving to
make these symbols configurable and standardized, making the process of building flows more convenient.
For more about `fork` node in activity diagrams, see: `https://www.uml-diagrams.org/activity-diagrams.html`
This file defines a `fork` style meta role capable of generating arbitrary roles at runtime based on a
configuration file.
@Modified By: mashenquan, 2023/8/22. A definition has been provided for the return value of _think: returning false
indicates that further reasoning cannot continue.
"""
import asyncio
from pathlib import Path
from metagpt.actions import ActionOutput
from metagpt.actions.skill_action import SkillAction, ArgumentsParingAction
from metagpt.actions.talk_action import TalkAction
from metagpt.config import CONFIG
from metagpt.learn.skill_loader import SkillLoader
from metagpt.logs import logger
from metagpt.memory.brain_memory import BrainMemory, MessageType
from metagpt.roles import Role
from metagpt.schema import Message
class Assistant(Role):
"""Assistant for solving common issues."""
def __init__(self, name="Lily", profile="An assistant", goal="Help to solve problem",
constraints="Talk in {language}", desc="", *args, **kwargs):
super(Assistant, self).__init__(name=name, profile=profile,
goal=goal, constraints=constraints, desc=desc, *args, **kwargs)
brain_memory = CONFIG.BRAIN_MEMORY
self.memory = BrainMemory(**brain_memory) if brain_memory else BrainMemory()
skill_path = Path(CONFIG.SKILL_PATH) if CONFIG.SKILL_PATH else None
self.skills = SkillLoader(skill_yaml_file_name=skill_path)
async def think(self) -> bool:
"""Everything will be done part by part."""
last_talk = await self.refine_memory()
if not last_talk:
return False
prompt = f"Refer to this sentence:\n {last_talk}\n"
skills = self.skills.get_skill_list()
for desc, name in skills.items():
prompt += f"If want you to do {desc}, return `[SKILL]: {name}` brief and clear. For instance: [SKILL]: text_to_image\n"
prompt += "If the preceding text presents a complete question and solution, rewrite and return `[SOLUTION]: {problem}` brief and clear. For instance: [SOLUTION]: Solution for distributing watermelon\n"
prompt += "If the preceding text presents an unresolved issue and its corresponding discussion, rewrite and return `[PROBLEM]: {problem}` brief and clear. For instance: [PROBLEM]: How to distribute watermelon?\n"
prompt += "Otherwise, rewrite and return `[TALK]: {talk}` brief and clear. For instance: [TALK]: distribute watermelon"
logger.info(prompt)
rsp = await self._llm.aask(prompt, [])
logger.info(rsp)
return await self._plan(rsp, last_talk=last_talk)
async def act(self) -> ActionOutput:
result = await self._rc.todo.run(**CONFIG.options)
if not result:
return None
if isinstance(result, str):
msg = Message(content=result)
output = ActionOutput(content=result)
else:
msg = Message(content=result.content, instruct_content=result.instruct_content,
cause_by=type(self._rc.todo))
output = result
self.memory.add_answer(msg)
return output
async def talk(self, text):
self.memory.add_talk(Message(content=text))
async def _plan(self, rsp: str, **kwargs) -> bool:
skill, text = Assistant.extract_info(input_string=rsp)
handlers = {
MessageType.Talk.value: self.talk_handler,
MessageType.Problem.value: self.talk_handler,
MessageType.Skill.value: self.skill_handler,
}
handler = handlers.get(skill, self.talk_handler)
return await handler(text, **kwargs)
async def talk_handler(self, text, **kwargs) -> bool:
action = TalkAction(talk=text, knowledge=self.memory.get_knowledge(), llm=self._llm,
**kwargs)
self.add_to_do(action)
return True
async def skill_handler(self, text, **kwargs) -> bool:
last_talk = kwargs.get("last_talk")
skill = self.skills.get_skill(text)
if not skill:
logger.info(f"skill not found: {text}")
return await self.talk_handler(text=last_talk, **kwargs)
action = ArgumentsParingAction(skill=skill, llm=self._llm, **kwargs)
await action.run(**kwargs)
if action.args is None:
return await self.talk_handler(text=last_talk, **kwargs)
action = SkillAction(skill=skill, args=action.args, llm=self._llm, name=skill.name, desc=skill.description)
self.add_to_do(action)
return True
async def refine_memory(self) -> str:
history_text = self.memory.history_text
last_talk = self.memory.last_talk
if last_talk is None: # No user feedback, unsure if past conversation is finished.
return None
if history_text == "":
return last_talk
history_summary = await self._llm.get_context_title(history_text, max_words=20)
if last_talk and await self._llm.is_related(last_talk, history_summary): # Merge relevant content.
last_talk = await self._llm.rewrite(sentence=last_talk, context=history_text)
return last_talk
self.memory.move_to_solution() # Promptly clear memory after the issue is resolved.
return last_talk
@staticmethod
def extract_info(input_string):
from metagpt.provider.openai_api import OpenAIGPTAPI
return OpenAIGPTAPI.extract_info(input_string)
def get_memory(self) -> str:
return self.memory.json()
def load_memory(self, jsn):
try:
self.memory = BrainMemory(**jsn)
except Exception as e:
logger.exception(f"load error:{e}, data:{jsn}")
async def main():
topic = "what's apple"
role = Assistant(language="Chinese")
await role.talk(topic)
while True:
has_action = await role.think()
if not has_action:
break
msg = await role.act()
logger.info(msg)
# Retrieve user terminal input.
logger.info("Enter prompt")
talk = input("You: ")
await role.talk(talk)
if __name__ == '__main__':
CONFIG.language = "Chinese"
asyncio.run(main())

View file

@ -1,4 +1,8 @@
#!/usr/bin/env python
"""
@Modified By: mashenquan, 2023/8/22. A definition has been provided for the return value of _think: returning false indicates that further reasoning cannot continue.
"""
import asyncio
@ -35,15 +39,16 @@ class Researcher(Role):
if language not in ("en-us", "zh-cn"):
logger.warning(f"The language `{language}` has not been tested, it may not work.")
async def _think(self) -> None:
async def _think(self) -> bool:
if self._rc.todo is None:
self._set_state(0)
return
return True
if self._rc.state + 1 < len(self._states):
self._set_state(self._rc.state + 1)
else:
self._rc.todo = None
return False
async def _act(self) -> Message:
logger.info(f"{self._setting}: ready to {self._rc.todo}")

View file

@ -4,20 +4,23 @@
@Time : 2023/5/11 14:42
@Author : alexanderwu
@File : role.py
@Modified By: mashenquan, 2023-8-7, Support template-style variables, such as '{teaching_language} Teacher'.
@Modified By: mashenquan, 2023/8/22. A definition has been provided for the return value of _think: returning false indicates that further reasoning cannot continue.
"""
from __future__ import annotations
from typing import Iterable, Type
from pydantic import BaseModel, Field
# from metagpt.environment import Environment
from metagpt.config import CONFIG
from metagpt.actions import Action, ActionOutput
from metagpt.const import OPTIONS
from metagpt.llm import LLM
from metagpt.actions import Action, ActionOutput
from metagpt.logs import logger
from metagpt.memory import Memory, LongTermMemory
from metagpt.schema import Message
from metagpt.schema import Message, MessageTag
PREFIX_TEMPLATE = """You are a {profile}, named {name}, your goal is {goal}, and the constraint is {constraints}. """
@ -48,7 +51,7 @@ ROLE_TEMPLATE = """Your response should be based on the previous conversation hi
class RoleSetting(BaseModel):
"""角色设定"""
"""Role properties"""
name: str
profile: str
goal: str
@ -63,7 +66,7 @@ class RoleSetting(BaseModel):
class RoleContext(BaseModel):
"""角色运行时上下文"""
"""Runtime role context"""
env: 'Environment' = Field(default=None)
memory: Memory = Field(default_factory=Memory)
long_term_memory: LongTermMemory = Field(default_factory=LongTermMemory)
@ -76,24 +79,36 @@ class RoleContext(BaseModel):
arbitrary_types_allowed = True
def check(self, role_id: str):
if hasattr(CONFIG, "long_term_memory") and CONFIG.long_term_memory:
if CONFIG.long_term_memory:
self.long_term_memory.recover_memory(role_id, self)
self.memory = self.long_term_memory # use memory to act as long_term_memory for unify operation
@property
def important_memory(self) -> list[Message]:
"""获得关注动作对应的信息"""
"""Retrieve information corresponding to the attention action."""
return self.memory.get_by_actions(self.watch)
@property
def history(self) -> list[Message]:
return self.memory.get()
@property
def prerequisite(self):
"""Retrieve information with `prerequisite` tag"""
return self.memory.get_by_tags([MessageTag.Prerequisite.value])
class Role:
"""角色/代理"""
"""Role/Proxy"""
def __init__(self, name="", profile="", goal="", constraints="", desc="", *args, **kwargs):
# Replace template-style variables, such as '{teaching_language} Teacher'.
name = Role.format_value(name)
profile = Role.format_value(profile)
goal = Role.format_value(goal)
constraints = Role.format_value(constraints)
desc = Role.format_value(desc)
def __init__(self, name="", profile="", goal="", constraints="", desc=""):
self._llm = LLM()
self._setting = RoleSetting(name=name, profile=profile, goal=goal, constraints=constraints, desc=desc)
self._states = []
@ -109,7 +124,7 @@ class Role:
self._reset()
for idx, action in enumerate(actions):
if not isinstance(action, Action):
i = action("")
i = action("", llm=self._llm)
else:
i = action
i.set_prefix(self._get_prefix(), self.profile)
@ -137,18 +152,43 @@ class Role:
"""获取角色描述(职位)"""
return self._setting.profile
@property
def name(self):
"""Return role `name`, read only"""
return self._setting.name
@property
def desc(self):
"""Return role `desc`, read only"""
return self._setting.desc
@property
def goal(self):
"""Return role `goal`, read only"""
return self._setting.goal
@property
def constraints(self):
"""Return role `constraints`, read only"""
return self._setting.constraints
@property
def action_count(self):
"""Return number of action"""
return len(self._actions)
def _get_prefix(self):
"""获取角色前缀"""
if self._setting.desc:
return self._setting.desc
return PREFIX_TEMPLATE.format(**self._setting.dict())
async def _think(self) -> None:
"""思考要做什么决定下一步的action"""
async def _think(self) -> bool:
"""Consider what to do and decide on the next course of action. Return false if nothing can be done."""
if len(self._actions) == 1:
# 如果只有一个动作,那就只能做这个
self._set_state(0)
return
return True
prompt = self._get_prefix()
prompt += STATE_TEMPLATE.format(history=self._rc.history, states="\n".join(self._states),
n_states=len(self._states) - 1)
@ -158,6 +198,7 @@ class Role:
logger.warning(f'Invalid answer of state, {next_state=}')
next_state = "0"
self._set_state(int(next_state))
return True
async def _act(self) -> Message:
# prompt = self.get_prefix()
@ -165,7 +206,8 @@ class Role:
# history=self.history)
logger.info(f"{self._setting}: ready to {self._rc.todo}")
response = await self._rc.todo.run(self._rc.important_memory)
requirement = self._rc.important_memory or self._rc.prerequisite
response = await self._rc.todo.run(requirement)
# logger.info(response)
if isinstance(response, ActionOutput):
msg = Message(content=response.content, instruct_content=response.instruct_content,
@ -182,9 +224,9 @@ class Role:
if not self._rc.env:
return 0
env_msgs = self._rc.env.memory.get()
observed = self._rc.env.memory.get_by_actions(self._rc.watch)
self._rc.news = self._rc.memory.remember(observed) # remember recent exact or similar memories
for i in env_msgs:
@ -241,3 +283,50 @@ class Role:
# 将回复发布到环境,等待下一个订阅者处理
self._publish_message(rsp)
return rsp
@staticmethod
def format_value(value):
"""Fill parameters inside `value` with `options`."""
if not isinstance(value, str):
return value
if "{" not in value:
return value
merged_opts = OPTIONS.get() or {}
try:
return value.format(**merged_opts)
except KeyError as e:
logger.warning(f"Parameter is missing:{e}")
for k, v in merged_opts.items():
value = value.replace("{" + f"{k}" + "}", str(v))
return value
def add_action(self, act):
self._actions.append(act)
def add_to_do(self, act):
self._rc.todo = act
async def think(self) -> bool:
"""The exported `think` function"""
has_action = await self._think()
if not has_action:
return False
if not self._rc.todo:
return False
return True
async def act(self) -> ActionOutput:
"""The exported `act` function"""
msg = await self._act()
return ActionOutput(content=msg.content,
instruct_content=msg.instruct_content)
@property
def todo_description(self):
if not self._rc or not self._rc.todo:
return ""
if self._rc.todo.desc:
return self._rc.todo.desc
return f"{type(self._rc.todo).__name__}"

101
metagpt/roles/teacher.py Normal file
View file

@ -0,0 +1,101 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/7/27
@Author : mashenquan
@File : teacher.py
@Modified By: mashenquan, 2023/8/22. A definition has been provided for the return value of _think: returning false indicates that further reasoning cannot continue.
"""
import aiofiles
from metagpt.actions.write_teaching_plan import WriteTeachingPlanPart, TeachingPlanRequirement
from metagpt.const import WORKSPACE_ROOT
from metagpt.roles import Role
from metagpt.schema import Message
from metagpt.logs import logger
import re
class Teacher(Role):
"""Support configurable teacher roles,
with native and teaching languages being replaceable through configurations."""
def __init__(self, name='Lily', profile='{teaching_language} Teacher',
goal='writing a {language} teaching plan part by part',
constraints='writing in {language}', desc="", *args, **kwargs):
super().__init__(name=name, profile=profile, goal=goal, constraints=constraints, desc=desc, *args, **kwargs)
actions = []
for topic in WriteTeachingPlanPart.TOPICS:
act = WriteTeachingPlanPart(topic=topic, llm=self._llm)
actions.append(act)
self._init_actions(actions)
self._watch({TeachingPlanRequirement})
async def _think(self) -> bool:
"""Everything will be done part by part."""
if self._rc.todo is None:
self._set_state(0)
return True
if self._rc.state + 1 < len(self._states):
self._set_state(self._rc.state + 1)
return True
self._rc.todo = None
return False
async def _react(self) -> Message:
ret = Message(content="")
while True:
await self._think()
if self._rc.todo is None:
break
logger.debug(f"{self._setting}: {self._rc.state=}, will do {self._rc.todo}")
msg = await self._act()
if ret.content != '':
ret.content += "\n\n\n"
ret.content += msg.content
logger.info(ret.content)
await self.save(ret.content)
return ret
async def save(self, content):
"""Save teaching plan"""
filename = Teacher.new_file_name(self.course_title)
pathname = WORKSPACE_ROOT / "teaching_plan"
pathname.mkdir(exist_ok=True)
pathname = pathname / filename
try:
async with aiofiles.open(str(pathname), mode='w', encoding='utf-8') as writer:
await writer.write(content)
except Exception as e:
logger.error(f'Save failed{e}')
logger.info(f"Save to:{pathname}")
@staticmethod
def new_file_name(lesson_title, ext=".md"):
"""Create a related file name based on `lesson_title` and `ext`."""
# Define the special characters that need to be replaced.
illegal_chars = r'[#@$%!*&\\/:*?"<>|\n\t \']'
# Replace the special characters with underscores.
filename = re.sub(illegal_chars, '_', lesson_title) + ext
return re.sub(r'_+', '_', filename)
@property
def course_title(self):
"""Return course title of teaching plan"""
default_title = "teaching_plan"
for act in self._actions:
if act.topic != WriteTeachingPlanPart.COURSE_TITLE:
continue
if act.rsp is None:
return default_title
title = act.rsp.lstrip("# \n")
if '\n' in title:
ix = title.index('\n')
title = title[0: ix]
return title
return default_title

View file

@ -4,17 +4,23 @@
@Time : 2023/5/8 22:12
@Author : alexanderwu
@File : schema.py
@Desc : mashenquan, 2023/8/22. Add tags to enable custom message classification.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Type, TypedDict
from enum import Enum
from typing import Type, TypedDict, Set, Optional, List
from pydantic import BaseModel
from metagpt.logs import logger
class MessageTag(Enum):
Prerequisite = "prerequisite"
class RawMessage(TypedDict):
content: str
role: str
@ -29,6 +35,7 @@ class Message:
cause_by: Type["Action"] = field(default="")
sent_from: str = field(default="")
send_to: str = field(default="")
tags: Optional[Set] = field(default=None)
def __str__(self):
# prefix = '-'.join([self.role, str(self.cause_by)])
@ -43,12 +50,48 @@ class Message:
"content": self.content
}
def add_tag(self, tag):
if self.tags is None:
self.tags = set()
self.tags.add(tag)
def remove_tag(self, tag):
if self.tags is None or tag not in self.tags:
return
self.tags.remove(tag)
def is_contain_tags(self, tags: list) -> bool:
"""Determine whether the message contains tags."""
if not tags or not self.tags:
return False
intersection = set(tags) & self.tags
return len(intersection) > 0
def is_contain(self, tag):
return self.is_contain_tags([tag])
def dict(self):
"""pydantic-like `dict` function"""
full = {
"instruct_content": self.instruct_content,
"sent_from": self.sent_from,
"send_to": self.send_to,
"tags": self.tags
}
m = {"content": self.content}
for k, v in full.items():
if v:
m[k] = v
return m
@dataclass
class UserMessage(Message):
"""便于支持OpenAI的消息
Facilitate support for OpenAI messages
"""
def __init__(self, content: str):
super().__init__(content, 'user')
@ -58,6 +101,7 @@ class SystemMessage(Message):
"""便于支持OpenAI的消息
Facilitate support for OpenAI messages
"""
def __init__(self, content: str):
super().__init__(content, 'system')
@ -67,6 +111,7 @@ class AIMessage(Message):
"""便于支持OpenAI的消息
Facilitate support for OpenAI messages
"""
def __init__(self, content: str):
super().__init__(content, 'assistant')

View file

@ -35,17 +35,18 @@ class SoftwareCompany(BaseModel):
def invest(self, investment: float):
"""Invest company. raise NoMoneyException when exceed max_budget."""
self.investment = investment
CONFIG.max_budget = investment
CONFIG.cost_manager.max_budget = investment
logger.info(f'Investment: ${investment}.')
def _check_balance(self):
if CONFIG.total_cost > CONFIG.max_budget:
raise NoMoneyException(CONFIG.total_cost, f'Insufficient funds: {CONFIG.max_budget}')
if CONFIG.cost_manager.total_cost > CONFIG.cost_manager.max_budget:
raise NoMoneyException(CONFIG.cost_manager.total_cost,
f'Insufficient funds: {CONFIG.cost_manager.max_budget}')
def start_project(self, idea):
def start_project(self, idea, role="BOSS", cause_by=BossRequirement, **kwargs):
"""Start a project from publishing boss requirement."""
self.idea = idea
self.environment.publish_message(Message(role="BOSS", content=idea, cause_by=BossRequirement))
self.environment.publish_message(Message(content=idea, role=role, cause_by=cause_by))
def _save(self):
logger.info(self.json())

114
metagpt/tools/azure_tts.py Normal file
View file

@ -0,0 +1,114 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/8/17
@Author : mashenquan
@File : azure_tts.py
@Desc : azure TTS OAS3 api, which provides text-to-speech functionality
"""
import asyncio
from pathlib import Path
from uuid import uuid4
import base64
import sys
sys.path.append(str(Path(__file__).resolve().parent.parent.parent)) # fix-bug: No module named 'metagpt'
from metagpt.logs import logger
from aiofile import async_open
from azure.cognitiveservices.speech import AudioConfig, SpeechConfig, SpeechSynthesizer
import os
class AzureTTS:
"""Azure Text-to-Speech"""
def __init__(self, subscription_key, region):
"""
:param subscription_key: key is used to access your Azure AI service API, see: `https://portal.azure.com/` > `Resource Management` > `Keys and Endpoint`
:param region: This is the location (or region) of your resource. You may need to use this field when making calls to this API.
"""
self.subscription_key = subscription_key if subscription_key else os.environ.get('AZURE_TTS_SUBSCRIPTION_KEY')
self.region = region if region else os.environ.get('AZURE_TTS_REGION')
# 参数参考https://learn.microsoft.com/zh-cn/azure/cognitive-services/speech-service/language-support?tabs=tts#voice-styles-and-roles
async def synthesize_speech(self, lang, voice, text, output_file):
speech_config = SpeechConfig(
subscription=self.subscription_key, region=self.region)
speech_config.speech_synthesis_voice_name = voice
audio_config = AudioConfig(filename=output_file)
synthesizer = SpeechSynthesizer(
speech_config=speech_config,
audio_config=audio_config)
# More detail: https://learn.microsoft.com/en-us/azure/ai-services/speech-service/speech-synthesis-markup-voice
ssml_string = "<speak version='1.0' xmlns='http://www.w3.org/2001/10/synthesis' " \
f"xml:lang='{lang}' xmlns:mstts='http://www.w3.org/2001/mstts'>" \
f"<voice name='{voice}'>{text}</voice></speak>"
return synthesizer.speak_ssml_async(ssml_string).get()
@staticmethod
def role_style_text(role, style, text):
return f'<mstts:express-as role="{role}" style="{style}">{text}</mstts:express-as>'
@staticmethod
def role_text(role, text):
return f'<mstts:express-as role="{role}">{text}</mstts:express-as>'
@staticmethod
def style_text(style, text):
return f'<mstts:express-as style="{style}">{text}</mstts:express-as>'
# Export
async def oas3_azsure_tts(text, lang="", voice="", style="", role="", subscription_key="", region=""):
"""Text to speech
For more details, check out:`https://learn.microsoft.com/en-us/azure/ai-services/speech-service/language-support?tabs=tts`
:param lang: The value can contain a language code such as en (English), or a locale such as en-US (English - United States). For more details, checkout: `https://learn.microsoft.com/en-us/azure/ai-services/speech-service/language-support?tabs=tts`
:param voice: For more details, checkout: `https://learn.microsoft.com/en-us/azure/ai-services/speech-service/language-support?tabs=tts`, `https://speech.microsoft.com/portal/voicegallery`
:param style: Speaking style to express different emotions like cheerfulness, empathy, and calm. For more details, checkout: `https://learn.microsoft.com/en-us/azure/ai-services/speech-service/language-support?tabs=tts`
:param role: With roles, the same voice can act as a different age and gender. For more details, checkout: `https://learn.microsoft.com/en-us/azure/ai-services/speech-service/language-support?tabs=tts`
:param text: The text used for voice conversion.
:param subscription_key: key is used to access your Azure AI service API, see: `https://portal.azure.com/` > `Resource Management` > `Keys and Endpoint`
:param region: This is the location (or region) of your resource. You may need to use this field when making calls to this API.
:return: Returns the Base64-encoded .wav file data if successful, otherwise an empty string.
"""
if not text:
return ""
if not lang:
lang = "zh-CN"
if not voice:
voice = "zh-CN-XiaomoNeural"
if not role:
role = "Girl"
if not style:
style = "affectionate"
if not subscription_key:
subscription_key = os.environ.get("AZURE_TTS_SUBSCRIPTION_KEY")
if not region:
region = os.environ.get("AZURE_TTS_REGION")
xml_value = AzureTTS.role_style_text(role=role, style=style, text=text)
tts = AzureTTS(subscription_key=subscription_key, region=region)
filename = Path(__file__).resolve().parent / (str(uuid4()).replace("-", "") + ".wav")
try:
await tts.synthesize_speech(lang=lang, voice=voice, text=xml_value, output_file=str(filename))
async with async_open(filename, mode="rb") as reader:
data = await reader.read()
base64_string = base64.b64encode(data).decode('utf-8')
filename.unlink()
except Exception as e:
logger.error(f"text:{text}, error:{e}")
return ""
return base64_string
if __name__ == "__main__":
loop = asyncio.new_event_loop()
v = loop.create_task(oas3_azsure_tts("测试test"))
loop.run_until_complete(v)
print(v)

27
metagpt/tools/hello.py Normal file
View file

@ -0,0 +1,27 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/5/2 16:03
@Author : mashenquan
@File : hello.py
@Desc : Implement the OpenAPI Specification 3.0 demo and use the following command to test the HTTP service:
curl -X 'POST' \
'http://localhost:8080/openapi/greeting/dave' \
-H 'accept: text/plain' \
-H 'Content-Type: application/json' \
-d '{}'
"""
import connexion
# openapi implement
async def post_greeting(name: str) -> str:
return f"Hello {name}\n"
if __name__ == "__main__":
app = connexion.AioHttpApp(__name__, specification_dir='../../.well-known/')
app.add_api("openapi.yaml", arguments={"title": "Hello World Example"})
app.run(port=8080)

View file

@ -0,0 +1,43 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/8/17
@Author : mashenquan
@File : metagpt_oas3_api_svc.py
@Desc : MetaGPT OpenAPI Specification 3.0 REST API service
"""
import asyncio
from pathlib import Path
import sys
import connexion
sys.path.append(str(Path(__file__).resolve().parent.parent.parent)) # fix-bug: No module named 'metagpt'
def oas_http_svc():
"""Start the OAS 3.0 OpenAPI HTTP service"""
app = connexion.AioHttpApp(__name__, specification_dir='../../.well-known/')
app.add_api("metagpt_oas3_api.yaml")
app.add_api("openapi.yaml")
app.run(port=8080)
async def async_main():
"""Start the OAS 3.0 OpenAPI HTTP service in the background."""
loop = asyncio.get_event_loop()
loop.run_in_executor(None, oas_http_svc)
# TODO: replace following codes:
while True:
await asyncio.sleep(1)
print("sleep")
def main():
oas_http_svc()
if __name__ == "__main__":
# asyncio.run(async_main())
main()

View file

@ -0,0 +1,110 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/8/18
@Author : mashenquan
@File : metagpt_text_to_image.py
@Desc : MetaGPT Text-to-Image OAS3 api, which provides text-to-image functionality.
"""
import base64
import os
import sys
from pathlib import Path
from typing import List, Dict
import aiohttp
import requests
from pydantic import BaseModel
sys.path.append(str(Path(__file__).resolve().parent.parent.parent)) # fix-bug: No module named 'metagpt'
from metagpt.logs import logger
class MetaGPTText2Image:
def __init__(self, model_url):
"""
:param model_url: Model reset api url
"""
self.model_url = model_url if model_url else os.environ.get('METAGPT_TEXT_TO_IMAGE_MODEL')
async def text_2_image(self, text, size_type="512x512"):
"""Text to image
:param text: The text used for image conversion.
:param size_type: One of ['512x512', '512x768']
:return: The image data is returned in Base64 encoding.
"""
headers = {
"Content-Type": "application/json"
}
dims = size_type.split("x")
data = {
"prompt": text,
"negative_prompt": "(easynegative:0.8),black, dark,Low resolution",
"override_settings": {"sd_model_checkpoint": "galaxytimemachinesGTM_photoV20"},
"seed": -1,
"batch_size": 1,
"n_iter": 1,
"steps": 20,
"cfg_scale": 11,
"width": int(dims[0]),
"height": int(dims[1]), # 768,
"restore_faces": False,
"tiling": False,
"do_not_save_samples": False,
"do_not_save_grid": False,
"enable_hr": False,
"hr_scale": 2,
"hr_upscaler": "Latent",
"hr_second_pass_steps": 0,
"hr_resize_x": 0,
"hr_resize_y": 0,
"hr_upscale_to_x": 0,
"hr_upscale_to_y": 0,
"truncate_x": 0,
"truncate_y": 0,
"applied_old_hires_behavior_to": None,
"eta": None,
"sampler_index": "DPM++ SDE Karras",
"alwayson_scripts": {},
}
class ImageResult(BaseModel):
images: List
parameters: Dict
try:
async with aiohttp.ClientSession() as session:
async with session.post(self.model_url, headers=headers, json=data) as response:
result = ImageResult(**await response.json())
if len(result.images) == 0:
return ""
return result.images[0]
except requests.exceptions.RequestException as e:
logger.error(f"An error occurred:{e}")
return ""
# Export
async def oas3_metagpt_text_to_image(text, size_type: str = "512x512", model_url=""):
"""Text to image
:param text: The text used for image conversion.
:param model_url: Model reset api
:param size_type: One of ['512x512', '512x768']
:return: The image data is returned in Base64 encoding.
"""
if not text:
return ""
if not model_url:
model_url = os.environ.get('METAGPT_TEXT_TO_IMAGE_MODEL_URL')
return await MetaGPTText2Image(model_url).text_2_image(text, size_type=size_type)
if __name__ == "__main__":
v = oas3_metagpt_text_to_image("Panda emoji")
data = base64.b64decode(v)
with open("tmp.png", mode="wb") as writer:
writer.write(data)
print(v)

View file

@ -0,0 +1,95 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/8/18
@Author : mashenquan
@File : openai_text_to_embedding.py
@Desc : OpenAI Text-to-Embedding OAS3 api, which provides text-to-embedding functionality.
For more details, checkout: `https://platform.openai.com/docs/api-reference/embeddings/object`
"""
import asyncio
import os
from pathlib import Path
from typing import List
import aiohttp
import requests
from pydantic import BaseModel
import sys
from metagpt.config import CONFIG
sys.path.append(str(Path(__file__).resolve().parent.parent.parent)) # fix-bug: No module named 'metagpt'
from metagpt.logs import logger
class Embedding(BaseModel):
"""Represents an embedding vector returned by embedding endpoint."""
object: str # The object type, which is always "embedding".
embedding: List[
float] # The embedding vector, which is a list of floats. The length of vector depends on the model as listed in the embedding guide.
index: int # The index of the embedding in the list of embeddings.
class Usage(BaseModel):
prompt_tokens: int
total_tokens: int
class ResultEmbedding(BaseModel):
object: str
data: List[Embedding]
model: str
usage: Usage
class OpenAIText2Embedding:
def __init__(self, openai_api_key):
"""
:param openai_api_key: OpenAI API key, For more details, checkout: `https://platform.openai.com/account/api-keys`
"""
self.openai_api_key = openai_api_key if openai_api_key else os.environ.get('OPENAI_API_KEY')
async def text_2_embedding(self, text, model="text-embedding-ada-002"):
"""Text to embedding
:param text: The text used for embedding.
:param model: One of ['text-embedding-ada-002'], ID of the model to use. For more details, checkout: `https://api.openai.com/v1/models`.
:return: A json object of :class:`ResultEmbedding` class if successful, otherwise `{}`.
"""
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.openai_api_key}"
}
data = {"input": text, "model": model}
try:
async with aiohttp.ClientSession() as session:
async with session.post("https://api.openai.com/v1/embeddings", headers=headers, json=data) as response:
return await response.json()
except requests.exceptions.RequestException as e:
logger.error(f"An error occurred:{e}")
return {}
# Export
async def oas3_openai_text_to_embedding(text, model="text-embedding-ada-002", openai_api_key=""):
"""Text to embedding
:param text: The text used for embedding.
:param model: One of ['text-embedding-ada-002'], ID of the model to use. For more details, checkout: `https://api.openai.com/v1/models`.
:param openai_api_key: OpenAI API key, For more details, checkout: `https://platform.openai.com/account/api-keys`
:return: A json object of :class:`ResultEmbedding` class if successful, otherwise `{}`.
"""
if not text:
return ""
if not openai_api_key:
openai_api_key = CONFIG.OPENAI_API_KEY
return await OpenAIText2Embedding(openai_api_key).text_2_embedding(text, model=model)
if __name__ == "__main__":
loop = asyncio.new_event_loop()
v = loop.create_task(oas3_openai_text_to_embedding("Panda emoji"))
loop.run_until_complete(v)
print(v)

View file

@ -0,0 +1,99 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/8/17
@Author : mashenquan
@File : openai_text_to_image.py
@Desc : OpenAI Text-to-Image OAS3 api, which provides text-to-image functionality.
"""
import base64
import os
import sys
from pathlib import Path
from typing import List
import aiohttp
import requests
from pydantic import BaseModel
sys.path.append(str(Path(__file__).resolve().parent.parent.parent)) # fix-bug: No module named 'metagpt'
from metagpt.logs import logger
class OpenAIText2Image:
def __init__(self, openai_api_key):
"""
:param openai_api_key: OpenAI API key, For more details, checkout: `https://platform.openai.com/account/api-keys`
"""
self.openai_api_key = openai_api_key if openai_api_key else os.environ.get('OPENAI_API_KEY')
async def text_2_image(self, text, size_type="1024x1024"):
"""Text to image
:param text: The text used for image conversion.
:param size_type: One of ['256x256', '512x512', '1024x1024']
:return: The image data is returned in Base64 encoding.
"""
class ImageUrl(BaseModel):
url: str
class ImageResult(BaseModel):
data: List[ImageUrl]
created: int
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.openai_api_key}"
}
data = {"prompt": text, "n": 1, "size": size_type}
try:
async with aiohttp.ClientSession() as session:
async with session.post("https://api.openai.com/v1/images/generations", headers=headers, json=data) as response:
result = ImageResult(** await response.json())
except requests.exceptions.RequestException as e:
logger.error(f"An error occurred:{e}")
return ""
if len(result.data) > 0:
return await OpenAIText2Image.get_image_data(result.data[0].url)
return ""
@staticmethod
async def get_image_data(url):
"""Fetch image data from a URL and encode it as Base64
:param url: Image url
:return: Base64-encoded image data.
"""
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
response.raise_for_status() # 如果是 4xx 或 5xx 响应,会引发异常
image_data = await response.read()
base64_image = base64.b64encode(image_data).decode("utf-8")
return base64_image
except requests.exceptions.RequestException as e:
logger.error(f"An error occurred:{e}")
return ""
# Export
async def oas3_openai_text_to_image(text, size_type: str = "1024x1024", openai_api_key=""):
"""Text to image
:param text: The text used for image conversion.
:param openai_api_key: OpenAI API key, For more details, checkout: `https://platform.openai.com/account/api-keys`
:param size_type: One of ['256x256', '512x512', '1024x1024']
:return: The image data is returned in Base64 encoding.
"""
if not text:
return ""
if not openai_api_key:
openai_api_key = os.environ.get("OPENAI_API_KEY")
return await OpenAIText2Image(openai_api_key).text_2_image(text, size_type=size_type)
if __name__ == "__main__":
v = oas3_openai_text_to_image("Panda emoji")
print(v)

View file

@ -4,11 +4,12 @@
@Time : 2023/5/6 20:15
@Author : alexanderwu
@File : search_engine.py
@Modified By: mashenquan, 2023/8/20. Remove global configuration `CONFIG`, enable configuration support for business isolation.
"""
from __future__ import annotations
import importlib
from typing import Callable, Coroutine, Literal, overload
from typing import Callable, Coroutine, Literal, overload, Dict
from metagpt.config import CONFIG
from metagpt.tools import SearchEngineType
@ -25,24 +26,25 @@ class SearchEngine:
run_func: The function to run the search.
engine: The search engine type.
"""
def __init__(
self,
engine: SearchEngineType | None = None,
run_func: Callable[[str, int, bool], Coroutine[None, None, str | list[str]]] = None,
self,
engine: SearchEngineType | None = None,
run_func: Callable[[str, int, bool], Coroutine[None, None, str | list[str]]] = None
):
engine = engine or CONFIG.search_engine
if engine == SearchEngineType.SERPAPI_GOOGLE:
module = "metagpt.tools.search_engine_serpapi"
run_func = importlib.import_module(module).SerpAPIWrapper().run
run_func = importlib.import_module(module).SerpAPIWrapper(**CONFIG.options).run
elif engine == SearchEngineType.SERPER_GOOGLE:
module = "metagpt.tools.search_engine_serper"
run_func = importlib.import_module(module).SerperWrapper().run
run_func = importlib.import_module(module).SerperWrapper(**CONFIG.options).run
elif engine == SearchEngineType.DIRECT_GOOGLE:
module = "metagpt.tools.search_engine_googleapi"
run_func = importlib.import_module(module).GoogleAPIWrapper().run
run_func = importlib.import_module(module).GoogleAPIWrapper(**CONFIG.options).run
elif engine == SearchEngineType.DUCK_DUCK_GO:
module = "metagpt.tools.search_engine_ddg"
run_func = importlib.import_module(module).DDGAPIWrapper().run
run_func = importlib.import_module(module).DDGAPIWrapper(**CONFIG.options).run
elif engine == SearchEngineType.CUSTOM_ENGINE:
pass # run_func = run_func
else:
@ -52,19 +54,19 @@ class SearchEngine:
@overload
def run(
self,
query: str,
max_results: int = 8,
as_string: Literal[True] = True,
self,
query: str,
max_results: int = 8,
as_string: Literal[True] = True,
) -> str:
...
@overload
def run(
self,
query: str,
max_results: int = 8,
as_string: Literal[False] = False,
self,
query: str,
max_results: int = 8,
as_string: Literal[False] = False,
) -> list[dict[str, str]]:
...

View file

@ -1,11 +1,14 @@
#!/usr/bin/env python
"""
@Modified By: mashenquan, 2023/8/20. Remove global configuration `CONFIG`, enable configuration support for business isolation.
"""
from __future__ import annotations
import asyncio
import json
from concurrent import futures
from typing import Literal, overload
from typing import Literal, overload, Optional
try:
from duckduckgo_search import DDGS
@ -15,8 +18,6 @@ except ImportError:
"You can install it by running the command: `pip install -e.[search-ddg]`"
)
from metagpt.config import CONFIG
class DDGAPIWrapper:
"""Wrapper around duckduckgo_search API.
@ -25,43 +26,44 @@ class DDGAPIWrapper:
"""
def __init__(
self,
*,
loop: asyncio.AbstractEventLoop | None = None,
executor: futures.Executor | None = None,
self,
*,
global_proxy: Optional[str] = None,
loop: asyncio.AbstractEventLoop | None = None,
executor: futures.Executor | None = None,
):
kwargs = {}
if CONFIG.global_proxy:
kwargs["proxies"] = CONFIG.global_proxy
if global_proxy:
kwargs["proxies"] = global_proxy
self.loop = loop
self.executor = executor
self.ddgs = DDGS(**kwargs)
@overload
def run(
self,
query: str,
max_results: int = 8,
as_string: Literal[True] = True,
focus: list[str] | None = None,
self,
query: str,
max_results: int = 8,
as_string: Literal[True] = True,
focus: list[str] | None = None,
) -> str:
...
@overload
def run(
self,
query: str,
max_results: int = 8,
as_string: Literal[False] = False,
focus: list[str] | None = None,
self,
query: str,
max_results: int = 8,
as_string: Literal[False] = False,
focus: list[str] | None = None,
) -> list[dict[str, str]]:
...
async def run(
self,
query: str,
max_results: int = 8,
as_string: bool = True,
self,
query: str,
max_results: int = 8,
as_string: bool = True,
) -> str | list[dict]:
"""Return the results of a Google search using the official Google API

View file

@ -1,5 +1,8 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Modified By: mashenquan, 2023/8/20. Remove global configuration `CONFIG`, enable configuration support for business isolation.
"""
from __future__ import annotations
import asyncio
@ -11,7 +14,6 @@ from urllib.parse import urlparse
import httplib2
from pydantic import BaseModel, validator
from metagpt.config import CONFIG
from metagpt.logs import logger
try:
@ -27,6 +29,7 @@ except ImportError:
class GoogleAPIWrapper(BaseModel):
google_api_key: Optional[str] = None
google_cse_id: Optional[str] = None
global_proxy: Optional[str] = None
loop: Optional[asyncio.AbstractEventLoop] = None
executor: Optional[futures.Executor] = None
@ -36,7 +39,6 @@ class GoogleAPIWrapper(BaseModel):
@validator("google_api_key", always=True)
@classmethod
def check_google_api_key(cls, val: str):
val = val or CONFIG.google_api_key
if not val:
raise ValueError(
"To use, make sure you provide the google_api_key when constructing an object. Alternatively, "
@ -47,8 +49,7 @@ class GoogleAPIWrapper(BaseModel):
@validator("google_cse_id", always=True)
@classmethod
def check_google_cse_id(cls, val: str):
val = val or CONFIG.google_cse_id
def check_google_cse_id(cls, val):
if not val:
raise ValueError(
"To use, make sure you provide the google_cse_id when constructing an object. Alternatively, "
@ -60,8 +61,8 @@ class GoogleAPIWrapper(BaseModel):
@property
def google_api_client(self):
build_kwargs = {"developerKey": self.google_api_key}
if CONFIG.global_proxy:
parse_result = urlparse(CONFIG.global_proxy)
if self.global_proxy:
parse_result = urlparse(self.global_proxy)
proxy_type = parse_result.scheme
if proxy_type == "https":
proxy_type = "http"

View file

@ -4,13 +4,14 @@
@Time : 2023/5/23 18:27
@Author : alexanderwu
@File : search_engine_serpapi.py
@Modified By: mashenquan, 2023/8/20. Remove global configuration `CONFIG`, enable configuration support for business isolation.
"""
from typing import Any, Dict, Optional, Tuple
import aiohttp
from pydantic import BaseModel, Field, validator
from metagpt.config import CONFIG
from metagpt.config import Config
class SerpAPIWrapper(BaseModel):
@ -32,7 +33,6 @@ class SerpAPIWrapper(BaseModel):
@validator("serpapi_api_key", always=True)
@classmethod
def check_serpapi_api_key(cls, val: str):
val = val or CONFIG.serpapi_api_key
if not val:
raise ValueError(
"To use, make sure you provide the serpapi_api_key when constructing an object. Alternatively, "
@ -112,4 +112,4 @@ class SerpAPIWrapper(BaseModel):
if __name__ == "__main__":
import fire
fire.Fire(SerpAPIWrapper().run)
fire.Fire(SerpAPIWrapper(Config().runtime_options).run)

View file

@ -4,6 +4,7 @@
@Time : 2023/5/23 18:27
@Author : alexanderwu
@File : search_engine_serpapi.py
@Modified By: mashenquan, 2023/8/20. Remove global configuration `CONFIG`, enable configuration support for business isolation.
"""
import json
from typing import Any, Dict, Optional, Tuple
@ -11,8 +12,6 @@ from typing import Any, Dict, Optional, Tuple
import aiohttp
from pydantic import BaseModel, Field, validator
from metagpt.config import CONFIG
class SerperWrapper(BaseModel):
search_engine: Any #: :meta private:
@ -26,7 +25,6 @@ class SerperWrapper(BaseModel):
@validator("serper_api_key", always=True)
@classmethod
def check_serper_api_key(cls, val: str):
val = val or CONFIG.serper_api_key
if not val:
raise ValueError(
"To use, make sure you provide the serper_api_key when constructing an object. Alternatively, "

View file

@ -1,29 +1,33 @@
#!/usr/bin/env python
"""
@Modified By: mashenquan, 2023/8/20. Remove global configuration `CONFIG`, enable configuration support for business isolation.
"""
from __future__ import annotations
import importlib
from typing import Any, Callable, Coroutine, Literal, overload
from typing import Any, Callable, Coroutine, Literal, overload, Dict
from metagpt.config import CONFIG
from metagpt.config import Config
from metagpt.tools import WebBrowserEngineType
from metagpt.utils.parse_html import WebPage
class WebBrowserEngine:
def __init__(
self,
engine: WebBrowserEngineType | None = None,
run_func: Callable[..., Coroutine[Any, Any, WebPage | list[WebPage]]] | None = None,
self,
options: Dict,
engine: WebBrowserEngineType | None = None,
run_func: Callable[..., Coroutine[Any, Any, WebPage | list[WebPage]]] | None = None,
):
engine = engine or CONFIG.web_browser_engine
engine = engine or options.get("web_browser_engine")
if engine == WebBrowserEngineType.PLAYWRIGHT:
module = "metagpt.tools.web_browser_engine_playwright"
run_func = importlib.import_module(module).PlaywrightWrapper().run
run_func = importlib.import_module(module).PlaywrightWrapper(options=options).run
elif engine == WebBrowserEngineType.SELENIUM:
module = "metagpt.tools.web_browser_engine_selenium"
run_func = importlib.import_module(module).SeleniumWrapper().run
run_func = importlib.import_module(module).SeleniumWrapper(options=options).run
elif engine == WebBrowserEngineType.CUSTOM:
run_func = run_func
else:
@ -47,6 +51,10 @@ if __name__ == "__main__":
import fire
async def main(url: str, *urls: str, engine_type: Literal["playwright", "selenium"] = "playwright", **kwargs):
return await WebBrowserEngine(WebBrowserEngineType(engine_type), **kwargs).run(url, *urls)
conf = Config()
return await WebBrowserEngine(options=conf.runtime_options,
engine=WebBrowserEngineType(engine_type),
**kwargs).run(url, *urls)
fire.Fire(main)

View file

@ -1,14 +1,18 @@
#!/usr/bin/env python
"""
@Modified By: mashenquan, 2023/8/20. Remove global configuration `CONFIG`, enable configuration support for business isolation.
"""
from __future__ import annotations
import asyncio
import sys
from pathlib import Path
from typing import Literal
from typing import Literal, Dict
from playwright.async_api import async_playwright
from metagpt.config import CONFIG
from metagpt.config import Config
from metagpt.logs import logger
from metagpt.utils.parse_html import WebPage
@ -24,18 +28,20 @@ class PlaywrightWrapper:
def __init__(
self,
options: Dict,
browser_type: Literal["chromium", "firefox", "webkit"] | None = None,
launch_kwargs: dict | None = None,
**kwargs,
) -> None:
self.options = options
if browser_type is None:
browser_type = CONFIG.playwright_browser_type
browser_type = options.get("playwright_browser_type")
self.browser_type = browser_type
launch_kwargs = launch_kwargs or {}
if CONFIG.global_proxy and "proxy" not in launch_kwargs:
if options.get("global_proxy") and "proxy" not in launch_kwargs:
args = launch_kwargs.get("args", [])
if not any(str.startswith(i, "--proxy-server=") for i in args):
launch_kwargs["proxy"] = {"server": CONFIG.global_proxy}
launch_kwargs["proxy"] = {"server": options.get("global_proxy")}
self.launch_kwargs = launch_kwargs
context_kwargs = {}
if "ignore_https_errors" in kwargs:
@ -75,8 +81,8 @@ class PlaywrightWrapper:
executable_path = Path(browser_type.executable_path)
if not executable_path.exists() and "executable_path" not in self.launch_kwargs:
kwargs = {}
if CONFIG.global_proxy:
kwargs["env"] = {"ALL_PROXY": CONFIG.global_proxy}
if self.options.get("global_proxy"):
kwargs["env"] = {"ALL_PROXY": self.options.get("global_proxy")}
await _install_browsers(self.browser_type, **kwargs)
if self._has_run_precheck:
@ -144,6 +150,8 @@ if __name__ == "__main__":
import fire
async def main(url: str, *urls: str, browser_type: str = "chromium", **kwargs):
return await PlaywrightWrapper(browser_type, **kwargs).run(url, *urls)
return await PlaywrightWrapper(options=Config().runtime_options,
browser_type=browser_type,
**kwargs).run(url, *urls)
fire.Fire(main)

View file

@ -1,17 +1,21 @@
#!/usr/bin/env python
"""
@Modified By: mashenquan, 2023/8/20. Remove global configuration `CONFIG`, enable configuration support for business isolation.
"""
from __future__ import annotations
import asyncio
import importlib
from concurrent import futures
from copy import deepcopy
from typing import Literal
from typing import Literal, Dict
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from metagpt.config import CONFIG
from metagpt.config import Config
from metagpt.utils.parse_html import WebPage
@ -29,6 +33,7 @@ class SeleniumWrapper:
def __init__(
self,
options: Dict,
browser_type: Literal["chrome", "firefox", "edge", "ie"] | None = None,
launch_kwargs: dict | None = None,
*,
@ -36,11 +41,11 @@ class SeleniumWrapper:
executor: futures.Executor | None = None,
) -> None:
if browser_type is None:
browser_type = CONFIG.selenium_browser_type
browser_type = options.get("selenium_browser_type")
self.browser_type = browser_type
launch_kwargs = launch_kwargs or {}
if CONFIG.global_proxy and "proxy-server" not in launch_kwargs:
launch_kwargs["proxy-server"] = CONFIG.global_proxy
if options.get("global_proxy") and "proxy-server" not in launch_kwargs:
launch_kwargs["proxy-server"] = options.get("global_proxy")
self.executable_path = launch_kwargs.pop("executable_path", None)
self.launch_args = [f"--{k}={v}" for k, v in launch_kwargs.items()]
@ -118,6 +123,8 @@ if __name__ == "__main__":
import fire
async def main(url: str, *urls: str, browser_type: str = "chrome", **kwargs):
return await SeleniumWrapper(browser_type, **kwargs).run(url, *urls)
return await SeleniumWrapper(options=Config().runtime_options,
browser_type=browser_type,
**kwargs).run(url, *urls)
fire.Fire(main)

View file

@ -4,14 +4,18 @@
@Time : 2023/4/29 16:07
@Author : alexanderwu
@File : common.py
@Modified By: mashenquan, 2023-8-17, add `initalize_enviroment()` to load `config/config.yaml` to `os.environ`
"""
import ast
import contextlib
import inspect
import os
import re
from pathlib import Path
from typing import List, Tuple
import yaml
from metagpt.logs import logger
@ -254,3 +258,4 @@ def parse_recipient(text):
pattern = r"## Send To:\s*([A-Za-z]+)\s*?" # hard code for now
recipient = re.search(pattern, text)
return recipient.group(1) if recipient else ""

View file

@ -0,0 +1,79 @@
# -*- coding: utf-8 -*-
"""
@Time : 2023/8/28
@Author : mashenquan
@File : openai.py
@Desc : mashenquan, 2023/8/28. Separate the `CostManager` class to support user-level cost accounting.
"""
from pydantic import BaseModel
from metagpt.logs import logger
from metagpt.utils.token_counter import TOKEN_COSTS
from typing import NamedTuple
class Costs(NamedTuple):
total_prompt_tokens: int
total_completion_tokens: int
total_cost: float
total_budget: float
class CostManager(BaseModel):
"""Calculate the overhead of using the interface."""
total_prompt_tokens: int = 0
total_completion_tokens: int = 0
total_budget: float = 0
max_budget: float = 10.0
total_cost: float = 0
def update_cost(self, prompt_tokens, completion_tokens, model):
"""
Update the total cost, prompt tokens, and completion tokens.
Args:
prompt_tokens (int): The number of tokens used in the prompt.
completion_tokens (int): The number of tokens used in the completion.
model (str): The model used for the API call.
"""
self.total_prompt_tokens += prompt_tokens
self.total_completion_tokens += completion_tokens
cost = (prompt_tokens * TOKEN_COSTS[model]["prompt"] + completion_tokens * TOKEN_COSTS[model][
"completion"]) / 1000
self.total_cost += cost
logger.info(
f"Total running cost: ${self.total_cost:.3f} | Max budget: ${self.max_budget:.3f} | "
f"Current cost: ${cost:.3f}, prompt_tokens: {prompt_tokens}, completion_tokens: {completion_tokens}"
)
def get_total_prompt_tokens(self):
"""
Get the total number of prompt tokens.
Returns:
int: The total number of prompt tokens.
"""
return self.total_prompt_tokens
def get_total_completion_tokens(self):
"""
Get the total number of completion tokens.
Returns:
int: The total number of completion tokens.
"""
return self.total_completion_tokens
def get_total_cost(self):
"""
Get the total cost of API calls.
Returns:
float: The total cost of API calls.
"""
return self.total_cost
def get_costs(self) -> Costs:
"""获得所有开销"""
return Costs(self.total_prompt_tokens, self.total_completion_tokens, self.total_cost, self.total_budget)

View file

@ -4,19 +4,21 @@
@Time : 2023/7/4 10:53
@Author : alexanderwu
@File : mermaid.py
@Modified By: mashenquan, 2023/8/20. Remove global configuration `CONFIG`, enable configuration support for business isolation.
"""
import subprocess
from pathlib import Path
from metagpt.config import CONFIG
from metagpt.config import Config
from metagpt.const import PROJECT_ROOT
from metagpt.logs import logger
from metagpt.utils.common import check_cmd_exists
def mermaid_to_file(mermaid_code, output_file_without_suffix, width=2048, height=2048) -> int:
def mermaid_to_file(options, mermaid_code, output_file_without_suffix, width=2048, height=2048) -> int:
"""suffix: png/svg/pdf
:param options: runtime context options, created by `Config` class object and changed in flow pipeline
:param mermaid_code: mermaid code
:param output_file_without_suffix: output filename
:param width:
@ -36,12 +38,12 @@ def mermaid_to_file(mermaid_code, output_file_without_suffix, width=2048, height
# Call the `mmdc` command to convert the Mermaid code to a PNG
logger.info(f"Generating {output_file}..")
if CONFIG.puppeteer_config:
if options.get("puppeteer_config"):
subprocess.run(
[
CONFIG.mmdc,
options.get("mmdc"),
"-p",
CONFIG.puppeteer_config,
options.get("puppeteer_config"),
"-i",
str(tmp),
"-o",
@ -53,7 +55,7 @@ def mermaid_to_file(mermaid_code, output_file_without_suffix, width=2048, height
]
)
else:
subprocess.run([CONFIG.mmdc, "-i", str(tmp), "-o", output_file, "-w", str(width), "-H", str(height)])
subprocess.run([options.get("mmdc"), "-i", str(tmp), "-o", output_file, "-w", str(width), "-H", str(height)])
return 0
@ -109,6 +111,8 @@ MMC2 = """sequenceDiagram
if __name__ == "__main__":
# logger.info(print_members(print_members))
mermaid_to_file(MMC1, PROJECT_ROOT / "tmp/1.png")
mermaid_to_file(MMC2, PROJECT_ROOT / "tmp/2.png")
conf = Config()
mermaid_to_file(options=conf.runtime_options, mermaid_code=MMC1,
output_file_without_suffix=PROJECT_ROOT / "tmp/1.png")
mermaid_to_file(options=conf.runtime_options, mermaid_code=MMC2,
output_file_without_suffix=PROJECT_ROOT / "tmp/2.png")

3
requirements-test.txt Normal file
View file

@ -0,0 +1,3 @@
-r requirements.txt
pytest
pytest-asyncio

View file

@ -35,5 +35,10 @@ tqdm==4.64.0
anthropic==0.3.6
typing-inspect==0.8.0
typing_extensions==4.5.0
aiofiles
libcst==1.0.1
qdrant-client==1.4.0
qdrant-client==1.4.0
connexion[swagger-ui]
aiohttp_jinja2
azure-cognitiveservices-speech==1.30.0
aiofile

View file

@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import asyncio
import platform
import fire
from metagpt.roles import Architect, Engineer, ProductManager, ProjectManager, QaEngineer
@ -33,6 +33,8 @@ def main(idea: str, investment: float = 3.0, n_round: int = 5, code_review: bool
:param code_review: Whether to use code review.
:return:
"""
if platform.system() == "Windows":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
asyncio.run(startup(idea, investment, n_round, code_review, run_tests))

View file

@ -1,21 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/7/1 22:50
@Author : alexanderwu
@File : test_azure_tts.py
"""
from metagpt.actions.azure_tts import AzureTTS
def test_azure_tts():
azure_tts = AzureTTS("azure_tts")
azure_tts.synthesize_speech(
"zh-CN",
"zh-CN-YunxiNeural",
"Boy",
"你好,我是卡卡",
"output.wav")
# 运行需要先配置 SUBSCRIPTION_KEY
# TODO: 这里如果要检验还要额外加上对应的asr才能确保前后生成是接近一致的但现在还没有

View file

@ -4,7 +4,7 @@
#
from tests.metagpt.roles.ui_role import UIDesign
llm_resp= '''
llm_resp = '''
# UI Design Description
```The user interface for the snake game will be designed in a way that is simple, clean, and intuitive. The main elements of the game such as the game grid, snake, food, score, and game over message will be clearly defined and easy to understand. The game grid will be centered on the screen with the score displayed at the top. The game controls will be intuitive and easy to use. The design will be modern and minimalist with a pleasing color scheme.```
@ -100,6 +100,7 @@ body {
font-size: 3em;
'''
def test_ui_design_parse_css():
ui_design_work = UIDesign(name="UI design action")
@ -161,7 +162,7 @@ def test_ui_design_parse_css():
transform: translate(-50%, -50%);
font-size: 3em;
'''
assert ui_design_work.parse_css_code(context=llm_resp)==css
assert ui_design_work.parse_css_code(context=llm_resp) == css
def test_ui_design_parse_html():
@ -185,7 +186,4 @@ def test_ui_design_parse_html():
</body>
</html>
'''
assert ui_design_work.parse_css_code(context=llm_resp)==html
assert ui_design_work.parse_css_code(context=llm_resp) == html

View file

@ -4,11 +4,14 @@
@Time : 2023/5/11 17:45
@Author : alexanderwu
@File : test_write_code.py
@Modified By: mashenquan, 2023-8-1, fix-bug: `filename` of `write_code.run()` is missing.
@Modified By: mashenquan, 2023/8/20. Remove global configuration `CONFIG`, enable configuration support for business isolation.
"""
import pytest
from metagpt.config import Config
from metagpt.provider.openai_api import OpenAIGPTAPI as LLM, CostManager
from metagpt.actions.write_code import WriteCode
from metagpt.llm import LLM
from metagpt.logs import logger
from tests.metagpt.actions.mock import TASKS_2, WRITE_CODE_PROMPT_SAMPLE
@ -16,9 +19,11 @@ from tests.metagpt.actions.mock import TASKS_2, WRITE_CODE_PROMPT_SAMPLE
@pytest.mark.asyncio
async def test_write_code():
api_design = "设计一个名为'add'的函数,该函数接受两个整数作为输入,并返回它们的和。"
write_code = WriteCode("write_code")
code = await write_code.run(api_design)
conf = Config()
cost_manager = CostManager(**conf.runtime_options)
llm = LLM(options=conf.runtime_options, cost_manager=cost_manager)
write_code = WriteCode(options=conf.runtime_options, name="write_code", llm=llm)
code = await write_code.run(context=api_design, filename="test")
logger.info(code)
# 我们不能精确地预测生成的代码,但我们可以检查某些关键字
@ -29,6 +34,7 @@ async def test_write_code():
@pytest.mark.asyncio
async def test_write_code_directly():
prompt = WRITE_CODE_PROMPT_SAMPLE + '\n' + TASKS_2[0]
llm = LLM()
options = Config().runtime_options
llm = LLM(options=options, cost_manager=CostManager(**options))
rsp = await llm.aask(prompt)
logger.info(rsp)

View file

@ -0,0 +1,67 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/7/28 17:25
@Author : mashenquan
@File : test_write_teaching_plan.py
"""
import asyncio
from typing import Optional
from pydantic import BaseModel
from langchain.llms.base import LLM
from metagpt.actions.write_teaching_plan import WriteTeachingPlanPart
from metagpt.config import Config
from metagpt.schema import Message
class MockWriteTeachingPlanPart(WriteTeachingPlanPart):
def __init__(self, options, name: str = '', context=None, llm: LLM = None, topic="", language="Chinese"):
super().__init__(options, name, context, llm, topic, language)
async def _aask(self, prompt: str, system_msgs: Optional[list[str]] = None) -> str:
return f"{WriteTeachingPlanPart.DATA_BEGIN_TAG}\nprompt\n{WriteTeachingPlanPart.DATA_END_TAG}"
async def mock_write_teaching_plan_part():
class Inputs(BaseModel):
input: str
name: str
topic: str
language: str
inputs = [
{
"input": "AABBCC",
"name": "A",
"topic": WriteTeachingPlanPart.COURSE_TITLE,
"language": "C"
},
{
"input": "DDEEFFF",
"name": "A1",
"topic": "B1",
"language": "C1"
}
]
for i in inputs:
seed = Inputs(**i)
options = Config().runtime_options
act = MockWriteTeachingPlanPart(options=options, name=seed.name, topic=seed.topic, language=seed.language)
await act.run([Message(content="")])
assert act.topic == seed.topic
assert str(act) == seed.topic
assert act.name == seed.name
assert act.rsp == "# prompt" if seed.topic == WriteTeachingPlanPart.COURSE_TITLE else "prompt"
def test_suite():
loop = asyncio.get_event_loop()
task = loop.create_task(mock_write_teaching_plan_part())
loop.run_until_complete(task)
if __name__ == '__main__':
test_suite()

View file

View file

@ -0,0 +1,40 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/8/18
@Author : mashenquan
@File : test_text_to_embedding.py
@Desc : Unit tests.
"""
import asyncio
from pydantic import BaseModel
from metagpt.learn.text_to_embedding import text_to_embedding
from metagpt.tools.openai_text_to_embedding import ResultEmbedding
async def mock_text_to_embedding():
class Input(BaseModel):
input: str
inputs = [
{"input": "Panda emoji"}
]
for i in inputs:
seed = Input(**i)
data = await text_to_embedding(seed.input)
v = ResultEmbedding(**data)
assert len(v.data) > 0
def test_suite():
loop = asyncio.get_event_loop()
task = loop.create_task(mock_text_to_embedding())
loop.run_until_complete(task)
if __name__ == '__main__':
test_suite()

View file

@ -0,0 +1,48 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/8/18
@Author : mashenquan
@File : test_text_to_image.py
@Desc : Unit tests.
"""
import asyncio
import base64
from pydantic import BaseModel
from metagpt.learn.text_to_image import text_to_image
async def mock_text_to_image():
class Input(BaseModel):
input: str
size_type: str
inputs = [
{"input": "Panda emoji", "size_type": "512x512"}
]
for i in inputs:
seed = Input(**i)
base64_data = await text_to_image(seed.input)
assert base64_data != ""
print(f"{seed.input} -> {base64_data}")
flags = ";base64,"
assert flags in base64_data
ix = base64_data.find(flags) + len(flags)
declaration = base64_data[0: ix]
assert declaration
data = base64_data[ix:]
assert data
assert base64.b64decode(data, validate=True)
def test_suite():
loop = asyncio.get_event_loop()
task = loop.create_task(mock_text_to_image())
loop.run_until_complete(task)
if __name__ == '__main__':
test_suite()

View file

@ -0,0 +1,47 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/8/18
@Author : mashenquan
@File : test_text_to_speech.py
@Desc : Unit tests.
"""
import asyncio
import base64
from pydantic import BaseModel
from metagpt.learn.text_to_speech import text_to_speech
async def mock_text_to_speech():
class Input(BaseModel):
input: str
inputs = [
{"input": "Panda emoji"}
]
for i in inputs:
seed = Input(**i)
base64_data = await text_to_speech(seed.input)
assert base64_data != ""
print(f"{seed.input} -> {base64_data}")
flags = ";base64,"
assert flags in base64_data
ix = base64_data.find(flags) + len(flags)
declaration = base64_data[0: ix]
assert declaration
data = base64_data[ix:]
assert data
assert base64.b64decode(data, validate=True)
def test_suite():
loop = asyncio.get_event_loop()
task = loop.create_task(mock_text_to_speech())
loop.run_until_complete(task)
if __name__ == '__main__':
test_suite()

View file

@ -0,0 +1,57 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/8/27
@Author : mashenquan
@File : test_brain_memory.py
"""
import json
from typing import List
import pydantic
from metagpt.memory.brain_memory import BrainMemory
from metagpt.schema import Message
def test_json():
class Input(pydantic.BaseModel):
history: List[str]
solution: List[str]
knowledge: List[str]
stack: List[str]
inputs = [
{
"history": ["a", "b"],
"solution": ["c"],
"knowledge": ["d", "e"],
"stack": ["f"]
}
]
for i in inputs:
v = Input(**i)
bm = BrainMemory()
for h in v.history:
msg = Message(content=h)
bm.history.append(msg.dict())
for h in v.solution:
msg = Message(content=h)
bm.solution.append(msg.dict())
for h in v.knowledge:
msg = Message(content=h)
bm.knowledge.append(msg.dict())
for h in v.stack:
msg = Message(content=h)
bm.stack.append(msg.dict())
s = bm.json()
m = json.loads(s)
bm = BrainMemory(**m)
assert bm
for v in bm.history:
msg = Message(**v)
assert msg
if __name__ == '__main__':
test_json()

View file

@ -1,8 +1,10 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : unittest of `metagpt/memory/longterm_memory.py`
from metagpt.config import CONFIG
"""
@Desc : unittest of `metagpt/memory/longterm_memory.py`
@Modified By: mashenquan, 2023/8/20. Remove global configuration `CONFIG`, enable configuration support for business isolation.
"""
from metagpt.config import Config
from metagpt.schema import Message
from metagpt.actions import BossRequirement
from metagpt.roles.role import RoleContext
@ -10,12 +12,13 @@ from metagpt.memory import LongTermMemory
def test_ltm_search():
assert hasattr(CONFIG, "long_term_memory") is True
openai_api_key = CONFIG.openai_api_key
conf = Config()
assert hasattr(conf, "long_term_memory") is True
openai_api_key = conf.openai_api_key
assert len(openai_api_key) > 20
role_id = 'UTUserLtm(Product Manager)'
rc = RoleContext(watch=[BossRequirement])
rc = RoleContext(options=conf.runtime_options, watch=[BossRequirement])
ltm = LongTermMemory()
ltm.recover_memory(role_id, rc)
@ -23,19 +26,19 @@ def test_ltm_search():
message = Message(role='BOSS', content=idea, cause_by=BossRequirement)
news = ltm.remember([message])
assert len(news) == 1
ltm.add(message)
ltm.add(message, **conf.runtime_options)
sim_idea = 'Write a game of cli snake'
sim_message = Message(role='BOSS', content=sim_idea, cause_by=BossRequirement)
news = ltm.remember([sim_message])
assert len(news) == 0
ltm.add(sim_message)
ltm.add(sim_message, **conf.runtime_options)
new_idea = 'Write a 2048 web game'
new_message = Message(role='BOSS', content=new_idea, cause_by=BossRequirement)
news = ltm.remember([new_message])
assert len(news) == 1
ltm.add(new_message)
ltm.add(new_message, **conf.runtime_options)
# restore from local index
ltm_new = LongTermMemory()

View file

@ -0,0 +1,101 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/7/27 13:25
@Author : mashenquan
@File : test_teacher.py
"""
from typing import Dict, Optional
from pydantic import BaseModel
from metagpt.config import Config
from metagpt.provider.openai_api import CostManager
from metagpt.roles.teacher import Teacher
def test_init():
class Inputs(BaseModel):
name: str
profile: str
goal: str
constraints: str
desc: str
kwargs: Optional[Dict] = None
expect_name: str
expect_profile: str
expect_goal: str
expect_constraints: str
expect_desc: str
inputs = [
{
"name": "Lily{language}",
"expect_name": "LilyCN",
"profile": "X {teaching_language}",
"expect_profile": "X EN",
"goal": "Do {something_big}, {language}",
"expect_goal": "Do sleep, CN",
"constraints": "Do in {key1}, {language}",
"expect_constraints": "Do in HaHa, CN",
"kwargs": {"language": "CN", "key1": "HaHa", "something_big": "sleep", "teaching_language": "EN"},
"desc": "aaa{language}",
"expect_desc": "aaaCN"
},
{
"name": "Lily{language}",
"expect_name": "Lily{language}",
"profile": "X {teaching_language}",
"expect_profile": "X {teaching_language}",
"goal": "Do {something_big}, {language}",
"expect_goal": "Do {something_big}, {language}",
"constraints": "Do in {key1}, {language}",
"expect_constraints": "Do in {key1}, {language}",
"kwargs": {},
"desc": "aaa{language}",
"expect_desc": "aaa{language}"
},
]
for i in inputs:
seed = Inputs(**i)
options = Config().runtime_options
cost_manager = CostManager(**options)
teacher = Teacher(options=options, cost_manager=cost_manager, name=seed.name, profile=seed.profile,
goal=seed.goal, constraints=seed.constraints,
desc=seed.desc, **seed.kwargs)
assert teacher.name == seed.expect_name
assert teacher.desc == seed.expect_desc
assert teacher.profile == seed.expect_profile
assert teacher.goal == seed.expect_goal
assert teacher.constraints == seed.expect_constraints
assert teacher.course_title == "teaching_plan"
def test_new_file_name():
class Inputs(BaseModel):
lesson_title: str
ext: str
expect: str
inputs = [
{
"lesson_title": "# @344\n12",
"ext": ".md",
"expect": "_344_12.md"
},
{
"lesson_title": "1#@$%!*&\\/:*?\"<>|\n\t \'1",
"ext": ".cc",
"expect": "1_1.cc"
}
]
for i in inputs:
seed = Inputs(**i)
result = Teacher.new_file_name(seed.lesson_title, seed.ext)
assert result == seed.expect
if __name__ == '__main__':
test_init()
test_new_file_name()

View file

@ -4,14 +4,17 @@
@Time : 2023/5/12 00:47
@Author : alexanderwu
@File : test_environment.py
@Modified By: mashenquan, 2023/8/20. Remove global configuration `CONFIG`, enable configuration support for business isolation.
"""
import pytest
from metagpt.actions import BossRequirement
from metagpt.config import Config
from metagpt.environment import Environment
from metagpt.logs import logger
from metagpt.manager import Manager
from metagpt.provider.openai_api import CostManager
from metagpt.roles import Architect, ProductManager, Role
from metagpt.schema import Message
@ -22,33 +25,45 @@ def env():
def test_add_role(env: Environment):
role = ProductManager("Alice", "product manager", "create a new product", "limited resources")
conf = Config()
cost_manager = CostManager(**conf.runtime_options)
role = ProductManager(options=conf.runtime_options,
cost_manager=cost_manager,
name="Alice",
profile="product manager",
goal="create a new product",
constraints="limited resources")
env.add_role(role)
assert env.get_role(role.profile) == role
def test_get_roles(env: Environment):
role1 = Role("Alice", "product manager", "create a new product", "limited resources")
role2 = Role("Bob", "engineer", "develop the new product", "short deadline")
conf = Config()
cost_manager = CostManager(**conf.runtime_options)
role1 = Role(options=conf.runtime_options, cost_manager=cost_manager, name="Alice", profile="product manager",
goal="create a new product", constraints="limited resources")
role2 = Role(options=conf.runtime_options, cost_manager=cost_manager, name="Bob", profile="engineer",
goal="develop the new product", constraints="short deadline")
env.add_role(role1)
env.add_role(role2)
roles = env.get_roles()
assert roles == {role1.profile: role1, role2.profile: role2}
def test_set_manager(env: Environment):
manager = Manager()
env.set_manager(manager)
assert env.manager == manager
@pytest.mark.asyncio
async def test_publish_and_process_message(env: Environment):
product_manager = ProductManager("Alice", "Product Manager", "做AI Native产品", "资源有限")
architect = Architect("Bob", "Architect", "设计一个可用、高效、较低成本的系统,包括数据结构与接口", "资源有限,需要节省成本")
conf = Config()
cost_manager = CostManager(**conf.runtime_options)
product_manager = ProductManager(options=conf.runtime_options,
cost_manager=cost_manager,
name="Alice", profile="Product Manager",
goal="做AI Native产品", constraints="资源有限")
architect = Architect(options=conf.runtime_options,
cost_manager=cost_manager,
name="Bob", profile="Architect", goal="设计一个可用、高效、较低成本的系统,包括数据结构与接口",
constraints="资源有限,需要节省成本")
env.add_roles([product_manager, architect])
env.set_manager(Manager())
env.publish_message(Message(role="BOSS", content="需要一个基于LLM做总结的搜索引擎", cause_by=BossRequirement))
await env.run(k=2)

View file

@ -4,16 +4,19 @@
@Time : 2023/5/11 14:45
@Author : alexanderwu
@File : test_llm.py
@Modified By: mashenquan, 2023/8/20. Remove global configuration `CONFIG`, enable configuration support for business isolation.
"""
import pytest
from metagpt.llm import LLM
from metagpt.config import Config
from metagpt.provider.openai_api import OpenAIGPTAPI as LLM, CostManager
@pytest.fixture()
def llm():
return LLM()
options = Config().runtime_options
return LLM(options=options, cost_manager=CostManager(**options))
@pytest.mark.asyncio

View file

@ -0,0 +1,49 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/7/1 22:50
@Author : alexanderwu
@File : test_azure_tts.py
@Modified By: mashenquan, 2023-8-9, add more text formatting options
@Modified By: mashenquan, 2023-8-17, move to `tools` folder.
"""
import asyncio
import sys
from pathlib import Path
sys.path.append(str(Path(__file__).resolve().parent.parent.parent.parent)) # fix-bug: No module named 'metagpt'
from metagpt.const import WORKSPACE_ROOT
from metagpt.tools.azure_tts import AzureTTS
def test_azure_tts():
azure_tts = AzureTTS(subscription_key="", region="")
text = """
女儿看见父亲走了进来问道
<mstts:express-as role="YoungAdultFemale" style="calm">
您来的挺快的怎么过来的
</mstts:express-as>
父亲放下手提包
<mstts:express-as role="OlderAdultMale" style="calm">
Writing a binary file in Python is similar to writing a regular text file, but you'll work with bytes instead of strings.”
</mstts:express-as>
"""
path = WORKSPACE_ROOT / "tts"
path.mkdir(exist_ok=True, parents=True)
filename = path / "girl.wav"
loop = asyncio.new_event_loop()
v = loop.create_task(azure_tts.synthesize_speech(
lang="zh-CN",
voice="zh-CN-XiaomoNeural",
text=text,
output_file=str(filename)))
result = loop.run_until_complete(v)
print(result)
# 运行需要先配置 SUBSCRIPTION_KEY
# TODO: 这里如果要检验还要额外加上对应的asr才能确保前后生成是接近一致的但现在还没有
if __name__ == '__main__':
test_azure_tts()

View file

@ -4,11 +4,13 @@
@Time : 2023/5/2 17:46
@Author : alexanderwu
@File : test_search_engine.py
@Modified By: mashenquan, 2023/8/20. Remove global configuration `CONFIG`, enable configuration support for business isolation.
"""
from __future__ import annotations
import pytest
from metagpt.config import Config
from metagpt.logs import logger
from metagpt.tools import SearchEngineType
from metagpt.tools.search_engine import SearchEngine
@ -37,9 +39,10 @@ class MockSearchEnine:
],
)
async def test_search_engine(search_engine_typpe, run_func, max_results, as_string, ):
search_engine = SearchEngine(search_engine_typpe, run_func)
rsp = await search_engine.run("metagpt", max_results=max_results, as_string=as_string)
async def test_search_engine(search_engine_typpe, run_func, max_results, as_string):
conf = Config()
search_engine = SearchEngine(options=conf.runtime_options, engine=search_engine_typpe, run_func=run_func)
rsp = await search_engine.run(query="metagpt", max_results=max_results, as_string=as_string)
logger.info(rsp)
if as_string:
assert isinstance(rsp, str)

View file

@ -1,5 +1,10 @@
"""
@Modified By: mashenquan, 2023/8/20. Remove global configuration `CONFIG`, enable configuration support for business isolation.
"""
import pytest
from metagpt.config import Config
from metagpt.tools import WebBrowserEngineType, web_browser_engine
@ -13,7 +18,8 @@ from metagpt.tools import WebBrowserEngineType, web_browser_engine
ids=["playwright", "selenium"],
)
async def test_scrape_web_page(browser_type, url, urls):
browser = web_browser_engine.WebBrowserEngine(browser_type)
conf = Config()
browser = web_browser_engine.WebBrowserEngine(options=conf.runtime_options, engine=browser_type)
result = await browser.run(url)
assert isinstance(result, str)
assert "深度赋智" in result

View file

@ -1,6 +1,10 @@
"""
@Modified By: mashenquan, 2023/8/20. Remove global configuration `CONFIG`, enable configuration support for business isolation.
"""
import pytest
from metagpt.config import CONFIG
from metagpt.config import Config
from metagpt.tools import web_browser_engine_playwright
@ -15,22 +19,24 @@ from metagpt.tools import web_browser_engine_playwright
ids=["chromium-normal", "firefox-normal", "webkit-normal"],
)
async def test_scrape_web_page(browser_type, use_proxy, kwagrs, url, urls, proxy, capfd):
conf = Config()
global_proxy = conf.global_proxy
try:
global_proxy = CONFIG.global_proxy
if use_proxy:
CONFIG.global_proxy = proxy
browser = web_browser_engine_playwright.PlaywrightWrapper(browser_type, **kwagrs)
conf.global_proxy = proxy
browser = web_browser_engine_playwright.PlaywrightWrapper(options=conf.runtime_options,
browser_type=browser_type, **kwagrs)
result = await browser.run(url)
result = result.inner_text
assert isinstance(result, str)
assert "Deepwisdom" in result
assert "DeepWisdom" in result
if urls:
results = await browser.run(url, *urls)
assert isinstance(results, list)
assert len(results) == len(urls) + 1
assert all(("Deepwisdom" in i) for i in results)
assert all(("DeepWisdom" in i) for i in results)
if use_proxy:
assert "Proxy:" in capfd.readouterr().out
finally:
CONFIG.global_proxy = global_proxy
conf.global_proxy = global_proxy

View file

@ -1,6 +1,10 @@
"""
@Modified By: mashenquan, 2023/8/20. Remove global configuration `CONFIG`, enable configuration support for business isolation.
"""
import pytest
from metagpt.config import CONFIG
from metagpt.config import Config
from metagpt.tools import web_browser_engine_selenium
@ -15,11 +19,12 @@ from metagpt.tools import web_browser_engine_selenium
ids=["chrome-normal", "firefox-normal", "edge-normal"],
)
async def test_scrape_web_page(browser_type, use_proxy, url, urls, proxy, capfd):
conf = Config()
global_proxy = conf.global_proxy
try:
global_proxy = CONFIG.global_proxy
if use_proxy:
CONFIG.global_proxy = proxy
browser = web_browser_engine_selenium.SeleniumWrapper(browser_type)
conf.global_proxy = proxy
browser = web_browser_engine_selenium.SeleniumWrapper(options=conf.runtime_options, browser_type=browser_type)
result = await browser.run(url)
result = result.inner_text
assert isinstance(result, str)
@ -33,4 +38,4 @@ async def test_scrape_web_page(browser_type, use_proxy, url, urls, proxy, capfd)
if use_proxy:
assert "Proxy:" in capfd.readouterr().out
finally:
CONFIG.global_proxy = global_proxy
conf.global_proxy = global_proxy

View file

@ -4,19 +4,15 @@
@Time : 2023/5/1 11:19
@Author : alexanderwu
@File : test_config.py
@Modified By: mashenquan, 2013/8/20, Add `test_options`; remove global configuration `CONFIG`, enable configuration support for business isolation.
"""
from pathlib import Path
import pytest
from metagpt.config import Config
def test_config_class_is_singleton():
config_1 = Config()
config_2 = Config()
assert config_1 == config_2
def test_config_class_get_key_exception():
with pytest.raises(Exception) as exc_info:
config = Config()
@ -25,7 +21,17 @@ def test_config_class_get_key_exception():
def test_config_yaml_file_not_exists():
config = Config('wtf.yaml')
with pytest.raises(Exception) as exc_info:
config.get('OPENAI_BASE_URL')
assert str(exc_info.value) == "Key 'OPENAI_BASE_URL' not found in environment variables or in the YAML file"
Config(Path('wtf.yaml'))
assert str(exc_info.value) == "Set OPENAI_API_KEY or Anthropic_API_KEY first"
def test_options():
filename = Path(__file__).resolve().parent.parent.parent.parent / "config/config.yaml"
config = Config(filename)
opts = config.runtime_options
assert opts
if __name__ == '__main__':
test_options()