Merge pull request #628 from iorisa/fixbug/role/assistant

fixbug: 修复通用智能体role及其相关的TalkAction和SkillAction
This commit is contained in:
geekan 2023-12-25 23:14:21 +08:00 committed by GitHub
commit 59586f30d6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 541 additions and 512 deletions

View file

@@ -14,36 +14,45 @@ import traceback
from copy import deepcopy
from typing import Dict, Optional
from metagpt.actions import Action, ActionOutput
from metagpt.actions import Action
from metagpt.learn.skill_loader import Skill
from metagpt.logs import logger
from metagpt.schema import Message
# TOTEST
class ArgumentsParingAction(Action):
skill: Skill
ask: str
rsp: Optional[ActionOutput]
args: Optional[Dict]
rsp: Optional[Message] = None
args: Optional[Dict] = None
@property
def prompt(self):
prompt = f"{self.skill.name} function parameters description:\n"
prompt = "You are a function parser. You can convert spoken words into function parameters.\n"
prompt += "\n---\n"
prompt += f"{self.skill.name} function parameters description:\n"
for k, v in self.skill.arguments.items():
prompt += f"parameter `{k}`: {v}\n"
prompt += "\n"
prompt += "\n---\n"
prompt += "Examples:\n"
for e in self.skill.examples:
prompt += f"If want you to do `{e.ask}`, return `{e.answer}` brief and clear.\n"
prompt += f"\nNow I want you to do `{self.ask}`, return in examples format above, brief and clear."
prompt += "\n---\n"
prompt += (
f"\nRefer to the `{self.skill.name}` function description, and fill in the function parameters according "
'to the example "I want you to do xx" in the Examples section.'
f"\nNow I want you to do `{self.ask}`, return function parameters in Examples format above, brief and "
"clear."
)
return prompt
async def run(self, *args, **kwargs) -> ActionOutput:
async def run(self, with_message=None, **kwargs) -> Message:
prompt = self.prompt
rsp = await self.llm.aask(msg=prompt, system_msgs=[])
logger.debug(f"SKILL:{prompt}\n, RESULT:{rsp}")
self.args = ArgumentsParingAction.parse_arguments(skill_name=self.skill.name, txt=rsp)
self.rsp = ActionOutput(content=rsp)
self.rsp = Message(content=rsp, role="assistant", instruct_content=self.args, cause_by=self)
return self.rsp
@staticmethod
@@ -72,9 +81,9 @@ class ArgumentsParingAction(Action):
class SkillAction(Action):
skill: Skill
args: Dict
rsp: str = ""
rsp: Optional[Message] = None
async def run(self, *args, **kwargs) -> str | ActionOutput | None:
async def run(self, with_message=None, **kwargs) -> Message:
"""Run action"""
options = deepcopy(kwargs)
if self.args:
@@ -82,26 +91,21 @@ class SkillAction(Action):
if k in options:
options.pop(k)
try:
self.rsp = await self.find_and_call_function(self.skill.name, args=self.args, **options)
rsp = await self.find_and_call_function(self.skill.name, args=self.args, **options)
self.rsp = Message(content=rsp, role="assistant", cause_by=self)
except Exception as e:
logger.exception(f"{e}, traceback:{traceback.format_exc()}")
self.rsp = f"Error: {e}"
return ActionOutput(content=self.rsp, instruct_content=self.skill.json())
self.rsp = Message(content=f"Error: {e}", role="assistant", cause_by=self)
return self.rsp
@staticmethod
async def find_and_call_function(function_name, args, **kwargs):
async def find_and_call_function(function_name, args, **kwargs) -> str:
try:
module = importlib.import_module("metagpt.learn")
function = getattr(module, function_name)
# 调用函数并返回结果
# Invoke function and return result
result = await function(**args, **kwargs)
return result
except (ModuleNotFoundError, AttributeError):
logger.error(f"{function_name} not found")
return None
if __name__ == "__main__":
ArgumentsParingAction.parse_arguments(
skill_name="text_to_image", txt='`text_to_image(text="Draw an apple", size_type="512x512")`'
)
raise ValueError(f"{function_name} not found")

View file

