Merge branch 'main' into feature/talk_prompt

莘权 马 2023-09-15 17:44:58 +08:00
commit dda1745c79
12 changed files with 355 additions and 260 deletions

View file: metagpt/actions/action.py

@@ -5,6 +5,7 @@
@Author : alexanderwu
@File : action.py
@Modified By: mashenquan, 2023/8/20. Add function return annotations.
@Modified By: mashenquan, 2023/9/8. Replace LLM with LLMFactory
"""
from __future__ import annotations
@@ -14,16 +15,18 @@ from typing import Optional
from tenacity import retry, stop_after_attempt, wait_fixed
from metagpt.actions.action_output import ActionOutput
from metagpt.llm import LLM
from metagpt.logs import logger
from metagpt.provider.base_gpt_api import BaseGPTAPI
from metagpt.utils.common import OutputParser
class Action(ABC):
def __init__(self, name: str = "", context=None, llm: LLM = None):
def __init__(self, name: str = "", context=None, llm: BaseGPTAPI = None):
self.name: str = name
if llm is None:
llm = LLM()
from metagpt.llm import LLMFactory
llm = LLMFactory.new_llm()
self.llm = llm
self.context = context
self.prefix = ""
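# A minimal sketch (the subclass below is hypothetical) of the new default
# wiring: omitting `llm` makes Action ask LLMFactory for a provider, so the
# backend is chosen by CONFIG.LLM_TYPE at construction time instead of being
# hard-wired to LLM().
class EchoAction(Action):
    async def run(self, *args, **kwargs):
        return await self.llm.aask(msg=args[0], system_msgs=[])

action = EchoAction(name="echo")  # action.llm comes from LLMFactory.new_llm()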

View file: metagpt/actions/talk_action.py

@@ -6,10 +6,12 @@
@File : talk_action.py
@Desc : Act as it's a talk
"""
import json
from metagpt.actions import Action, ActionOutput
from metagpt.config import CONFIG
from metagpt.const import DEFAULT_LANGUAGE
from metagpt.llm import LLMType
from metagpt.logs import logger
@@ -62,13 +64,40 @@ class TalkAction(Action):
logger.info(f"PROMPT: {prompt}")
return prompt
async def run(self, *args, **kwargs) -> ActionOutput:
async def run_old(self, *args, **kwargs) -> ActionOutput:
prompt = self.prompt
rsp = await self.llm.aask(msg=prompt, system_msgs=[])
logger.debug(f"PROMPT:{prompt}\nRESULT:{rsp}\n")
self._rsp = ActionOutput(content=rsp)
return self._rsp
@property
def aask_args(self):
language = CONFIG.language or DEFAULT_LANGUAGE
system_msgs = [
f"You are {CONFIG.agent_description}.",
"Your responses should align with the role-play agreement, "
"maintaining the character's persona and habits. When faced with unrelated questions, playfully "
"decline to answer without revealing your AI nature to preserve the character's image.",
"If the information is insufficient, you can search in the context or knowledge.",
f"Answer the following questions strictly in {language}, and the answers must follow the Markdown format.",
]
format_msgs = []
if self._knowledge:
format_msgs.append({"role": "assistant", "content": self._knowledge})
if self._history_summary:
if CONFIG.LLM_TYPE == LLMType.METAGPT.value:
format_msgs.extend(json.loads(self._history_summary))
else:
format_msgs.append({"role": "assistant", "content": self._history_summary})
return self._talk, format_msgs, system_msgs
async def run(self, *args, **kwargs) -> ActionOutput:
msg, format_msgs, system_msgs = self.aask_args
rsp = await self.llm.aask(msg=msg, format_msgs=format_msgs, system_msgs=system_msgs)
self._rsp = ActionOutput(content=rsp)
return self._rsp
__FORMATION__ = """Formation: "Capacity and role" defines the role you are currently playing;
"[HISTORY_BEGIN]" and "[HISTORY_END]" tags enclose the historical conversation;
"[KNOWLEDGE_BEGIN]" and "[KNOWLEDGE_END]" tags enclose the knowledge may help for your responses;

View file: metagpt/const.py

@@ -48,12 +48,6 @@ BRAIN_MEMORY = "BRAIN_MEMORY"
SKILL_PATH = "SKILL_PATH"
SERPER_API_KEY = "SERPER_API_KEY"
# Key Definitions for MetaGPT LLM
METAGPT_API_MODEL = "METAGPT_API_MODEL"
METAGPT_API_KEY = "METAGPT_API_KEY"
METAGPT_API_BASE = "METAGPT_API_BASE"
METAGPT_API_TYPE = "METAGPT_API_TYPE"
METAGPT_API_VERSION = "METAGPT_API_VERSION"
# format
BASE64_FORMAT = "base64"

View file: metagpt/llm.py

@@ -4,17 +4,41 @@
@Time : 2023/5/11 14:45
@Author : alexanderwu
@File : llm.py
@Modified By: mashenquan, 2023
"""
from enum import Enum
from metagpt.provider.anthropic_api import Claude2 as Claude
from metagpt.provider.openai_api import OpenAIGPTAPI as LLM
import openai
DEFAULT_LLM = LLM()
CLAUDE_LLM = Claude()
from metagpt.config import CONFIG
async def ai_func(prompt):
"""使用LLM进行QA
QA with LLMs
"""
return await DEFAULT_LLM.aask(prompt)
class LLMType(Enum):
OPENAI = "OpenAI"
METAGPT = "MetaGPT"
CLAUDE = "Claude"
UNKNOWN = "UNKNOWN"
@classmethod
def get(cls, value):
for member in cls:
if member.value == value:
return member
return cls.UNKNOWN
class LLMFactory:
@staticmethod
def new_llm() -> object:
from metagpt.provider.anthropic_api import Claude2 as Claude
from metagpt.provider.metagpt_llm_api import MetaGPTLLMAPI as MetaGPT_LLM
from metagpt.provider.openai_api import OpenAIGPTAPI as OpenAI_LLM
if CONFIG.LLM_TYPE == LLMType.OPENAI.value:
return OpenAI_LLM()
if CONFIG.LLM_TYPE == LLMType.METAGPT.value:
return MetaGPT_LLM()
if CONFIG.LLM_TYPE == LLMType.CLAUDE.value:
return Claude()
raise openai.InvalidRequestError(message=f"Unsupported LLM TYPE: {CONFIG.LLM_TYPE}", param=None)
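# A short usage sketch of the factory, assuming CONFIG.LLM_TYPE is set in the
# runtime config as this diff implies:
CONFIG.LLM_TYPE = LLMType.METAGPT.value       # "MetaGPT" -> MetaGPTLLMAPI
llm = LLMFactory.new_llm()
assert LLMType.get("Claude") is LLMType.CLAUDE
assert LLMType.get("no-such") is LLMType.UNKNOWN  # unknown values degrade gracefully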

View file: metagpt/memory/brain_memory.py

@@ -8,13 +8,19 @@
@Modified By: mashenquan, 2023/9/4. + redis memory cache.
"""
import json
import re
from enum import Enum
from typing import Dict, List
from typing import Dict, List, Optional
import openai
import pydantic
from metagpt import Message
from metagpt.config import CONFIG
from metagpt.const import DEFAULT_LANGUAGE, DEFAULT_MAX_TOKENS
from metagpt.llm import LLMType
from metagpt.logs import logger
from metagpt.schema import RawMessage
from metagpt.utils.redis import Redis
@@ -35,42 +41,28 @@ class BrainMemory(pydantic.BaseModel):
last_history_id: str = ""
is_dirty: bool = False
last_talk: str = None
llm_type: Optional[str] = None
cacheable: bool = True
def add_talk(self, msg: Message):
msg.add_tag(MessageType.Talk.value)
self.history.append(msg.dict())
self.add_history(msg)
self.is_dirty = True
def add_answer(self, msg: Message):
msg.add_tag(MessageType.Answer.value)
self.history.append(msg.dict())
self.add_history(msg)
self.is_dirty = True
def get_knowledge(self) -> str:
texts = [Message(**m).content for m in self.knowledge]
return "\n".join(texts)
@property
def history_text(self):
if len(self.history) == 0 and not self.historical_summary:
return ""
texts = [self.historical_summary] if self.historical_summary else []
for m in self.history[:-1]:
if isinstance(m, Dict):
t = Message(**m).content
elif isinstance(m, Message):
t = m.content
else:
continue
texts.append(t)
return "\n".join(texts)
@staticmethod
async def loads(redis_key: str, redis_conf: Dict = None) -> "BrainMemory":
redis = Redis(conf=redis_conf)
if not redis.is_valid() or not redis_key:
return BrainMemory()
return BrainMemory(llm_type=CONFIG.LLM_TYPE)
v = await redis.get(key=redis_key)
logger.debug(f"REDIS GET {redis_key} {v}")
if v:
@@ -78,20 +70,23 @@ class BrainMemory(pydantic.BaseModel):
bm = BrainMemory(**data)
bm.is_dirty = False
return bm
return BrainMemory()
return BrainMemory(llm_type=CONFIG.LLM_TYPE)
async def dumps(self, redis_key: str, timeout_sec: int = 30 * 60, redis_conf: Dict = None):
if not self.is_dirty:
return
redis = Redis(conf=redis_conf)
if not redis.is_valid() or not redis_key:
return False
v = self.json()
await redis.set(key=redis_key, data=v, timeout_sec=timeout_sec)
logger.debug(f"REDIS SET {redis_key} {v}")
if self.cacheable:
await redis.set(key=redis_key, data=v, timeout_sec=timeout_sec)
logger.debug(f"REDIS SET {redis_key} {v}")
self.is_dirty = False
@staticmethod
def to_redis_key(prefix: str, user_id: str, chat_id: str):
return f"{prefix}:{chat_id}:{user_id}"
return f"{prefix}:{user_id}:{chat_id}"
async def set_history_summary(self, history_summary, redis_key, redis_conf):
if self.historical_summary == history_summary:
@@ -107,9 +102,10 @@ class BrainMemory(pydantic.BaseModel):
def add_history(self, msg: Message):
if msg.id:
if self.to_int(msg.id, 0) < self.to_int(self.last_history_id, -1):
if self.to_int(msg.id, 0) <= self.to_int(self.last_history_id, -1):
return
self.history.append(msg.dict())
self.last_history_id = str(msg.id)
self.is_dirty = True
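# With `<=` instead of `<`, replaying the most recent message is now skipped
# as well; a sketch (Message id handling assumed from this diff):
#   m = Message(content="hi"); m.id = "7"
#   memory.add_history(m)  # appended, last_history_id == "7"
#   memory.add_history(m)  # ignored: to_int("7") <= to_int("7")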
def exists(self, text) -> bool:
@@ -129,3 +125,224 @@ class BrainMemory(pydantic.BaseModel):
v = self.last_talk
self.last_talk = None
return v
async def summarize(self, llm, max_words=200, keep_language: bool = False, limit: int = -1, **kwargs):
if self.llm_type == LLMType.METAGPT.value:
return await self._metagpt_summarize(llm=llm, max_words=max_words, keep_language=keep_language, **kwargs)
return await self._openai_summarize(
llm=llm, max_words=max_words, keep_language=keep_language, limit=limit, **kwargs
)
async def _openai_summarize(self, llm, max_words=200, keep_language: bool = False, limit: int = -1, **kwargs):
max_token_count = DEFAULT_MAX_TOKENS
max_count = 100
texts = [self.historical_summary]
for i in self.history:
m = Message(**i)
texts.append(m.content)
text = "\n".join(texts)
text_length = len(text)
if limit > 0 and text_length < limit:
return text
summary = ""
while max_count > 0:
if text_length < max_token_count:
summary = await self._get_summary(text=text, llm=llm, max_words=max_words, keep_language=keep_language)
break
padding_size = 20 if max_token_count > 20 else 0
text_windows = self.split_texts(text, window_size=max_token_count - padding_size)
part_max_words = min(int(max_words / len(text_windows)) + 1, 100)
summaries = []
for ws in text_windows:
response = await self._get_summary(
text=ws, llm=llm, max_words=part_max_words, keep_language=keep_language
)
summaries.append(response)
if len(summaries) == 1:
summary = summaries[0]
break
# Merge and retry
text = "\n".join(summaries)
text_length = len(text)
max_count -= 1 # safeguard
if summary:
await self.set_history_summary(history_summary=summary, redis_key=CONFIG.REDIS_KEY, redis_conf=CONFIG.REDIS)
return summary
raise openai.error.InvalidRequestError(message="text too long", param=None)
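# A worked trace of the shrink loop, with hypothetical sizes and
# DEFAULT_MAX_TOKENS taken as 2048:
#   a 6000-char text is split into 2028-char windows (20 chars of padding),
#   giving 3 overlapping windows; each is summarized to at most
#   min(200 // 3 + 1, 100) = 67 words and the pieces are re-joined;
#   the joined summaries now fit under 2048 chars, so one final _get_summary
#   call produces the <= 200-word result; max_count bounds the retries.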
async def _metagpt_summarize(self, max_words=200, **kwargs):
if not self.history:
return ""
total_length = 0
msgs = []
for i in reversed(self.history):
m = Message(**i)
delta = len(m.content)
if total_length + delta > max_words:
left = max_words - total_length
if left == 0:
break
m.content = m.content[0:left]
msgs.append(m.dict())
break
msgs.append(i)
total_length += delta
msgs.reverse()
self.history = msgs
self.is_dirty = True
await self.dumps(redis_key=CONFIG.REDIS_KEY, redis_conf=CONFIG.REDIS_CONF)
self.is_dirty = False
return BrainMemory.to_metagpt_history_format(self.history)
@staticmethod
def to_metagpt_history_format(history) -> str:
mmsg = []
for m in history:
msg = Message(**m)
r = RawMessage(role="user" if MessageType.Talk.value in msg.tags else "assistant", content=msg.content)
mmsg.append(r)
return json.dumps(mmsg)
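# Assuming RawMessage is a plain TypedDict in schema.py (as in this revision),
# json.dumps serializes the list directly; the wire format looks like:
#   '[{"role": "user", "content": "hi"}, {"role": "assistant", "content": "hello"}]'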
@staticmethod
async def _get_summary(text: str, llm, max_words=20, keep_language: bool = False):
"""Generate text summary"""
if len(text) < max_words:
return text
if keep_language:
command = f".Translate the above content into a summary of less than {max_words} words in language of the content strictly."
else:
command = f"Translate the above content into a summary of less than {max_words} words."
msg = text + "\n\n" + command
logger.debug(f"summary ask:{msg}")
response = await llm.aask(msg=msg, system_msgs=[])
logger.debug(f"summary rsp: {response}")
return response
async def get_title(self, llm, max_words=5, **kwargs) -> str:
"""Generate text title"""
if self.llm_type == LLMType.METAGPT.value:
return Message(**self.history[0]).content if self.history else "New"
summary = await self.summarize(llm=llm, max_words=500)
language = CONFIG.language or DEFAULT_LANGUAGE
command = f"Translate the above summary into a {language} title of less than {max_words} words."
summaries = [summary, command]
msg = "\n".join(summaries)
logger.debug(f"title ask:{msg}")
response = await llm.aask(msg=msg, system_msgs=[])
logger.debug(f"title rsp: {response}")
return response
async def is_related(self, text1, text2, llm):
if self.llm_type == LLMType.METAGPT.value:
return await self._metagpt_is_related(text1=text1, text2=text2, llm=llm)
return await self._openai_is_related(text1=text1, text2=text2, llm=llm)
@staticmethod
async def _metagpt_is_related(**kwargs):
return False
@staticmethod
async def _openai_is_related(text1, text2, llm, **kwargs):
# command = f"{text1}\n{text2}\n\nIf the two sentences above are related, return [TRUE] brief and clear. Otherwise, return [FALSE]."
command = f"{text2}\n\nIs there any sentence above related to the following sentence: {text1}.\nIf is there any relevance, return [TRUE] brief and clear. Otherwise, return [FALSE] brief and clear."
rsp = await llm.aask(msg=command, system_msgs=[])
result = True if "TRUE" in rsp else False
p2 = text2.replace("\n", "")
p1 = text1.replace("\n", "")
logger.info(f"IS_RELATED:\nParagraph 1: {p2}\nParagraph 2: {p1}\nRESULT: {result}\n")
return result
async def rewrite(self, sentence: str, context: str, llm):
if self.llm_type == LLMType.METAGPT.value:
return await self._metagpt_rewrite(sentence=sentence, context=context, llm=llm)
return await self._openai_rewrite(sentence=sentence, context=context, llm=llm)
async def _metagpt_rewrite(self, sentence: str, **kwargs):
return sentence
async def _openai_rewrite(self, sentence: str, context: str, llm, **kwargs):
# command = (
# f"{context}\n\nConsidering the content above, rewrite and return this sentence brief and clear:\n{sentence}"
# )
command = f"{context}\n\nExtract relevant information from every preceding sentence and use it to succinctly supplement or rewrite the following text in brief and clear:\n{sentence}"
rsp = await llm.aask(msg=command, system_msgs=[])
logger.info(f"REWRITE:\nCommand: {command}\nRESULT: {rsp}\n")
return rsp
@staticmethod
def split_texts(text: str, window_size) -> List[str]:
"""Splitting long text into sliding windows text"""
if window_size <= 0:
window_size = BrainMemory.DEFAULT_TOKEN_SIZE
total_len = len(text)
if total_len <= window_size:
return [text]
padding_size = 20 if window_size > 20 else 0
windows = []
idx = 0
data_len = window_size - padding_size
while idx < total_len:
if window_size + idx > total_len:  # less than one full window remains
windows.append(text[idx:])
break
# Shrinking each step by padding_size yields the sliding-window effect, e.g. for [1, 2, 3, 4, 5, 6, 7, ...]
# with window_size=3, padding_size=1:
# [1, 2, 3], [3, 4, 5], [5, 6, 7], ...
# (idx advances by window_size - padding_size: 0, 2, 4, ...)
w = text[idx : idx + window_size]
windows.append(w)
idx += data_len
return windows
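# A quick check of the behaviour described above (padding only applies when
# window_size > 20):
#   BrainMemory.split_texts("abcdefg", window_size=3)
#   -> ["abc", "def", "g"]  (padding_size == 0, so no overlap)
# For window_size > 20, idx advances by window_size - 20, so consecutive
# windows overlap by 20 characters.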
@staticmethod
def extract_info(input_string, pattern=r"\[([A-Z]+)\]:\s*(.+)"):
match = re.match(pattern, input_string)
if match:
return match.group(1), match.group(2)
else:
return None, input_string
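# Examples with the default pattern:
#   BrainMemory.extract_info("[TALK]: hello there")  -> ("TALK", "hello there")
#   BrainMemory.extract_info("no tag here")          -> (None, "no tag here")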
def set_llm_type(self, v):
if v and v != self.llm_type:
self.llm_type = v
self.is_dirty = True
@property
def is_history_available(self):
return bool(self.history or self.historical_summary)
@property
def history_text(self):
if self.llm_type == LLMType.METAGPT.value:
return self._get_metagpt_history_text()
return self._get_openai_history_text()
def _get_metagpt_history_text(self):
return BrainMemory.to_metagpt_history_format(self.history)
def _get_openai_history_text(self):
if len(self.history) == 0 and not self.historical_summary:
return ""
texts = [self.historical_summary] if self.historical_summary else []
for m in self.history[:-1]:
if isinstance(m, Dict):
t = Message(**m).content
elif isinstance(m, Message):
t = m.content
else:
continue
texts.append(t)
return "\n".join(texts)
DEFAULT_TOKEN_SIZE = 500

View file: metagpt/provider/__init__.py

@@ -4,9 +4,11 @@
@Time : 2023/5/5 22:59
@Author : alexanderwu
@File : __init__.py
@Modified By: mashenquan, 2023/9/8. Add `MetaGPTLLMAPI`
"""
from metagpt.provider.openai_api import OpenAIGPTAPI
from metagpt.provider.metagpt_llm_api import MetaGPTLLMAPI
__all__ = ["OpenAIGPTAPI"]
__all__ = ["OpenAIGPTAPI", "MetaGPTLLMAPI"]

View file: metagpt/provider/base_gpt_api.py

@@ -9,7 +9,6 @@
from abc import abstractmethod
from typing import Optional
from metagpt.logs import logger
from metagpt.provider.base_chatbot import BaseChatbot
@@ -38,18 +37,21 @@ class BaseGPTAPI(BaseChatbot):
rsp = self.completion(message)
return self.get_choice_text(rsp)
async def aask(self, msg: str, system_msgs: Optional[list[str]] = None, generator: bool = False) -> str:
async def aask(
self,
msg: str,
system_msgs: Optional[list[str]] = None,
format_msgs: Optional[list[dict[str, str]]] = None,
generator: bool = False,
) -> str:
if system_msgs:
message = self._system_msgs(system_msgs) + [self._user_msg(msg)]
message = self._system_msgs(system_msgs)
else:
message = [self._default_system_msg(), self._user_msg(msg)]
try:
rsp = await self.acompletion_text(message, stream=True, generator=generator)
except Exception as e:
logger.exception(f"{e}")
logger.info(f"ask:{msg}, error:{e}")
raise e
logger.info(f"ask:{msg}, anwser:{rsp}")
message = [self._default_system_msg()]
if format_msgs:
message.extend(format_msgs)
message.append(self._user_msg(msg))
rsp = await self.acompletion_text(message, stream=True, generator=generator)
return rsp
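# A hedged sketch of the list aask now assembles (contents invented):
#   await llm.aask(msg="Who are you?",
#                  system_msgs=["You are a pirate."],
#                  format_msgs=[{"role": "assistant", "content": "<knowledge>"}])
# sends [system("You are a pirate."), assistant("<knowledge>"),
#        user("Who are you?")] to acompletion_text.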
def _extract_assistant_rsp(self, context):

View file: metagpt/provider/metagpt_llm_api.py

@@ -6,28 +6,11 @@
@Desc : MetaGPT LLM related APIs
"""
import openai
from metagpt.config import CONFIG
from metagpt.provider import OpenAIGPTAPI
from metagpt.provider.openai_api import RateLimiter
class MetaGPTLLMAPI(OpenAIGPTAPI):
"""MetaGPT LLM api"""
def __init__(self):
self.__init_openai()
self.llm = openai
self.model = CONFIG.METAGPT_API_MODEL
self.auto_max_tokens = False
RateLimiter.__init__(self, rpm=self.rpm)
def __init_openai(self, *args, **kwargs):
openai.api_key = CONFIG.METAGPT_API_KEY
if CONFIG.METAGPT_API_BASE:
openai.api_base = CONFIG.METAGPT_API_BASE
if CONFIG.METAGPT_API_TYPE:
openai.api_type = CONFIG.METAGPT_API_TYPE
openai.api_version = CONFIG.METAGPT_API_VERSION
self.rpm = int(CONFIG.RPM) if CONFIG.RPM else 10
super().__init__()

View file: metagpt/provider/openai_api.py

@@ -7,24 +7,21 @@
Change cost control from global to company level.
"""
import asyncio
import random
import re
import time
import traceback
from typing import List
import openai
from openai.error import APIConnectionError
from openai.error import APIConnectionError, RateLimitError
from tenacity import (
after_log,
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
wait_fixed,
)
from metagpt.config import CONFIG
from metagpt.const import DEFAULT_LANGUAGE, DEFAULT_MAX_TOKENS
from metagpt.llm import LLMType
from metagpt.logs import logger
from metagpt.provider.base_gpt_api import BaseGPTAPI
from metagpt.utils.cost_manager import Costs
@@ -77,16 +74,13 @@ class OpenAIGPTAPI(BaseGPTAPI, RateLimiter):
"""
def __init__(self):
self.llm = openai
self.model = CONFIG.openai_api_model
self.auto_max_tokens = False
self.rpm = int(CONFIG.get("RPM", 10))
RateLimiter.__init__(self, rpm=self.rpm)
async def _achat_completion_stream(self, messages: list[dict]) -> str:
response = await self.async_retry_call(
openai.ChatCompletion.acreate, **self._cons_kwargs(messages), stream=True
)
response = await openai.ChatCompletion.acreate(**self._cons_kwargs(messages), stream=True)
# iterate through the stream of events
async for chunk in response:
chunk_message = chunk["choices"][0]["delta"] # extract the message
@@ -120,12 +114,12 @@ class OpenAIGPTAPI(BaseGPTAPI, RateLimiter):
return kwargs
async def _achat_completion(self, messages: list[dict]) -> dict:
rsp = await self.async_retry_call(self.llm.ChatCompletion.acreate, **self._cons_kwargs(messages))
rsp = await openai.ChatCompletion.acreate(**self._cons_kwargs(messages))
self._update_costs(rsp.get("usage"))
return rsp
def _chat_completion(self, messages: list[dict]) -> dict:
rsp = self.retry_call(self.llm.ChatCompletion.create, **self._cons_kwargs(messages))
rsp = openai.ChatCompletion.create(**self._cons_kwargs(messages))
self._update_costs(rsp)
return rsp
@@ -146,6 +140,13 @@ class OpenAIGPTAPI(BaseGPTAPI, RateLimiter):
retry=retry_if_exception_type(APIConnectionError),
retry_error_callback=log_and_reraise,
)
@retry(
stop=stop_after_attempt(6),
wait=wait_exponential(1),
after=after_log(logger, logger.level("WARNING").name),
retry=retry_if_exception_type(RateLimitError),
reraise=True,
)
async def acompletion_text(self, messages: list[dict], stream=False, generator: bool = False) -> str:
"""when streaming, print each token in place."""
if stream:
@@ -223,160 +224,8 @@ class OpenAIGPTAPI(BaseGPTAPI, RateLimiter):
return CONFIG.max_tokens_rsp
return get_max_completion_tokens(messages, self.model, CONFIG.max_tokens_rsp)
async def get_summary(self, text: str, max_words=200, keep_language: bool = False):
max_token_count = DEFAULT_MAX_TOKENS
max_count = 100
text_length = len(text)
while max_count > 0:
if text_length < max_token_count:
return await self._get_summary(text=text, max_words=max_words, keep_language=keep_language)
async def get_summary(self, text: str, max_words=200, keep_language: bool = False, **kwargs) -> str:
from metagpt.memory.brain_memory import BrainMemory
padding_size = 20 if max_token_count > 20 else 0
text_windows = self.split_texts(text, window_size=max_token_count - padding_size)
part_max_words = min(int(max_words / len(text_windows)) + 1, 100)
summaries = []
for ws in text_windows:
response = await self._get_summary(text=ws, max_words=part_max_words, keep_language=keep_language)
summaries.append(response)
if len(summaries) == 1:
return summaries[0]
# Merge and retry
text = "\n".join(summaries)
text_length = len(text)
max_count -= 1 # safeguard
raise openai.error.InvalidRequestError("text too long")
async def _get_summary(self, text: str, max_words=20, keep_language: bool = False):
"""Generate text summary"""
if len(text) < max_words:
return text
if keep_language:
command = f".Translate the above content into a summary of less than {max_words} words in language of the content strictly."
else:
command = f"Translate the above content into a summary of less than {max_words} words."
msg = text + "\n\n" + command
logger.debug(f"summary ask:{msg}")
response = await self.aask(msg=msg, system_msgs=[])
logger.debug(f"summary rsp: {response}")
return response
async def get_context_title(self, text: str, max_words=5) -> str:
"""Generate text title"""
summary = await self.get_summary(text, max_words=500)
language = CONFIG.language or DEFAULT_LANGUAGE
command = f"Translate the above summary into a {language} title of less than {max_words} words."
summaries = [summary, command]
msg = "\n".join(summaries)
logger.debug(f"title ask:{msg}")
response = await self.aask(msg=msg, system_msgs=[])
logger.debug(f"title rsp: {response}")
return response
async def is_related(self, text1, text2):
# command = f"{text1}\n{text2}\n\nIf the two sentences above are related, return [TRUE] brief and clear. Otherwise, return [FALSE]."
command = f"{text2}\n\nIs there any sentence above related to the following sentence: {text1}.\nIf is there any relevance, return [TRUE] brief and clear. Otherwise, return [FALSE] brief and clear."
rsp = await self.aask(msg=command, system_msgs=[])
result = True if "TRUE" in rsp else False
p2 = text2.replace("\n", "")
p1 = text1.replace("\n", "")
logger.info(f"IS_RELATED:\nParagraph 1: {p2}\nParagraph 2: {p1}\nRESULT: {result}\n")
return result
async def rewrite(self, sentence: str, context: str):
# command = (
# f"{context}\n\nConsidering the content above, rewrite and return this sentence brief and clear:\n{sentence}"
# )
command = f"{context}\n\nExtract relevant information from every preceding sentence and use it to succinctly supplement or rewrite the following text in brief and clear:\n{sentence}"
rsp = await self.aask(msg=command, system_msgs=[])
logger.info(f"REWRITE:\nCommand: {command}\nRESULT: {rsp}\n")
return rsp
@staticmethod
def split_texts(text: str, window_size) -> List[str]:
"""Splitting long text into sliding windows text"""
if window_size <= 0:
window_size = OpenAIGPTAPI.DEFAULT_TOKEN_SIZE
total_len = len(text)
if total_len <= window_size:
return [text]
padding_size = 20 if window_size > 20 else 0
windows = []
idx = 0
data_len = window_size - padding_size
while idx < total_len:
if window_size + idx > total_len:  # less than one full window remains
windows.append(text[idx:])
break
# Shrinking each step by padding_size yields the sliding-window effect, e.g. for [1, 2, 3, 4, 5, 6, 7, ...]
# with window_size=3, padding_size=1:
# [1, 2, 3], [3, 4, 5], [5, 6, 7], ...
# (idx advances by window_size - padding_size: 0, 2, 4, ...)
w = text[idx : idx + window_size]
windows.append(w)
idx += data_len
return windows
@staticmethod
def extract_info(input_string, pattern=r"\[([A-Z]+)\]:\s*(.+)"):
match = re.match(pattern, input_string)
if match:
return match.group(1), match.group(2)
else:
return None, input_string
@staticmethod
async def async_retry_call(func, *args, **kwargs):
for i in range(OpenAIGPTAPI.MAX_TRY):
try:
rsp = await func(*args, **kwargs)
return rsp
except openai.error.RateLimitError as e:
random_time = random.uniform(0, 3)  # random backoff between 0 and 3 seconds
rounded_time = round(random_time, 1)  # keep one decimal place, i.e. 0.1-second resolution
logger.warning(f"Exception:{e}, sleeping for {rounded_time} seconds")
await asyncio.sleep(rounded_time)
continue
except Exception as e:
error_str = traceback.format_exc()
logger.error(f"Exception:{e}, stack:{error_str}")
raise e
raise openai.error.OpenAIError("Exceeds the maximum retries")
@staticmethod
def retry_call(func, *args, **kwargs):
for i in range(OpenAIGPTAPI.MAX_TRY):
try:
rsp = func(*args, **kwargs)
return rsp
except openai.error.RateLimitError as e:
logger.warning(f"Exception:{e}")
continue
except (
openai.error.AuthenticationError,
openai.error.PermissionError,
openai.error.InvalidAPIType,
openai.error.SignatureVerificationError,
) as e:
logger.warning(f"Exception:{e}")
raise e
except Exception as e:
error_str = traceback.format_exc()
logger.error(f"Exception:{e}, stack:{error_str}")
raise e
raise openai.error.OpenAIError("Exceeds the maximum retries")
MAX_TRY = 5
DEFAULT_TOKEN_SIZE = 500
if __name__ == "__main__":
txt = """
as dfas sad lkf sdkl sakdfsdk sjd jsk sdl sk dd sd asd fa sdf sad dd
- .gitlab-ci.yml & base_test.py
"""
OpenAIGPTAPI.split_texts(txt, 30)
memory = BrainMemory(llm_type=LLMType.OPENAI.value, historical_summary=text, cacheable=False)
return await memory.summarize(llm=self, max_words=max_words, keep_language=keep_language)
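# get_summary is now a thin wrapper: it wraps the text in a throwaway,
# non-cacheable BrainMemory and delegates the windowed summarization to it.
# Usage sketch:
#   summary = await OpenAIGPTAPI().get_summary("<long text>", max_words=200)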

View file: metagpt/roles/assistant.py

@@ -45,7 +45,7 @@ class Assistant(Role):
name=name, profile=profile, goal=goal, constraints=constraints, desc=desc, *args, **kwargs
)
brain_memory = CONFIG.BRAIN_MEMORY
self.memory = BrainMemory(**brain_memory) if brain_memory else BrainMemory()
self.memory = BrainMemory(**brain_memory) if brain_memory else BrainMemory(llm_type=CONFIG.LLM_TYPE)
skill_path = Path(CONFIG.SKILL_PATH) if CONFIG.SKILL_PATH else None
self.skills = SkillLoader(skill_yaml_file_name=skill_path)
@@ -83,7 +83,7 @@
self.memory.add_talk(Message(content=text))
async def _plan(self, rsp: str, **kwargs) -> bool:
skill, text = Assistant.extract_info(input_string=rsp)
skill, text = BrainMemory.extract_info(input_string=rsp)
handlers = {
MessageType.Talk.value: self.talk_handler,
MessageType.Skill.value: self.skill_handler,
@@ -115,28 +115,19 @@
return True
async def refine_memory(self) -> str:
history_text = self.memory.history_text
last_talk = self.memory.pop_last_talk()
if last_talk is None: # No user feedback, unsure if past conversation is finished.
return None
if history_text == "":
if not self.memory.is_history_available:
return last_talk
history_summary = await self._llm.get_summary(history_text, max_words=800, keep_language=True)
await self.memory.set_history_summary(
history_summary=history_summary, redis_key=CONFIG.REDIS_KEY, redis_conf=CONFIG.REDIS
)
if last_talk and await self._llm.is_related(last_talk, history_summary): # Merge relevant content.
last_talk = await self._llm.rewrite(sentence=last_talk, context=history_text)
history_summary = await self.memory.summarize(max_words=800, keep_language=True, llm=self._llm)
if last_talk and await self.memory.is_related(text1=last_talk, text2=history_summary, llm=self._llm):
# Merge relevant content.
last_talk = await self.memory.rewrite(sentence=last_talk, context=history_summary, llm=self._llm)
return last_talk
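# Flow sketch of refine_memory: pop the pending user talk; if any history is
# available, summarize it to <= 800 words via the memory (provider-aware), and
# when the talk relates to that summary, rewrite the talk with the summary as
# context before it is handed to TalkAction.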
@staticmethod
def extract_info(input_string):
from metagpt.provider.openai_api import OpenAIGPTAPI
return OpenAIGPTAPI.extract_info(input_string)
def get_memory(self) -> str:
return self.memory.json()

View file: metagpt/roles/role.py

@@ -16,7 +16,7 @@ from pydantic import BaseModel, Field
from metagpt.actions import Action, ActionOutput
from metagpt.config import CONFIG
from metagpt.const import OPTIONS
from metagpt.llm import LLM
from metagpt.llm import LLMFactory
from metagpt.logs import logger
from metagpt.memory import LongTermMemory, Memory
from metagpt.schema import Message, MessageTag
@@ -113,7 +113,7 @@ class Role:
constraints = Role.format_value(constraints)
desc = Role.format_value(desc)
self._llm = LLM()
self._llm = LLMFactory.new_llm()
self._setting = RoleSetting(name=name, profile=profile, goal=goal, constraints=constraints, desc=desc)
self._states = []
self._actions = []

View file: metagpt/schema.py

@@ -76,6 +76,7 @@ class Message:
"sent_from": self.sent_from,
"send_to": self.send_to,
"tags": self.tags,
"id": self.id,
}
m = {"content": self.content}