mirror of
https://github.com/FoundationAgents/MetaGPT.git
synced 2026-04-26 01:06:27 +02:00
66 lines
1.9 KiB
Python
66 lines
1.9 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
@Time : 2023/5/11 22:12
|
|
@Author : alexanderwu
|
|
@File : environment.py
|
|
"""
|
|
import asyncio
|
|
from queue import Queue
|
|
from typing import Iterable
|
|
|
|
from metagpt.manager import Manager
|
|
from metagpt.roles import Role
|
|
from metagpt.schema import Message
|
|
from metagpt.memory import Memory
|
|
|
|
|
|
class Environment:
    """Shared environment that hosts a group of roles.

    Roles publish messages to the environment, where they can be observed
    by the other roles.
    """

    def __init__(self):
        # Roles registered in this environment, keyed by ``role.profile``.
        self.roles: dict[str, Role] = {}
        # Store that receives every message published to the environment.
        self.memory = Memory()
        # Text transcript: each published message is appended after a newline.
        self.history = ''
        # Defined up front so that reading ``manager`` before ``set_manager``
        # has been called yields None instead of raising AttributeError.
        self.manager = None

    def add_role(self, role: Role):
        """Register ``role`` in this environment.

        The role is handed a reference to this environment through
        ``role.set_env`` and is stored under ``role.profile``; a role added
        later with the same profile replaces the earlier entry.
        """
        role.set_env(self)
        self.roles[role.profile] = role

    def add_roles(self, roles: Iterable[Role]):
        """Register every role of ``roles`` in this environment, in order."""
        for role in roles:
            self.add_role(role)

    def set_manager(self, manager):
        """Set the manager of this environment."""
        self.manager = manager

    def publish_message(self, message: Message):
        """Publish ``message`` to this environment.

        The message is added to ``memory`` and its string form is appended
        to ``history`` on a new line.
        """
        self.memory.add(message)
        self.history += f"\n{message}"

    async def run(self, k: int = 1):
        """Run all roles of this environment for ``k`` rounds.

        In each round ``run()`` is called on every registered role and the
        results are awaited together with ``asyncio.gather``; the next round
        starts only once that gather has completed.
        """
        for _ in range(k):
            await asyncio.gather(*[role.run() for role in self.roles.values()])

    def get_roles(self) -> dict[str, Role]:
        """Return the mapping of all roles in this environment.

        This is the environment's own dict, not a copy.
        """
        return self.roles

    def get_role(self, name: str) -> Role:
        """Return the role stored under ``name``, or None if there is none."""
        return self.roles.get(name)
|