@@ -6,27 +6,21 @@
@File : talk_action.py
@Desc : Act as its a talk
"""
import json
from typing import Optional
from metagpt.actions import Action, ActionOutput
from metagpt.actions import Action
from metagpt.config import CONFIG
from metagpt.const import DEFAULT_LANGUAGE
from metagpt.llm import LLMType
from metagpt.logs import logger
from metagpt.schema import Message
# TOTEST
class TalkAction(Action):
def __init__(self, name: str = "", talk="", history_summary="", knowledge="", context=None, llm=None, **kwargs):
context = context or {}
context["talk"] = talk
context["history_summery"] = history_summary
context["knowledge"] = knowledge
super(TalkAction, self).__init__(name=name, context=context, llm=llm)
self._talk = talk
self._history_summary = history_summary
self._knowledge = knowledge
self._rsp = None
context: str
history_summary: str = ""
knowledge: str = ""
rsp: Optional[Message] = None
@property
def prompt(self):
@@ -37,15 +31,15 @@ class TalkAction(Action):
f"maintaining the character's persona and habits. When faced with unrelated questions, playfully "
f"decline to answer without revealing your AI nature to preserve the character's image.\n\n"
)
prompt += f"Knowledge:\n{self._knowledge}\n\n" if self._knowledge else ""
prompt += f"{self._history_summary}\n\n"
prompt += f"Knowledge:\n{self.knowledge}\n\n" if self.knowledge else ""
prompt += f"{self.history_summary}\n\n"
prompt += (
"If the information is insufficient, you can search in the historical conversation or knowledge above.\n"
)
language = CONFIG.language or DEFAULT_LANGUAGE
prompt += (
f"Answer the following questions strictly in {language}, and the answers must follow the Markdown format.\n "
f"{self._talk}"
f"{self.context}"
)
logger.debug(f"PROMPT: {prompt}")
return prompt
@@ -54,23 +48,23 @@
def prompt_gpt4(self):
kvs = {
"{role}": CONFIG.agent_description or "",
"{history}": self._history_summary or "",
"{knowledge}": self._knowledge or "",
"{history}": self.history_summary or "",
"{knowledge}": self.knowledge or "",
"{language}": CONFIG.language or DEFAULT_LANGUAGE,
"{ask}": self._talk,
"{ask}": self.context,
}
prompt = TalkAction.__FORMATION_LOOSE__
prompt = TalkActionPrompt.FORMATION_LOOSE
for k, v in kvs.items():
prompt = prompt.replace(k, v)
logger.info(f"PROMPT: {prompt}")
return prompt
async def run_old(self, *args, **kwargs) -> ActionOutput:
prompt = self.prompt
rsp = await self.llm.aask(msg=prompt, system_msgs=[])
logger.debug(f"PROMPT:{prompt}\nRESULT:{rsp}\n")
self._rsp = ActionOutput(content=rsp)
return self._rsp
# async def run_old(self, *args, **kwargs) -> ActionOutput:
# prompt = self.prompt
# rsp = await self.llm.aask(msg=prompt, system_msgs=[])
# logger.debug(f"PROMPT:{prompt}\nRESULT:{rsp}\n")
# self._rsp = ActionOutput(content=rsp)
# return self._rsp
@property
def aask_args(self):
@@ -84,22 +78,21 @@ class TalkAction(Action):
f"Answer the following questions strictly in {language}, and the answers must follow the Markdown format.",
]
format_msgs = []
if self._knowledge:
format_msgs.append({"role": "assistant", "content": self._knowledge})
if self._history_summary:
if CONFIG.LLM_TYPE == LLMType.METAGPT.value:
format_msgs.extend(json.loads(self._history_summary))
else:
format_msgs.append({"role": "assistant", "content": self._history_summary})
return self._talk, format_msgs, system_msgs
if self.knowledge:
format_msgs.append({"role": "assistant", "content": self.knowledge})
if self.history_summary:
format_msgs.append({"role": "assistant", "content": self.history_summary})
return self.context, format_msgs, system_msgs
async def run(self, *args, **kwargs) -> ActionOutput:
async def run(self, with_message=None, **kwargs) -> Message:
msg, format_msgs, system_msgs = self.aask_args
rsp = await self.llm.aask(msg=msg, format_msgs=format_msgs, system_msgs=system_msgs)
self._rsp = ActionOutput(content=rsp)
return self._rsp
self.rsp = Message(content=rsp, role="assistant", cause_by=self)
return self.rsp
__FORMATION__ = """Formation: "Capacity and role" defines the role you are currently playing;
class TalkActionPrompt:
FORMATION = """Formation: "Capacity and role" defines the role you are currently playing;
"[HISTORY_BEGIN]" and "[HISTORY_END]" tags enclose the historical conversation;
"[KNOWLEDGE_BEGIN]" and "[KNOWLEDGE_END]" tags enclose the knowledge may help for your responses;
"Statement" defines the work detail you need to complete at this stage;
@@ -135,7 +128,7 @@ Statement: Unless you are a language professional, answer the following question
{ask}
"""
__FORMATION_LOOSE__ = """Formation: "Capacity and role" defines the role you are currently playing;
FORMATION_LOOSE = """Formation: "Capacity and role" defines the role you are currently playing;
"[HISTORY_BEGIN]" and "[HISTORY_END]" tags enclose the historical conversation;
"[KNOWLEDGE_BEGIN]" and "[KNOWLEDGE_END]" tags enclose the knowledge may help for your responses;
"Statement" defines the work detail you need to complete at this stage;

View file

@@ -9,6 +9,7 @@
from pathlib import Path
from typing import Dict, List, Optional
import aiofiles
import yaml
from pydantic import BaseModel, Field
@@ -63,61 +64,37 @@ class SkillsDeclaration(BaseModel):
entities: Dict[str, Entity]
components: Components = None
class SkillLoader:
def __init__(self, skill_yaml_file_name: Path = None):
@staticmethod
async def load(skill_yaml_file_name: Path = None) -> "SkillsDeclaration":
if not skill_yaml_file_name:
skill_yaml_file_name = Path(__file__).parent.parent.parent / ".well-known/skills.yaml"
with open(str(skill_yaml_file_name), "r") as file:
skills = yaml.safe_load(file)
self._skills = SkillsDeclaration(**skills)
async with aiofiles.open(str(skill_yaml_file_name), mode="r") as reader:
data = await reader.read(-1)
skill_data = yaml.safe_load(data)
return SkillsDeclaration(**skill_data)
def get_skill_list(self, entity_name: str = "Assistant") -> Dict:
"""Return the skill name based on the skill description."""
entity = self.get_entity(entity_name)
entity = self.entities.get(entity_name)
if not entity:
return {}
# List of skills that the agent chooses to activate.
agent_skills = CONFIG.agent_skills
if not agent_skills:
return {}
class AgentSkill(BaseModel):
class _AgentSkill(BaseModel):
name: str
names = [AgentSkill(**i).name for i in agent_skills]
description_to_name_mappings = {}
for s in entity.skills:
if s.name not in names:
continue
description_to_name_mappings[s.description] = s.name
return description_to_name_mappings
names = [_AgentSkill(**i).name for i in agent_skills]
return {s.description: s.name for s in entity.skills if s.name in names}
def get_skill(self, name, entity_name: str = "Assistant") -> Skill:
"""Return a skill by name."""
entity = self.get_entity(entity_name)
entity = self.entities.get(entity_name)
if not entity:
return None
for sk in entity.skills:
if sk.name == name:
return sk
def get_entity(self, name) -> Entity:
"""Return a list of skills for the entity."""
if not self._skills:
return None
return self._skills.entities.get(name)
if __name__ == "__main__":
CONFIG.agent_skills = [
{"id": 1, "name": "text_to_speech", "type": "builtin", "config": {}, "enabled": True},
{"id": 2, "name": "text_to_image", "type": "builtin", "config": {}, "enabled": True},
{"id": 3, "name": "ai_call", "type": "builtin", "config": {}, "enabled": True},
{"id": 3, "name": "data_analysis", "type": "builtin", "config": {}, "enabled": True},
{"id": 5, "name": "crawler", "type": "builtin", "config": {"engine": "ddg"}, "enabled": True},
{"id": 6, "name": "knowledge", "type": "builtin", "config": {}, "enabled": True},
]
loader = SkillLoader()
print(loader.get_skill_list())

