MetaGPT/metagpt/environment.py
2023-08-02 15:57:10 -05:00

64 lines
1.9 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/5/11 22:12
@Author : alexanderwu
@File : environment.py
"""
import asyncio
from typing import Iterable, Optional

from pydantic import BaseModel, Field

from metagpt.memory import Memory
from metagpt.roles import Role
from metagpt.schema import Message
class Environment(BaseModel):
    """Shared environment that holds a set of roles and the messages they publish.

    Roles are registered with ``add_role`` / ``add_roles``; each registered role
    is handed a reference to this environment through ``role.set_env``.
    Published messages are stored in ``memory`` and also appended to the
    plain-text ``history``.

    Attributes:
        roles: Registered roles, keyed by ``role.profile``.
        memory: Store that receives every published message.
        history: Text rendering of all published messages, each one preceded
            by a newline.
    """

    roles: dict[str, Role] = Field(default_factory=dict)
    memory: Memory = Field(default_factory=Memory)
    history: str = Field(default='')

    class Config:
        # Lets pydantic accept field types it has no built-in validator for
        # (presumably required by Role / Memory -- confirm against their
        # definitions).
        arbitrary_types_allowed = True

    def add_role(self, role: Role) -> None:
        """Register ``role`` in this environment.

        The role is first given a back-reference to the environment via
        ``role.set_env(self)`` and is then stored under ``role.profile``.
        A previously registered role with the same profile is replaced.

        Args:
            role: The role to register.
        """
        role.set_env(self)
        self.roles[role.profile] = role

    def add_roles(self, roles: Iterable[Role]) -> None:
        """Register every role in ``roles``, in iteration order.

        Args:
            roles: Roles to register; each one goes through ``add_role``.
        """
        for role in roles:
            self.add_role(role)

    def publish_message(self, message: Message) -> None:
        """Publish ``message`` to the environment.

        The message is added to ``memory`` and its string form is appended to
        ``history`` on a new line.

        Args:
            message: The message to record.
        """
        self.memory.add(message)
        self.history += f"\n{message}"

    async def run(self, k: int = 1) -> None:
        """Run all registered roles for ``k`` consecutive rounds.

        In each round ``run()`` is called once on every registered role and
        the resulting awaitables are awaited together with ``asyncio.gather``;
        the next round starts only after all of them have finished.

        Args:
            k: Number of rounds to execute. Defaults to 1.
        """
        for _ in range(k):
            pending = [role.run() for role in self.roles.values()]
            await asyncio.gather(*pending)

    def get_roles(self) -> dict[str, Role]:
        """Return the mapping of all registered roles (the live dict, not a copy)."""
        return self.roles

    def get_role(self, name: str) -> Optional[Role]:
        """Look up a single registered role.

        Args:
            name: Key the role was registered under (its ``profile``).

        Returns:
            The matching role, or ``None`` when no role is registered under
            ``name``.
        """
        return self.roles.get(name)