MetaGPT/metagpt/environment.py
geekan 331d74059f 1. 动作优化
1. SummarizeCode动作:用于基于代码进行总结,思考bug、逻辑、todo
  2. CodeReview动作优化:目前强制要求回答问题,有更高的成功率了
2. 数据结构
  1. Document的标准化:Env->Repo->Document,其中Document/Asset/Code都只用Document
    1. 原用于检索的Document改为IndexableDocument
  2. Repo结构引入:用于Document装载与元数据装载
  3. RepoParser引入:写了一个简单的AST parser(后续可能要换tree-sitter),给出了整库symbol
3. 配置优化
  1. 默认更换为gpt-4-1106-preview,以获得最好的效果与成本
  2. 提供~/.metagpt作为配置最高优先级目录,从中读取config.yaml
  3. workspace可以灵活指定了,在config中配置
4. metagpt作为默认命令行,而非python startup.py
  1. 使用新的METAGPT_ROOT生成方式,而非寻找git,以便cli安装
  2. 命令行由fire换为了typer,它会带来相对更好的体验
  3. project_name可以灵活指定了,在metagpt命令行输入中配置
5. 其他
  1. BossRequirement -> UserRequirement
  2. 大量错误文本的修正,增加了可读性
  3. 中量提示词优化,稍微提升了一些准确率
  4. 暂时屏蔽了LongtermMemory相关逻辑,这个逻辑底层调用了langchain的FAISS,会带来~5秒加载耗时
  5. 修复了安装包中的部分描述错误
2023-11-27 15:47:06 +08:00

85 lines
2.5 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/5/11 22:12
@Author : alexanderwu
@File : environment.py
"""
import asyncio
from typing import Iterable
from pydantic import BaseModel, Field
# from metagpt.document import Document
from metagpt.document import Repo
from metagpt.memory import Memory
from metagpt.roles import Role
from metagpt.schema import Message
class Environment(BaseModel):
"""
Environment, hosting a batch of roles, roles can publish messages to the environment, and can be observed by other roles
"""
roles: dict[str, Role] = Field(default_factory=dict)
memory: Memory = Field(default_factory=Memory)
history: str = Field(default='')
repo: Repo = Field(default_factory=Repo)
class Config:
arbitrary_types_allowed = True
def add_role(self, role: Role):
"""增加一个在当前环境的角色
Add a role in the current environment
"""
role.set_env(self)
self.roles[role.profile] = role
def add_roles(self, roles: Iterable[Role]):
"""增加一批在当前环境的角色
Add a batch of characters in the current environment
"""
for role in roles:
self.add_role(role)
def publish_message(self, message: Message):
"""向当前环境发布信息
Post information to the current environment
"""
# self.message_queue.put(message)
self.memory.add(message)
self.history += f"\n{message}"
def publish_doc(self, content: str, filename: str):
"""向当前环境发布文档(包括代码)"""
self.repo.set(content, filename)
async def run(self, k=1):
"""处理一次所有信息的运行
Process all Role runs at once
"""
# while not self.message_queue.empty():
# message = self.message_queue.get()
# rsp = await self.manager.handle(message, self)
# self.message_queue.put(rsp)
for _ in range(k):
futures = []
for role in self.roles.values():
future = role.run()
futures.append(future)
await asyncio.gather(*futures)
def get_roles(self) -> dict[str, Role]:
"""获得环境内的所有角色
Process all Role runs at once
"""
return self.roles
def get_role(self, name: str) -> Role:
"""获得环境内的指定角色
get all the environment roles
"""
return self.roles.get(name, None)