#!/usr/bin/env python # -*- coding: utf-8 -*- """ @Time : 2023/5/11 22:12 @Author : alexanderwu @File : environment.py """ import asyncio from typing import Iterable from pathlib import Path from pydantic import BaseModel, Field from metagpt.memory import Memory from metagpt.roles import Role from metagpt.schema import Message from metagpt.utils.utils import read_json_file, write_json_file class Environment(BaseModel): """环境,承载一批角色,角色可以向环境发布消息,可以被其他角色观察到 Environment, hosting a batch of roles, roles can publish messages to the environment, and can be observed by other roles """ roles: dict[str, Role] = Field(default_factory=dict) memory: Memory = Field(default_factory=Memory) history: str = Field(default='') class Config: arbitrary_types_allowed = True def serialize(self, stg_path: Path): roles_path = stg_path.joinpath("roles.json") roles_info = [] for role_key, role in self.roles.items(): roles_info.append({ "role_class": role.__class__.__name__, "module_name": role.__module__, "role_name": role.name }) role.serialize(stg_path=stg_path.joinpath(f"roles/{role.__class__.__name__}_{role.name}")) write_json_file(roles_path, roles_info) self.memory.serialize(stg_path) history_path = stg_path.joinpath("history.json") write_json_file(history_path, {"content": self.history}) def deserialize(self, stg_path: Path): """ stg_path: ./storage/team/environment/ """ roles_path = stg_path.joinpath("roles.json") roles_info = read_json_file(roles_path) for role_info in roles_info: role_class = role_info.get("role_class") role_name = role_info.get("role_name") role_path = stg_path.joinpath(f"roles/{role_class}_{role_name}") role = Role.deserialize(role_path) self.add_role(role) memory = Memory.deserialize(stg_path) self.memory = memory history_path = stg_path.joinpath("history.json") history = read_json_file(history_path) self.history = history.get("content") def add_role(self, role: Role): """增加一个在当前环境的角色, 默认为profile/role_profile Add a role in the current environment """ role.set_env(self) # use alias self.roles[role.role_profile] = role def add_roles(self, roles: Iterable[Role]): """增加一批在当前环境的角色 Add a batch of characters in the current environment """ for role in roles: self.add_role(role) def publish_message(self, message: Message): """向当前环境发布信息 Post information to the current environment """ # self.message_queue.put(message) self.memory.add(message) self.history += f"\n{message}" async def run(self, k=1): """处理一次所有信息的运行 Process all Role runs at once """ # while not self.message_queue.empty(): # message = self.message_queue.get() # rsp = await self.manager.handle(message, self) # self.message_queue.put(rsp) for _ in range(k): futures = [] for role in self.roles.values(): future = role.run() futures.append(future) await asyncio.gather(*futures) def get_roles(self) -> dict[str, Role]: """获得环境内的所有角色 Process all Role runs at once """ return self.roles def get_role(self, name: str) -> Role: """获得环境内的指定角色 get all the environment roles """ return self.roles.get(name, None)