View file

@@ -27,7 +27,7 @@ async def text_to_image(text, size_type: str = "512x512", openai_api_key="", mod
if CONFIG.METAGPT_TEXT_TO_IMAGE_MODEL_URL or model_url:
base64_data = await oas3_metagpt_text_to_image(text, size_type, model_url)
elif CONFIG.OPENAI_API_KEY or openai_api_key:
base64_data = await oas3_openai_text_to_image(text, size_type, openai_api_key)
base64_data = await oas3_openai_text_to_image(text, size_type)
else:
raise ValueError("Missing necessary parameters.")

View file

@@ -4,343 +4,250 @@
@Time : 2023/8/18
@Author : mashenquan
@File : brain_memory.py
@Desc : Support memory for multiple tasks and multiple mainlines. Obsoleted by `utils/*_repository.py`.
@Desc : Used by AgentStore. Used for long-term storage and automatic compression.
@Modified By: mashenquan, 2023/9/4. + redis memory cache.
@Modified By: mashenquan, 2023/12/25. Simplify Functionality.
"""
# import json
# import re
# from enum import Enum
# from typing import Dict, List, Optional
#
# import openai
# import pydantic
#
# from metagpt.config import CONFIG
# from metagpt.const import DEFAULT_LANGUAGE, DEFAULT_MAX_TOKENS
# from metagpt.logs import logger
# from metagpt.schema import Message, RawMessage
# from metagpt.utils.redis import Redis
#
#
# class MessageType(Enum):
# Talk = "TALK"
# Solution = "SOLUTION"
# Problem = "PROBLEM"
# Skill = "SKILL"
# Answer = "ANSWER"
#
#
# class BrainMemory(pydantic.BaseModel):
# history: List[Dict] = []
# stack: List[Dict] = []
# solution: List[Dict] = []
# knowledge: List[Dict] = []
# historical_summary: str = ""
# last_history_id: str = ""
# is_dirty: bool = False
# last_talk: str = None
# llm_type: Optional[str] = None
# cacheable: bool = True
#
# def add_talk(self, msg: Message):
# msg.role = "user"
# self.add_history(msg)
# self.is_dirty = True
#
# def add_answer(self, msg: Message):
# msg.role = "assistant"
# self.add_history(msg)
# self.is_dirty = True
#
# def get_knowledge(self) -> str:
# texts = [Message(**m).content for m in self.knowledge]
# return "\n".join(texts)
#
# @staticmethod
# async def loads(redis_key: str, redis_conf: Dict = None) -> "BrainMemory":
# redis = Redis(conf=redis_conf)
# if not redis.is_valid() or not redis_key:
# return BrainMemory(llm_type=CONFIG.LLM_TYPE)
# v = await redis.get(key=redis_key)
# logger.debug(f"REDIS GET {redis_key} {v}")
# if v:
# data = json.loads(v)
# bm = BrainMemory(**data)
# bm.is_dirty = False
# return bm
# return BrainMemory(llm_type=CONFIG.LLM_TYPE)
#
# async def dumps(self, redis_key: str, timeout_sec: int = 30 * 60, redis_conf: Dict = None):
# if not self.is_dirty:
# return
# redis = Redis(conf=redis_conf)
# if not redis.is_valid() or not redis_key:
# return False
# v = self.json()
# if self.cacheable:
# await redis.set(key=redis_key, data=v, timeout_sec=timeout_sec)
# logger.debug(f"REDIS SET {redis_key} {v}")
# self.is_dirty = False
#
# @staticmethod
# def to_redis_key(prefix: str, user_id: str, chat_id: str):
# return f"{prefix}:{user_id}:{chat_id}"
#
# async def set_history_summary(self, history_summary, redis_key, redis_conf):
# if self.historical_summary == history_summary:
# if self.is_dirty:
# await self.dumps(redis_key=redis_key, redis_conf=redis_conf)
# self.is_dirty = False
# return
#
# self.historical_summary = history_summary
# self.history = []
# await self.dumps(redis_key=redis_key, redis_conf=redis_conf)
# self.is_dirty = False
#
# def add_history(self, msg: Message):
# if msg.id:
# if self.to_int(msg.id, 0) <= self.to_int(self.last_history_id, -1):
# return
# self.history.append(msg.dict())
# self.last_history_id = str(msg.id)
# self.is_dirty = True
#
# def exists(self, text) -> bool:
# for m in reversed(self.history):
# if m.get("content") == text:
# return True
# return False
#
# @staticmethod
# def to_int(v, default_value):
# try:
# return int(v)
# except:
# return default_value
#
# def pop_last_talk(self):
# v = self.last_talk
# self.last_talk = None
# return v
#
# async def summarize(self, llm, max_words=200, keep_language: bool = False, limit: int = -1, **kwargs):
# if self.llm_type == LLMType.METAGPT.value:
# return await self._metagpt_summarize(llm=llm, max_words=max_words, keep_language=keep_language, **kwargs)
#
# return await self._openai_summarize(
# llm=llm, max_words=max_words, keep_language=keep_language, limit=limit, **kwargs
# )
#
# async def _openai_summarize(self, llm, max_words=200, keep_language: bool = False, limit: int = -1, **kwargs):
# max_token_count = DEFAULT_MAX_TOKENS
# max_count = 100
# texts = [self.historical_summary]
# for i in self.history:
# m = Message(**i)
# texts.append(m.content)
# text = "\n".join(texts)
# text_length = len(text)
# if limit > 0 and text_length < limit:
# return text
# summary = ""
# while max_count > 0:
# if text_length < max_token_count:
# summary = await self._get_summary(text=text, llm=llm, max_words=max_words, keep_language=keep_language)
# break
#
# padding_size = 20 if max_token_count > 20 else 0
# text_windows = self.split_texts(text, window_size=max_token_count - padding_size)
# part_max_words = min(int(max_words / len(text_windows)) + 1, 100)
# summaries = []
# for ws in text_windows:
# response = await self._get_summary(
# text=ws, llm=llm, max_words=part_max_words, keep_language=keep_language
# )
# summaries.append(response)
# if len(summaries) == 1:
# summary = summaries[0]
# break
#
# # Merged and retry
# text = "\n".join(summaries)
# text_length = len(text)
#
# max_count -= 1 # safeguard
# if summary:
# await self.set_history_summary(history_summary=summary, redis_key=CONFIG.REDIS_KEY, redis_conf=CONFIG.REDIS)
# return summary
# raise openai.InvalidRequestError(message="text too long", param=None)
#
# async def _metagpt_summarize(self, max_words=200, **kwargs):
# if not self.history:
# return ""
#
# total_length = 0
# msgs = []
# for i in reversed(self.history):
# m = Message(**i)
# delta = len(m.content)
# if total_length + delta > max_words:
# left = max_words - total_length
# if left == 0:
# break
# m.content = m.content[0:left]
# msgs.append(m.dict())
# break
# msgs.append(i)
# total_length += delta
# msgs.reverse()
# self.history = msgs
# self.is_dirty = True
# await self.dumps(redis_key=CONFIG.REDIS_KEY, redis_conf=CONFIG.REDIS_CONF)
# self.is_dirty = False
#
# return BrainMemory.to_metagpt_history_format(self.history)
#
# @staticmethod
# def to_metagpt_history_format(history) -> str:
# mmsg = []
# for m in history:
# msg = Message(**m)
# r = RawMessage(role="user" if MessageType.Talk.value in msg.tags else "assistant", content=msg.content)
# mmsg.append(r)
# return json.dumps(mmsg)
#
# @staticmethod
# async def _get_summary(text: str, llm, max_words=20, keep_language: bool = False):
# """Generate text summary"""
# if len(text) < max_words:
# return text
# if keep_language:
# command = f".Translate the above content into a summary of less than {max_words} words in language of the content strictly."
# else:
# command = f"Translate the above content into a summary of less than {max_words} words."
# msg = text + "\n\n" + command
# logger.debug(f"summary ask:{msg}")
# response = await llm.aask(msg=msg, system_msgs=[])
# logger.debug(f"summary rsp: {response}")
# return response
#
# async def get_title(self, llm, max_words=5, **kwargs) -> str:
# """Generate text title"""
# if self.llm_type == LLMType.METAGPT.value:
# return Message(**self.history[0]).content if self.history else "New"
#
# summary = await self.summarize(llm=llm, max_words=500)
#
# language = CONFIG.language or DEFAULT_LANGUAGE
# command = f"Translate the above summary into a {language} title of less than {max_words} words."
# summaries = [summary, command]
# msg = "\n".join(summaries)
# logger.debug(f"title ask:{msg}")
# response = await llm.aask(msg=msg, system_msgs=[])
# logger.debug(f"title rsp: {response}")
# return response
#
# async def is_related(self, text1, text2, llm):
# if self.llm_type == LLMType.METAGPT.value:
# return await self._metagpt_is_related(text1=text1, text2=text2, llm=llm)
# return await self._openai_is_related(text1=text1, text2=text2, llm=llm)
#
# @staticmethod
# async def _metagpt_is_related(**kwargs):
# return False
#
# @staticmethod
# async def _openai_is_related(text1, text2, llm, **kwargs):
# # command = f"{text1}\n{text2}\n\nIf the two sentences above are related, return [TRUE] brief and clear. Otherwise, return [FALSE]."
# command = f"{text2}\n\nIs there any sentence above related to the following sentence: {text1}.\nIf is there any relevance, return [TRUE] brief and clear. Otherwise, return [FALSE] brief and clear."
# rsp = await llm.aask(msg=command, system_msgs=[])
# result = True if "TRUE" in rsp else False
# p2 = text2.replace("\n", "")
# p1 = text1.replace("\n", "")
# logger.info(f"IS_RELATED:\nParagraph 1: {p2}\nParagraph 2: {p1}\nRESULT: {result}\n")
# return result
#
# async def rewrite(self, sentence: str, context: str, llm):
# if self.llm_type == LLMType.METAGPT.value:
# return await self._metagpt_rewrite(sentence=sentence, context=context, llm=llm)
# return await self._openai_rewrite(sentence=sentence, context=context, llm=llm)
#
# async def _metagpt_rewrite(self, sentence: str, **kwargs):
# return sentence
#
# async def _openai_rewrite(self, sentence: str, context: str, llm, **kwargs):
# # command = (
# # f"{context}\n\nConsidering the content above, rewrite and return this sentence brief and clear:\n{sentence}"
# # )
# command = f"{context}\n\nExtract relevant information from every preceding sentence and use it to succinctly supplement or rewrite the following text in brief and clear:\n{sentence}"
# rsp = await llm.aask(msg=command, system_msgs=[])
# logger.info(f"REWRITE:\nCommand: {command}\nRESULT: {rsp}\n")
# return rsp
#
# @staticmethod
# def split_texts(text: str, window_size) -> List[str]:
# """Splitting long text into sliding windows text"""
# if window_size <= 0:
# window_size = BrainMemory.DEFAULT_TOKEN_SIZE
# total_len = len(text)
# if total_len <= window_size:
# return [text]
#
# padding_size = 20 if window_size > 20 else 0
# windows = []
# idx = 0
# data_len = window_size - padding_size
# while idx < total_len:
# if window_size + idx > total_len: # 不足一个滑窗
# windows.append(text[idx:])
# break
# # 每个窗口少算padding_size自然就可实现滑窗功能, 比如: [1, 2, 3, 4, 5, 6, 7, ....]
# # window_size=3, padding_size=1
# # [1, 2, 3], [3, 4, 5], [5, 6, 7], ....
# # idx=2, | idx=5 | idx=8 | ...
# w = text[idx : idx + window_size]
# windows.append(w)
# idx += data_len
#
# return windows
#
# @staticmethod
# def extract_info(input_string, pattern=r"\[([A-Z]+)\]:\s*(.+)"):
# match = re.match(pattern, input_string)
# if match:
# return match.group(1), match.group(2)
# else:
# return None, input_string
#
# def set_llm_type(self, v):
# if v and v != self.llm_type:
# self.llm_type = v
# self.is_dirty = True
#
# @property
# def is_history_available(self):
# return bool(self.history or self.historical_summary)
#
# @property
# def history_text(self):
# if self.llm_type == LLMType.METAGPT.value:
# return self._get_metagpt_history_text()
# return self._get_openai_history_text()
#
# def _get_metagpt_history_text(self):
# return BrainMemory.to_metagpt_history_format(self.history)
#
# def _get_openai_history_text(self):
# if len(self.history) == 0 and not self.historical_summary:
# return ""
# texts = [self.historical_summary] if self.historical_summary else []
# for m in self.history[:-1]:
# if isinstance(m, Dict):
# t = Message(**m).content
# elif isinstance(m, Message):
# t = m.content
# else:
# continue
# texts.append(t)
#
# return "\n".join(texts)
#
# DEFAULT_TOKEN_SIZE = 500
import json
import re
from typing import Dict, List
from pydantic import BaseModel, Field
from metagpt.config import CONFIG
from metagpt.const import DEFAULT_LANGUAGE
from metagpt.logs import logger
from metagpt.provider import MetaGPTAPI
from metagpt.schema import Message, SimpleMessage
from metagpt.utils.redis import Redis
class BrainMemory(BaseModel):
history: List[Message] = Field(default_factory=list)
knowledge: List[Message] = Field(default_factory=list)
historical_summary: str = ""
last_history_id: str = ""
is_dirty: bool = False
last_talk: str = None
cacheable: bool = True
def add_talk(self, msg: Message):
"""
Add message from user.
"""
msg.role = "user"
self.add_history(msg)
self.is_dirty = True
def add_answer(self, msg: Message):
"""Add message from LLM"""
msg.role = "assistant"
self.add_history(msg)
self.is_dirty = True
def get_knowledge(self) -> str:
texts = [m.content for m in self.knowledge]
return "\n".join(texts)
@staticmethod
async def loads(redis_key: str, redis_conf: Dict = None) -> "BrainMemory":
redis = Redis(conf=redis_conf)
if not redis.is_valid() or not redis_key:
return BrainMemory()
v = await redis.get(key=redis_key)
logger.debug(f"REDIS GET {redis_key} {v}")
if v:
bm = BrainMemory.parse_raw(v)
bm.is_dirty = False
return bm
return BrainMemory()
async def dumps(self, redis_key: str, timeout_sec: int = 30 * 60, redis_conf: Dict = None):
if not self.is_dirty:
return
redis = Redis(conf=redis_conf)
if not redis.is_valid() or not redis_key:
return False
v = self.json(ensure_ascii=False)
if self.cacheable:
await redis.set(key=redis_key, data=v, timeout_sec=timeout_sec)
logger.debug(f"REDIS SET {redis_key} {v}")
self.is_dirty = False
@staticmethod
def to_redis_key(prefix: str, user_id: str, chat_id: str):
return f"{prefix}:{user_id}:{chat_id}"
async def set_history_summary(self, history_summary, redis_key, redis_conf):
if self.historical_summary == history_summary:
if self.is_dirty:
await self.dumps(redis_key=redis_key, redis_conf=redis_conf)
self.is_dirty = False
return
self.historical_summary = history_summary
self.history = []
await self.dumps(redis_key=redis_key, redis_conf=redis_conf)
self.is_dirty = False
def add_history(self, msg: Message):
if msg.id:
if self.to_int(msg.id, 0) <= self.to_int(self.last_history_id, -1):
return
self.history.append(msg.dict())
self.last_history_id = str(msg.id)
self.is_dirty = True
def exists(self, text) -> bool:
for m in reversed(self.history):
if m.get("content") == text:
return True
return False
@staticmethod
def to_int(v, default_value):
try:
return int(v)
except:
return default_value
def pop_last_talk(self):
v = self.last_talk
self.last_talk = None
return v
async def summarize(self, llm, max_words=200, keep_language: bool = False, limit: int = -1, **kwargs):
if isinstance(llm, MetaGPTAPI):
return await self._metagpt_summarize(max_words=max_words)
return await self._openai_summarize(llm=llm, max_words=max_words, keep_language=keep_language, limit=limit)
async def _openai_summarize(self, llm, max_words=200, keep_language: bool = False, limit: int = -1):
texts = [self.historical_summary]
for m in self.history:
texts.append(m.content)
text = "\n".join(texts)
text_length = len(text)
if limit > 0 and text_length < limit:
return text
summary = await llm.summarize(text=text, max_words=max_words, keep_language=keep_language, limit=limit)
if summary:
await self.set_history_summary(history_summary=summary, redis_key=CONFIG.REDIS_KEY, redis_conf=CONFIG.REDIS)
return summary
raise ValueError(f"text too long:{text_length}")
async def _metagpt_summarize(self, max_words=200):
if not self.history:
return ""
total_length = 0
msgs = []
for m in reversed(self.history):
delta = len(m.content)
if total_length + delta > max_words:
left = max_words - total_length
if left == 0:
break
m.content = m.content[0:left]
msgs.append(m.dict())
break
msgs.append(m)
total_length += delta
msgs.reverse()
self.history = msgs
self.is_dirty = True
await self.dumps(redis_key=CONFIG.REDIS_KEY, redis_conf=CONFIG.REDIS_CONF)
self.is_dirty = False
return BrainMemory.to_metagpt_history_format(self.history)
@staticmethod
def to_metagpt_history_format(history) -> str:
mmsg = [SimpleMessage(role=m.role, content=m.content) for m in history]
return json.dumps(mmsg)
async def get_title(self, llm, max_words=5, **kwargs) -> str:
"""Generate text title"""
if isinstance(llm, MetaGPTAPI):
return self.history[0].content if self.history else "New"
summary = await self.summarize(llm=llm, max_words=500)
language = CONFIG.language or DEFAULT_LANGUAGE
command = f"Translate the above summary into a {language} title of less than {max_words} words."
summaries = [summary, command]
msg = "\n".join(summaries)
logger.debug(f"title ask:{msg}")
response = await llm.aask(msg=msg, system_msgs=[])
logger.debug(f"title rsp: {response}")
return response
async def is_related(self, text1, text2, llm):
if isinstance(llm, MetaGPTAPI):
return await self._metagpt_is_related(text1=text1, text2=text2, llm=llm)
return await self._openai_is_related(text1=text1, text2=text2, llm=llm)
@staticmethod
async def _metagpt_is_related(**kwargs):
return False
@staticmethod
async def _openai_is_related(text1, text2, llm, **kwargs):
command = (
f"{text2}\n\nIs there any sentence above related to the following sentence: {text1}.\nIf is there "
"any relevance, return [TRUE] brief and clear. Otherwise, return [FALSE] brief and clear."
)
rsp = await llm.aask(msg=command, system_msgs=[])
result = True if "TRUE" in rsp else False
p2 = text2.replace("\n", "")
p1 = text1.replace("\n", "")
logger.info(f"IS_RELATED:\nParagraph 1: {p2}\nParagraph 2: {p1}\nRESULT: {result}\n")
return result
async def rewrite(self, sentence: str, context: str, llm):
if isinstance(llm, MetaGPTAPI):
return await self._metagpt_rewrite(sentence=sentence, context=context, llm=llm)
return await self._openai_rewrite(sentence=sentence, context=context, llm=llm)
@staticmethod
async def _metagpt_rewrite(sentence: str):
return sentence
@staticmethod
async def _openai_rewrite(sentence: str, context: str, llm):
command = (
f"{context}\n\nExtract relevant information from every preceding sentence and use it to succinctly "
f"supplement or rewrite the following text in brief and clear:\n{sentence}"
)
rsp = await llm.aask(msg=command, system_msgs=[])
logger.info(f"REWRITE:\nCommand: {command}\nRESULT: {rsp}\n")
return rsp
@staticmethod
def extract_info(input_string, pattern=r"\[([A-Z]+)\]:\s*(.+)"):
match = re.match(pattern, input_string)
if match:
return match.group(1), match.group(2)
else:
return None, input_string
@property
def is_history_available(self):
return bool(self.history or self.historical_summary)
@property
def history_text(self):
if len(self.history) == 0 and not self.historical_summary:
return ""
texts = [self.historical_summary] if self.historical_summary else []
for m in self.history[:-1]:
if isinstance(m, Dict):
t = Message(**m).content
elif isinstance(m, Message):
t = m.content
else:
continue
texts.append(t)
return "\n".join(texts)

View file

@@ -356,7 +356,7 @@ class OpenAIGPTAPI(BaseGPTAPI, RateLimiter):
await self.async_client.close()
self.async_client = None
async def summarize(self, text: str, max_words=200, keep_language: bool = False, limit: int = -1, **kwargs) -> str:
async def summarize(self, text: str, max_words=200, keep_language: bool = False, limit: int = -1) -> str:
max_token_count = DEFAULT_MAX_TOKENS
max_count = 100
text_length = len(text)

View file

@@ -14,46 +14,51 @@
indicates that further reasoning cannot continue.
"""
import asyncio
from enum import Enum
from pathlib import Path
from typing import Optional
from pydantic import Field
from metagpt.actions import ActionOutput
from metagpt.actions.skill_action import ArgumentsParingAction, SkillAction
from metagpt.actions.talk_action import TalkAction
from metagpt.config import CONFIG
from metagpt.learn.skill_loader import SkillLoader
from metagpt.learn.skill_loader import SkillsDeclaration
from metagpt.logs import logger
from metagpt.memory.brain_memory import BrainMemory, MessageType
from metagpt.memory.brain_memory import BrainMemory
from metagpt.roles import Role
from metagpt.schema import Message
class MessageType(Enum):
Talk = "TALK"
Skill = "SKILL"
class Assistant(Role):
"""Assistant for solving common issues."""
def __init__(
self,
name="Lily",
profile="An assistant",
goal="Help to solve problem",
constraints="Talk in {language}",
desc="",
*args,
**kwargs,
):
super(Assistant, self).__init__(
name=name, profile=profile, goal=goal, constraints=constraints, desc=desc, *args, **kwargs
)
brain_memory = CONFIG.BRAIN_MEMORY
self.memory = BrainMemory(**brain_memory) if brain_memory else BrainMemory(llm_type=CONFIG.LLM_TYPE)
skill_path = Path(CONFIG.SKILL_PATH) if CONFIG.SKILL_PATH else None
self.skills = SkillLoader(skill_yaml_file_name=skill_path)
name: str = "Lily"
profile: str = "An assistant"
goal: str = "Help to solve problem"
constraints: str = "Talk in {language}"
desc: str = ""
memory: BrainMemory = Field(default_factory=BrainMemory)
skills: Optional[SkillsDeclaration] = None
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.constraints = self.constraints.format(language=kwargs.get("language") or CONFIG.language or "Chinese")
async def think(self) -> bool:
"""Everything will be done part by part."""
last_talk = await self.refine_memory()
if not last_talk:
return False
if not self.skills:
skill_path = Path(CONFIG.SKILL_PATH) if CONFIG.SKILL_PATH else None
self.skills = await SkillsDeclaration.load(skill_yaml_file_name=skill_path)
prompt = ""
skills = self.skills.get_skill_list()
for desc, name in skills.items():
@ -64,20 +69,20 @@ class Assistant(Role):
logger.info(f"THINK: {prompt}\n, THINK RESULT: {rsp}\n")
return await self._plan(rsp, last_talk=last_talk)
async def act(self) -> ActionOutput:
result = await self._rc.todo.run(**CONFIG.options)
async def act(self) -> Message:
result = await self._rc.todo.run()
if not result:
return None
if isinstance(result, str):
msg = Message(content=result)
output = ActionOutput(content=result)
msg = Message(content=result, role="assistant", cause_by=self._rc.todo)
elif isinstance(result, Message):
msg = result
else:
msg = Message(
content=result.content, instruct_content=result.instruct_content, cause_by=type(self._rc.todo)
)
output = result
self.memory.add_answer(msg)
return output
return msg
async def talk(self, text):
self.memory.add_talk(Message(content=text))
@ -94,10 +99,9 @@ class Assistant(Role):
async def talk_handler(self, text, **kwargs) -> bool:
history = self.memory.history_text
text = kwargs.get("last_talk") or text
action = TalkAction(
talk=text, knowledge=self.memory.get_knowledge(), history_summary=history, llm=self._llm, **kwargs
self._rc.todo = TalkAction(
context=text, knowledge=self.memory.get_knowledge(), history_summary=history, llm=self._llm, **kwargs
)
self.add_to_do(action)
return True
async def skill_handler(self, text, **kwargs) -> bool:
@ -106,12 +110,13 @@ class Assistant(Role):
if not skill:
logger.info(f"skill not found: {text}")
return await self.talk_handler(text=last_talk, **kwargs)
action = ArgumentsParingAction(skill=skill, llm=self._llm, **kwargs)
action = ArgumentsParingAction(skill=skill, llm=self._llm, ask=last_talk, **kwargs)
await action.run(**kwargs)
if action.args is None:
return await self.talk_handler(text=last_talk, **kwargs)
action = SkillAction(skill=skill, args=action.args, llm=self._llm, name=skill.name, desc=skill.description)
self.add_to_do(action)
self._rc.todo = SkillAction(
skill=skill, args=action.args, llm=self._llm, name=skill.name, desc=skill.description
)
return True
async def refine_memory(self) -> str:
@ -123,8 +128,8 @@ class Assistant(Role):
history_summary = await self.memory.summarize(max_words=800, keep_language=True, llm=self._llm)
if last_talk and await self.memory.is_related(text1=last_talk, text2=history_summary, llm=self._llm):
# Merge relevant content.
last_talk = await self.memory.rewrite(sentence=last_talk, context=history_summary, llm=self._llm)
return last_talk
merged = await self.memory.rewrite(sentence=last_talk, context=history_summary, llm=self._llm)
return f"{merged} {last_talk}"
return last_talk
@ -136,24 +141,3 @@ class Assistant(Role):
self.memory = BrainMemory(**jsn)
except Exception as e:
logger.exception(f"load error:{e}, data:{jsn}")
async def main():
topic = "what's apple"
role = Assistant(language="Chinese")
await role.talk(topic)
while True:
has_action = await role.think()
if not has_action:
break
msg = await role.act()
logger.info(msg)
# Retrieve user terminal input.
logger.info("Enter prompt")
talk = input("You: ")
await role.talk(talk)
if __name__ == "__main__":
CONFIG.language = "Chinese"
asyncio.run(main())

View file

@ -23,7 +23,7 @@ from abc import ABC
from asyncio import Queue, QueueEmpty, wait_for
from json import JSONDecodeError
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Type, TypedDict, TypeVar
from typing import Any, Dict, List, Optional, Set, Type, TypeVar
from pydantic import BaseModel, Field
@ -46,7 +46,7 @@ from metagpt.utils.serialize import (
)
class RawMessage(TypedDict):
class SimpleMessage(BaseModel):
content: str
role: str

View file

@ -11,23 +11,23 @@ import base64
import aiohttp
import requests
from openai import AsyncOpenAI
from metagpt.config import CONFIG, Config
from metagpt.config import Config
from metagpt.llm import LLM
from metagpt.logs import logger
class OpenAIText2Image:
def __init__(self, openai_api_key):
def __init__(self):
"""
:param openai_api_key: OpenAI API key, For more details, checkout: `https://platform.openai.com/account/api-keys`
"""
self.openai_api_key = openai_api_key if openai_api_key else CONFIG.OPENAI_API_KEY
self._client = AsyncOpenAI(api_key=self.openai_api_key, base_url=CONFIG.openai_api_base)
self._llm = LLM()
self._client = self._llm.async_client
def __del__(self):
if self._client:
self._client.close()
if self._llm:
self._llm.close()
async def text_2_image(self, text, size_type="1024x1024"):
"""Text to image
@ -66,19 +66,16 @@ class OpenAIText2Image:
# Export
async def oas3_openai_text_to_image(text, size_type: str = "1024x1024", openai_api_key=""):
async def oas3_openai_text_to_image(text, size_type: str = "1024x1024"):
"""Text to image
:param text: The text used for image conversion.
:param openai_api_key: OpenAI API key, For more details, checkout: `https://platform.openai.com/account/api-keys`
:param size_type: One of ['256x256', '512x512', '1024x1024']
:return: The image data is returned in Base64 encoding.
"""
if not text:
return ""
if not openai_api_key:
openai_api_key = CONFIG.OPENAI_API_KEY
return await OpenAIText2Image(openai_api_key).text_2_image(text, size_type=size_type)
return await OpenAIText2Image().text_2_image(text, size_type=size_type)
if __name__ == "__main__":

View file

@ -0,0 +1,65 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/9/19
@Author : mashenquan
@File : test_skill_action.py
@Desc : Unit tests.
"""
import pytest
from metagpt.actions.skill_action import ArgumentsParingAction, SkillAction
from metagpt.learn.skill_loader import Example, Parameter, Returns, Skill
class TestSkillAction:
    """Unit tests for ArgumentsParingAction and SkillAction, driven by the `text_to_image` skill."""

    # Skill declaration fixture: mirrors the `text_to_image` entry of the skills YAML,
    # including its prerequisite configuration keys and few-shot examples.
    skill = Skill(
        name="text_to_image",
        description="Create a drawing based on the text.",
        id="text_to_image.text_to_image",
        x_prerequisite={
            "configurations": {
                "OPENAI_API_KEY": {
                    "type": "string",
                    "description": "OpenAI API key, For more details, checkout: `https://platform.openai.com/account/api-keys`",
                },
                "METAGPT_TEXT_TO_IMAGE_MODEL_URL": {"type": "string", "description": "Model url."},
            },
            # Either an OpenAI key or a self-hosted model URL satisfies the prerequisite.
            "required": {"oneOf": ["OPENAI_API_KEY", "METAGPT_TEXT_TO_IMAGE_MODEL_URL"]},
        },
        parameters={
            "text": Parameter(type="string", description="The text used for image conversion."),
            "size_type": Parameter(type="string", description="size type"),
        },
        examples=[
            Example(ask="Draw a girl", answer='text_to_image(text="Draw a girl", size_type="512x512")'),
            Example(ask="Draw an apple", answer='text_to_image(text="Draw an apple", size_type="512x512")'),
        ],
        returns=Returns(type="string", format="base64"),
    )

    @pytest.mark.asyncio
    async def test_parser(self):
        # Static parse of a backtick-quoted call string into a kwargs dict — no LLM involved.
        args = ArgumentsParingAction.parse_arguments(
            skill_name="text_to_image", txt='`text_to_image(text="Draw an apple", size_type="512x512")`'
        )
        assert args.get("text") == "Draw an apple"
        assert args.get("size_type") == "512x512"

    @pytest.mark.asyncio
    async def test_parser_action(self):
        # End-to-end: ask the LLM to fill in the skill's parameters, then execute the skill.
        # NOTE(review): requires a configured LLM and a text-to-image backend — network-dependent.
        parser_action = ArgumentsParingAction(skill=self.skill, ask="Draw an apple")
        rsp = await parser_action.run()
        assert rsp
        assert parser_action.args
        assert parser_action.args.get("text") == "Draw an apple"
        assert parser_action.args.get("size_type") == "512x512"

        action = SkillAction(skill=self.skill, args=parser_action.args)
        rsp = await action.run()
        assert rsp
        # The skill is expected to return a Base64-encoded PNG data URI.
        assert "image/png;base64," in rsp.content
if __name__ == "__main__":
pytest.main([__file__, "-s"])

View file

@ -6,12 +6,14 @@
@File : test_skill_loader.py
@Desc : Unit tests.
"""
import pytest
from metagpt.config import CONFIG
from metagpt.learn.skill_loader import SkillLoader
from metagpt.learn.skill_loader import SkillsDeclaration
def test_suite():
@pytest.mark.asyncio
async def test_suite():
CONFIG.agent_skills = [
{"id": 1, "name": "text_to_speech", "type": "builtin", "config": {}, "enabled": True},
{"id": 2, "name": "text_to_image", "type": "builtin", "config": {}, "enabled": True},
@ -21,7 +23,7 @@ def test_suite():
{"id": 6, "name": "knowledge", "type": "builtin", "config": {}, "enabled": True},
{"id": 6, "name": "web_search", "type": "builtin", "config": {}, "enabled": True},
]
loader = SkillLoader()
loader = await SkillsDeclaration.load()
skills = loader.get_skill_list()
assert skills
assert len(skills) >= 3
@ -29,7 +31,7 @@ def test_suite():
assert desc
assert name
entity = loader.get_entity("Assistant")
entity = loader.entities.get("Assistant")
assert entity
assert entity.skills
for sk in entity.skills:
@ -38,4 +40,4 @@ def test_suite():
if __name__ == "__main__":
test_suite()
pytest.main([__file__, "-s"])

View file

@ -0,0 +1,100 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/12/25
@Author : mashenquan
@File   : test_assistant.py
@Desc   : Unit tests for the `Assistant` role; used by AgentStore.
"""
import pytest
from pydantic import BaseModel
from metagpt.actions.skill_action import SkillAction
from metagpt.actions.talk_action import TalkAction
from metagpt.config import CONFIG
from metagpt.logs import logger
from metagpt.memory.brain_memory import BrainMemory
from metagpt.roles.assistant import Assistant
from metagpt.schema import Message
from metagpt.utils.common import any_to_str
@pytest.mark.asyncio
async def test_run():
    """Exercise the Assistant role end-to-end with restored conversation memory.

    Two scripted scenarios are replayed:
    1. A plain chat question — the reply is expected to come from ``TalkAction``.
    2. A drawing request — expected to be routed to a skill via ``SkillAction``.

    NOTE(review): integration-style test; requires a configured LLM.
    """
    CONFIG.language = "Chinese"

    class Input(BaseModel):
        # One scripted scenario: the memory to restore plus the expected routing outcome.
        memory: BrainMemory
        language: str
        agent_description: str
        # Fully-qualified class name of the action expected to produce the reply.
        cause_by: str

    inputs = [
        {
            "memory": {
                "history": [
                    {
                        "content": "who is tulin",
                        "role": "user",
                        "id": 1,
                    },
                    {"content": "The one who eaten a poison apple.", "role": "assistant"},
                ],
                "knowledge": [{"content": "tulin is a scientist."}],
                "last_talk": "what's apple?",
            },
            "language": "English",
            "agent_description": "chatterbox",
            "cause_by": any_to_str(TalkAction),
        },
        {
            "memory": {
                "history": [
                    {
                        "content": "can you draw me an picture?",
                        "role": "user",
                        "id": 1,
                    },
                    {"content": "Yes, of course. What do you want me to draw", "role": "assistant"},
                ],
                "knowledge": [{"content": "tulin is a scientist."}],
                "last_talk": "Draw me an apple.",
            },
            "language": "English",
            "agent_description": "painter",
            "cause_by": any_to_str(SkillAction),
        },
    ]

    # Enable the builtin skills so the second scenario can resolve `text_to_image`.
    CONFIG.agent_skills = [
        {"id": 1, "name": "text_to_speech", "type": "builtin", "config": {}, "enabled": True},
        {"id": 2, "name": "text_to_image", "type": "builtin", "config": {}, "enabled": True},
        {"id": 3, "name": "ai_call", "type": "builtin", "config": {}, "enabled": True},
        {"id": 3, "name": "data_analysis", "type": "builtin", "config": {}, "enabled": True},
        {"id": 5, "name": "crawler", "type": "builtin", "config": {"engine": "ddg"}, "enabled": True},
        {"id": 6, "name": "knowledge", "type": "builtin", "config": {}, "enabled": True},
        {"id": 6, "name": "web_search", "type": "builtin", "config": {}, "enabled": True},
    ]

    for i in inputs:
        seed = Input(**i)
        CONFIG.language = seed.language
        CONFIG.agent_description = seed.agent_description
        role = Assistant(language="Chinese")
        role.memory = seed.memory  # Restore historical conversation content.
        # think/act loop: run until the role decides no further action is needed.
        while True:
            has_action = await role.think()
            if not has_action:
                break
            msg: Message = await role.act()
            logger.info(msg)
            assert msg
            # The reply must be attributed to the expected action type.
            assert msg.cause_by == seed.cause_by
            assert msg.content

    # # Retrieve user terminal input.
    # logger.info("Enter prompt")
    # talk = input("You: ")
    # await role.talk(talk)
if __name__ == "__main__":
pytest.main([__file__, "-s"])