mirror of
https://github.com/FoundationAgents/MetaGPT.git
synced 2026-04-30 19:36:24 +02:00
refactor: Refactor Message transmission & filtering
This commit is contained in:
parent
5e8ada5cff
commit
545d77ce0d
30 changed files with 658 additions and 296 deletions
|
|
@ -8,12 +8,20 @@
|
|||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
from asyncio import Queue, QueueEmpty, wait_for
|
||||
from json import JSONDecodeError
|
||||
from typing import Dict, List, TypedDict
|
||||
from typing import Dict, List, Set, TypedDict
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from metagpt.const import (
|
||||
MESSAGE_META_ROLE,
|
||||
MESSAGE_ROUTE_CAUSE_BY,
|
||||
MESSAGE_ROUTE_FROM,
|
||||
MESSAGE_ROUTE_TO,
|
||||
)
|
||||
from metagpt.logs import logger
|
||||
|
||||
|
||||
|
|
@ -22,44 +30,150 @@ class RawMessage(TypedDict):
|
|||
role: str
|
||||
|
||||
|
||||
class Routes(BaseModel):
|
||||
"""Responsible for managing routing information for the Message class."""
|
||||
|
||||
routes: List[Dict] = Field(default_factory=list)
|
||||
|
||||
def set_from(self, value):
|
||||
"""Set the label of the message sender."""
|
||||
route = self._get_route()
|
||||
route[MESSAGE_ROUTE_FROM] = value
|
||||
|
||||
def set_to(self, tags: Set):
|
||||
"""Set the labels of the message recipient."""
|
||||
route = self._get_route()
|
||||
if tags:
|
||||
route[MESSAGE_ROUTE_TO] = tags
|
||||
return
|
||||
|
||||
if MESSAGE_ROUTE_TO in route:
|
||||
del route[MESSAGE_ROUTE_TO]
|
||||
|
||||
def add_to(self, tag: str):
|
||||
"""Add a label of the message recipient."""
|
||||
route = self._get_route()
|
||||
tags = route.get(MESSAGE_ROUTE_TO, set())
|
||||
tags.add(tag)
|
||||
route[MESSAGE_ROUTE_TO] = tags
|
||||
|
||||
def _get_route(self) -> Dict:
|
||||
if not self.routes:
|
||||
self.routes.append({})
|
||||
return self.routes[0]
|
||||
|
||||
def is_recipient(self, tags: Set) -> bool:
|
||||
"""Check if it is the message recipient."""
|
||||
route = self._get_route()
|
||||
to_tags = route.get(MESSAGE_ROUTE_TO)
|
||||
if not to_tags:
|
||||
return True
|
||||
|
||||
for k in tags:
|
||||
if k in to_tags:
|
||||
return True
|
||||
return False
|
||||
|
||||
@property
|
||||
def tx_from(self):
|
||||
"""Message route info tells who sent this message."""
|
||||
route = self._get_route()
|
||||
return route.get(MESSAGE_ROUTE_FROM)
|
||||
|
||||
@property
|
||||
def tx_to(self):
|
||||
"""Labels for the consumer to filter its subscribed messages."""
|
||||
route = self._get_route()
|
||||
return route.get(MESSAGE_ROUTE_TO)
|
||||
|
||||
|
||||
class Message(BaseModel):
|
||||
"""list[<role>: <content>]"""
|
||||
|
||||
content: str
|
||||
instruct_content: BaseModel = None
|
||||
meta_info: Dict = Field(default_factory=dict)
|
||||
route: List[Dict] = Field(default_factory=list)
|
||||
route: Routes = Field(default_factory=Routes)
|
||||
|
||||
def __init__(self, content, **kwargs):
|
||||
"""
|
||||
:param content: Message content.
|
||||
:param instruct_content: Message content struct.
|
||||
:param meta_info: Message meta info.
|
||||
:param route: Message route configuration.
|
||||
:param tx_from: Message route info tells who sent this message.
|
||||
:param tx_to: Labels for the consumer to filter its subscribed messages.
|
||||
:param cause_by: Labels for the consumer to filter its subscribed messages, also serving as meta info.
|
||||
:param role: Message meta info tells who sent this message.
|
||||
"""
|
||||
super(Message, self).__init__(
|
||||
content=content or kwargs.get("content"),
|
||||
instruct_content=kwargs.get("instruct_content"),
|
||||
meta_info=kwargs.get("meta_info", {}),
|
||||
route=kwargs.get("route", []),
|
||||
route=kwargs.get("route", Routes()),
|
||||
)
|
||||
|
||||
attribute_names = Message.__annotations__.keys()
|
||||
for k, v in kwargs.items():
|
||||
if k in attribute_names:
|
||||
continue
|
||||
if k == MESSAGE_ROUTE_FROM:
|
||||
self.set_from(v)
|
||||
continue
|
||||
if k == MESSAGE_ROUTE_CAUSE_BY:
|
||||
self.meta_info[k] = v
|
||||
if k == MESSAGE_ROUTE_TO or k == MESSAGE_ROUTE_CAUSE_BY:
|
||||
self.add_to(v)
|
||||
continue
|
||||
self.meta_info[k] = v
|
||||
|
||||
def get_meta(self, key):
|
||||
"""Get meta info"""
|
||||
return self.meta_info.get(key)
|
||||
|
||||
def set_meta(self, key, value):
|
||||
"""Set meta info"""
|
||||
self.meta_info[key] = value
|
||||
|
||||
@property
|
||||
def role(self):
|
||||
return self.get_meta("role")
|
||||
"""Message meta info tells who sent this message."""
|
||||
return self.get_meta(MESSAGE_META_ROLE)
|
||||
|
||||
@property
|
||||
def cause_by(self):
|
||||
return self.get_meta("cause_by")
|
||||
"""Labels for the consumer to filter its subscribed messages, also serving as meta info."""
|
||||
return self.get_meta(MESSAGE_ROUTE_CAUSE_BY)
|
||||
|
||||
@property
|
||||
def tx_from(self):
|
||||
"""Message route info tells who sent this message."""
|
||||
return self.route.tx_from
|
||||
|
||||
@property
|
||||
def tx_to(self):
|
||||
"""Labels for the consumer to filter its subscribed messages."""
|
||||
return self.route.tx_to
|
||||
|
||||
def set_role(self, v):
|
||||
self.set_meta("role", v)
|
||||
"""Set the message's meta info indicating the sender."""
|
||||
self.set_meta(MESSAGE_META_ROLE, v)
|
||||
|
||||
def set_from(self, v):
|
||||
"""Set the message's meta info indicating the sender."""
|
||||
self.route.set_from(v)
|
||||
|
||||
def set_to(self, tags: Set):
|
||||
"""Set the message's meta info indicating the sender."""
|
||||
self.route.set_to(tags)
|
||||
|
||||
def add_to(self, tag: str):
|
||||
"""Add a subscription label for the recipients."""
|
||||
self.route.add_to(tag)
|
||||
|
||||
def is_recipient(self, tags: Set):
|
||||
"""Return true if any input label exists in the message's subscription labels."""
|
||||
return self.route.is_recipient(tags)
|
||||
|
||||
def __str__(self):
|
||||
# prefix = '-'.join([self.role, str(self.cause_by)])
|
||||
|
|
@ -69,13 +183,16 @@ class Message(BaseModel):
|
|||
return self.__str__()
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
"""Return a dict containing `role` and `content` for the LLM call.l"""
|
||||
return {"role": self.role, "content": self.content}
|
||||
|
||||
def save(self) -> str:
|
||||
"""Convert the object to json string"""
|
||||
return self.json(exclude_none=True)
|
||||
|
||||
@staticmethod
|
||||
def load(v):
|
||||
"""Convert the json string to object."""
|
||||
try:
|
||||
d = json.loads(v)
|
||||
return Message(**d)
|
||||
|
|
@ -90,7 +207,7 @@ class UserMessage(Message):
|
|||
"""
|
||||
|
||||
def __init__(self, content: str):
|
||||
super(Message, self).__init__(content=content, meta_info={"role": "user"})
|
||||
super().__init__(content=content, role="user")
|
||||
|
||||
|
||||
class SystemMessage(Message):
|
||||
|
|
@ -99,7 +216,7 @@ class SystemMessage(Message):
|
|||
"""
|
||||
|
||||
def __init__(self, content: str):
|
||||
super().__init__(content=content, meta_info={"role": "system"})
|
||||
super().__init__(content=content, role="system")
|
||||
|
||||
|
||||
class AIMessage(Message):
|
||||
|
|
@ -108,7 +225,65 @@ class AIMessage(Message):
|
|||
"""
|
||||
|
||||
def __init__(self, content: str):
|
||||
super().__init__(content=content, meta_info={"role": "assistant"})
|
||||
super().__init__(content=content, role="assistant")
|
||||
|
||||
|
||||
class MessageQueue:
|
||||
def __init__(self):
|
||||
self._queue = Queue()
|
||||
|
||||
def pop(self) -> Message | None:
|
||||
try:
|
||||
item = self._queue.get_nowait()
|
||||
if item:
|
||||
self._queue.task_done()
|
||||
return item
|
||||
except QueueEmpty:
|
||||
return None
|
||||
|
||||
def pop_all(self) -> List[Message]:
|
||||
ret = []
|
||||
while True:
|
||||
msg = self.pop()
|
||||
if not msg:
|
||||
break
|
||||
ret.append(msg)
|
||||
return ret
|
||||
|
||||
def push(self, msg: Message):
|
||||
self._queue.put_nowait(msg)
|
||||
|
||||
def empty(self):
|
||||
return self._queue.empty()
|
||||
|
||||
async def save(self) -> str:
|
||||
if self.empty():
|
||||
return "[]"
|
||||
|
||||
lst = []
|
||||
try:
|
||||
while True:
|
||||
item = await wait_for(self._queue.get(), timeout=1.0)
|
||||
if item is None:
|
||||
break
|
||||
lst.append(item.dict(exclude_none=True))
|
||||
self._queue.task_done()
|
||||
except asyncio.TimeoutError:
|
||||
logger.debug("Queue is empty, exiting...")
|
||||
return json.dumps(lst)
|
||||
|
||||
@staticmethod
|
||||
def load(self, v) -> "MessageQueue":
|
||||
q = MessageQueue()
|
||||
try:
|
||||
lst = json.loads(v)
|
||||
for i in lst:
|
||||
msg = Message(**i)
|
||||
q.push(msg)
|
||||
except JSONDecodeError as e:
|
||||
logger.warning(f"JSON load failed: {v}, error:{e}")
|
||||
|
||||
return q
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue