feat: merge main

This commit is contained in:
莘权 马 2024-03-29 11:20:10 +08:00
commit 33ca44739d
199 changed files with 7620 additions and 469 deletions

View file

@ -19,6 +19,7 @@
- LLM type and model name:
- System version:
- Python version:
- MetaGPT version or branch:
<!-- Dependent packages: the package versions that cause the bug (like `pydantic 1.10.8`), and the installation method (like `pip install metagpt`, `pip install from source`, or `run in docker`) -->

3
MANIFEST.in Normal file
View file

@ -0,0 +1,3 @@
recursive-include metagpt/ext/stanford_town/prompts *.txt
recursive-include metagpt/ext/stanford_town/static_dirs *.csv
recursive-include metagpt/ext/stanford_town/static_dirs *.json

View file

@ -0,0 +1,93 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : entry of Stanford Town(ST/st) game
import asyncio
from typing import Optional
import fire
from metagpt.ext.stanford_town.roles.st_role import STRole
from metagpt.ext.stanford_town.stanford_town import StanfordTown
from metagpt.ext.stanford_town.utils.const import STORAGE_PATH
from metagpt.ext.stanford_town.utils.mg_ga_transform import (
get_reverie_meta,
write_curr_sim_code,
write_curr_step,
)
from metagpt.ext.stanford_town.utils.utils import copy_folder
from metagpt.logs import logger
async def startup(
    idea: str, fork_sim_code: str, sim_code: str, temp_storage_path: str, investment: float = 30.0, n_round: int = 500
):
    """Fork an existing Stanford Town simulation and run it.

    Args:
        idea: works as an `inner voice` delivered to the first agent.
        fork_sim_code: existing simulation name under `storage/` to fork from.
        sim_code: new simulation name the results are saved under.
        temp_storage_path: generative_agents temp_storage path used to interact
            with the frontend.
        investment: the investment of running agents.
        n_round: rounds to run agents.
    """
    town = StanfordTown()
    logger.info("StanfordTown init environment")
    # copy `storage/{fork_sim_code}` to `storage/{sim_code}` so the fork is not mutated
    copy_folder(str(STORAGE_PATH.joinpath(fork_sim_code)), str(STORAGE_PATH.joinpath(sim_code)))

    # get role names from `storage/{simulation_name}/reverie/meta.json` and then init roles
    reverie_meta = get_reverie_meta(fork_sim_code)
    roles = []
    sim_path = STORAGE_PATH.joinpath(sim_code)
    sim_path.mkdir(exist_ok=True)
    for idx, role_name in enumerate(reverie_meta["persona_names"]):
        # only the first persona receives the idea as an inner voice
        has_inner_voice = idx == 0
        role = STRole(
            name=role_name,
            profile=role_name,
            sim_code=sim_code,
            step=reverie_meta.get("step", 0),
            start_time=reverie_meta.get("start_date"),
            curr_time=reverie_meta.get("curr_time"),
            sec_per_step=reverie_meta.get("sec_per_step"),
            has_inner_voice=has_inner_voice,
        )
        roles.append(role)

    # init temp_storage so the frontend can observe the current sim name and step
    write_curr_sim_code({"sim_code": sim_code}, temp_storage_path)
    write_curr_step({"step": reverie_meta.get("step", 0)}, temp_storage_path)

    await town.hire(roles)
    town.invest(investment)
    town.run_project(idea)
    await town.run(n_round)
def main(
    idea: str,
    fork_sim_code: str,
    sim_code: str,
    temp_storage_path: Optional[str] = None,
    investment: float = 30.0,
    n_round: int = 500,
):
    """CLI entry point: forward arguments to the async `startup` coroutine.

    Args:
        idea: idea works as an `inner voice` to the first agent.
        fork_sim_code: old simulation name to start with, choose one inside `generative_agents/environment/frontend_server/storage/`
        sim_code: new simulation name to save simulation result
        temp_storage_path: generative_agents temp_storage path inside `environment/frontend_server` to interact.
        investment: the investment of running agents
        n_round: rounds to run agents
    """
    # build the coroutine first, then drive it with a fresh event loop
    run_coro = startup(
        idea=idea,
        fork_sim_code=fork_sim_code,
        sim_code=sim_code,
        temp_storage_path=temp_storage_path,
        investment=investment,
        n_round=n_round,
    )
    asyncio.run(run_coro)
# expose `main` as a command-line interface via google-fire
if __name__ == "__main__":
    fire.Fire(main)

View file

@ -0,0 +1,4 @@
# path to store simulation data
test_*
unittest*
July*

View file

@ -0,0 +1,26 @@
{
"Isabella Rodriguez": {
"maze": "the_ville",
"x": 72,
"y": 14
},
"Klaus Mueller": {
"maze": "the_ville",
"x": 126,
"y": 46
},
"Maria Lopez": {
"maze": "the_ville",
"x": 123,
"y": 57
}
}

View file

@ -0,0 +1,51 @@
{
"vision_r": 8,
"att_bandwidth": 8,
"retention": 8,
"curr_time": null,
"curr_tile": null,
"daily_plan_req": "Isabella Rodriguez opens Hobbs Cafe at 8am everyday, and works at the counter until 8pm, at which point she closes the cafe.",
"name": "Isabella Rodriguez",
"first_name": "Isabella",
"last_name": "Rodriguez",
"age": 34,
"innate": "friendly, outgoing, hospitable",
"learned": "Isabella Rodriguez is a cafe owner of Hobbs Cafe who loves to make people feel welcome. She is always looking for ways to make the cafe a place where people can come to relax and enjoy themselves.",
"currently": "Isabella Rodriguez is planning on having a Valentine's Day party at Hobbs Cafe with her customers on February 14th, 2023 at 5pm. She is gathering party material, and is telling everyone to join the party at Hobbs Cafe on February 14th, 2023, from 5pm to 7pm.",
"lifestyle": "Isabella Rodriguez goes to bed around 11pm, awakes up around 6am.",
"living_area": "the Ville:Isabella Rodriguez's apartment:main room",
"concept_forget": 100,
"daily_reflection_time": 180,
"daily_reflection_size": 5,
"overlap_reflect_th": 4,
"kw_strg_event_reflect_th": 10,
"kw_strg_thought_reflect_th": 9,
"recency_w": 1,
"relevance_w": 1,
"importance_w": 1,
"recency_decay": 0.995,
"importance_trigger_max": 150,
"importance_trigger_curr": 150,
"importance_ele_n": 0,
"thought_count": 5,
"daily_req": [],
"f_daily_schedule": [],
"f_daily_schedule_hourly_org": [],
"act_address": null,
"act_start_time": null,
"act_duration": null,
"act_description": null,
"act_pronunciatio": null,
"act_event": ["Isabella Rodriguez", null, null],
"act_obj_description": null,
"act_obj_pronunciatio": null,
"act_obj_event": [null, null, null],
"chatting_with": null,
"chat": null,
"chatting_with_buffer": {},
"chatting_end_time": null,
"act_path_set": false,
"planned_path": []
}

View file

@ -0,0 +1,66 @@
{
"the Ville": {
"Hobbs Cafe": {
"cafe": [
"refrigerator",
"cafe customer seating",
"cooking area",
"kitchen sink",
"behind the cafe counter",
"piano"
]
},
"Isabella Rodriguez's apartment": {
"main room": [
"bed",
"desk",
"refrigerator",
"closet",
"shelf"
]
},
"The Rose and Crown Pub": {
"pub": [
"shelf",
"refrigerator",
"bar customer seating",
"behind the bar counter",
"kitchen sink",
"cooking area",
"microphone"
]
},
"Harvey Oak Supply Store": {
"supply store": [
"supply store product shelf",
"behind the supply store counter",
"supply store counter"
]
},
"The Willows Market and Pharmacy": {
"store": [
"behind the pharmacy counter",
"pharmacy store shelf",
"pharmacy store counter",
"grocery store shelf",
"behind the grocery counter",
"grocery store counter"
]
},
"Dorm for Oak Hill College": {
"garden": [
"dorm garden"
],
"common room": [
"common room sofa",
"pool table",
"common room table"
]
},
"Johnson Park": {
"park": [
"park garden"
]
}
}
}

View file

@ -0,0 +1,2 @@
{"kw_strength_event": {},
"kw_strength_thought": {}}

View file

@ -0,0 +1,51 @@
{
"vision_r": 8,
"att_bandwidth": 8,
"retention": 8,
"curr_time": null,
"curr_tile": null,
"daily_plan_req": "Klaus Mueller goes to the library at Oak Hill College early in the morning, spends his days writing, and eats at Hobbs Cafe.",
"name": "Klaus Mueller",
"first_name": "Klaus",
"last_name": "Mueller",
"age": 20,
"innate": "kind, inquisitive, passionate",
"learned": "Klaus Mueller is a student at Oak Hill College studying sociology. He is passionate about social justice and loves to explore different perspectives.",
"currently": "Klaus Mueller is writing a research paper on the effects of gentrification in low-income communities.",
"lifestyle": "Klaus Mueller goes to bed around 11pm, awakes up around 7am, eats dinner around 5pm.",
"living_area": "the Ville:Dorm for Oak Hill College:Klaus Mueller's room",
"concept_forget": 100,
"daily_reflection_time": 180,
"daily_reflection_size": 5,
"overlap_reflect_th": 4,
"kw_strg_event_reflect_th": 10,
"kw_strg_thought_reflect_th": 9,
"recency_w": 1,
"relevance_w": 1,
"importance_w": 1,
"recency_decay": 0.99,
"importance_trigger_max": 150,
"importance_trigger_curr": 150,
"importance_ele_n": 0,
"thought_count": 5,
"daily_req": [],
"f_daily_schedule": [],
"f_daily_schedule_hourly_org": [],
"act_address": null,
"act_start_time": null,
"act_duration": null,
"act_description": null,
"act_pronunciatio": null,
"act_event": ["Klaus Mueller", null, null],
"act_obj_description": null,
"act_obj_pronunciatio": null,
"act_obj_event": [null, null, null],
"chatting_with": null,
"chat": null,
"chatting_with_buffer": {},
"chatting_end_time": null,
"act_path_set": false,
"planned_path": []
}

View file

@ -0,0 +1,86 @@
{
"the Ville": {
"Oak Hill College": {
"hallway": [],
"library": [
"library sofa",
"library table",
"bookshelf"
],
"classroom": [
"blackboard",
"classroom podium",
"classroom student seating"
]
},
"Dorm for Oak Hill College": {
"garden": [
"dorm garden"
],
"Klaus Mueller's room": [
"bed",
"game console",
"closet",
"desk"
],
"woman's bathroom": [
"toilet",
"shower",
"bathroom sink"
],
"common room": [
"common room sofa",
"pool table",
"common room table"
],
"man's bathroom": [
"shower",
"bathroom sink",
"toilet"
]
},
"The Willows Market and Pharmacy": {
"store": [
"grocery store shelf",
"behind the grocery counter",
"grocery store counter",
"pharmacy store shelf",
"pharmacy store counter",
"behind the pharmacy counter"
]
},
"Harvey Oak Supply Store": {
"supply store": [
"supply store product shelf",
"behind the supply store counter",
"supply store counter"
]
},
"Johnson Park": {
"park": [
"park garden"
]
},
"The Rose and Crown Pub": {
"pub": [
"shelf",
"refrigerator",
"bar customer seating",
"behind the bar counter",
"kitchen sink",
"cooking area",
"microphone"
]
},
"Hobbs Cafe": {
"cafe": [
"refrigerator",
"cafe customer seating",
"cooking area",
"kitchen sink",
"behind the cafe counter",
"piano"
]
}
}
}

View file

@ -0,0 +1,2 @@
{"kw_strength_event": {},
"kw_strength_thought": {}}

View file

@ -0,0 +1,51 @@
{
"vision_r": 8,
"att_bandwidth": 8,
"retention": 8,
"curr_time": null,
"curr_tile": null,
"daily_plan_req": "Maria Lopez spends at least 3 hours a day Twitch streaming or gaming.",
"name": "Maria Lopez",
"first_name": "Maria",
"last_name": "Lopez",
"age": 21,
"innate": "energetic, enthusiastic, inquisitive",
"learned": "Maria Lopez is a student at Oak Hill College studying physics and a part time Twitch game streamer who loves to connect with people and explore new ideas.",
"currently": "Maria Lopez is working on her physics degree and streaming games on Twitch to make some extra money. She visits Hobbs Cafe for studying and eating just about everyday.",
"lifestyle": "Maria Lopez goes to bed around 2am, awakes up around 9am, eats dinner around 6pm. She likes to hang out at Hobbs Cafe if it's before 6pm.",
"living_area": "the Ville:Dorm for Oak Hill College:Maria Lopez's room",
"concept_forget": 100,
"daily_reflection_time": 180,
"daily_reflection_size": 5,
"overlap_reflect_th": 4,
"kw_strg_event_reflect_th": 10,
"kw_strg_thought_reflect_th": 9,
"recency_w": 1,
"relevance_w": 1,
"importance_w": 1,
"recency_decay": 0.99,
"importance_trigger_max": 150,
"importance_trigger_curr": 150,
"importance_ele_n": 0,
"thought_count": 5,
"daily_req": [],
"f_daily_schedule": [],
"f_daily_schedule_hourly_org": [],
"act_address": null,
"act_start_time": null,
"act_duration": null,
"act_description": null,
"act_pronunciatio": null,
"act_event": ["Maria Lopez", null, null],
"act_obj_description": null,
"act_obj_pronunciatio": null,
"act_obj_event": [null, null, null],
"chatting_with": null,
"chat": null,
"chatting_with_buffer": {},
"chatting_end_time": null,
"act_path_set": false,
"planned_path": []
}

View file

@ -0,0 +1,87 @@
{
"the Ville": {
"Oak Hill College": {
"hallway": [],
"library": [
"library sofa",
"library table",
"bookshelf"
],
"classroom": [
"blackboard",
"classroom podium",
"classroom student seating"
]
},
"Dorm for Oak Hill College": {
"garden": [
"dorm garden"
],
"Maria Lopez's room": [
"closet",
"desk",
"bed",
"computer",
"blackboard"
],
"woman's bathroom": [
"toilet",
"shower",
"bathroom sink"
],
"common room": [
"common room sofa",
"pool table",
"common room table"
],
"man's bathroom": [
"shower",
"bathroom sink",
"toilet"
]
},
"The Willows Market and Pharmacy": {
"store": [
"grocery store shelf",
"behind the grocery counter",
"grocery store counter",
"pharmacy store shelf",
"pharmacy store counter",
"behind the pharmacy counter"
]
},
"Harvey Oak Supply Store": {
"supply store": [
"supply store product shelf",
"behind the supply store counter",
"supply store counter"
]
},
"Johnson Park": {
"park": [
"park garden"
]
},
"The Rose and Crown Pub": {
"pub": [
"shelf",
"refrigerator",
"bar customer seating",
"behind the bar counter",
"kitchen sink",
"cooking area",
"microphone"
]
},
"Hobbs Cafe": {
"cafe": [
"refrigerator",
"cafe customer seating",
"cooking area",
"kitchen sink",
"behind the cafe counter",
"piano"
]
}
}
}

View file

@ -0,0 +1,13 @@
{
"fork_sim_code": "base_the_ville_isabella_maria_klaus",
"start_date": "February 13, 2023",
"curr_time": "February 13, 2023, 00:00:00",
"sec_per_step": 10,
"maze_name": "the_ville",
"persona_names": [
"Isabella Rodriguez",
"Maria Lopez",
"Klaus Mueller"
],
"step": 0
}

View file

@ -21,7 +21,7 @@ ## Usage
from metagpt.environment.api.env_api import EnvAPIAbstract
# get screenshot from ExtEnv
screenshot_path: Path = env.observe(
screenshot_path: Path = await env.observe(
EnvAPIAbstract(
api_name="get_screenshot", kwargs={"ss_name": f"{round_count}_before", "local_save_dir": task_dir}
)

View file

@ -3,10 +3,10 @@
# @Desc :
from metagpt.environment.base_env import Environment
from metagpt.environment.android_env.android_env import AndroidEnv
from metagpt.environment.werewolf_env.werewolf_env import WerewolfEnv
from metagpt.environment.stanford_town_env.stanford_town_env import StanfordTownEnv
from metagpt.environment.software_env.software_env import SoftwareEnv
from metagpt.environment.android.android_env import AndroidEnv
from metagpt.environment.werewolf.werewolf_env import WerewolfEnv
from metagpt.environment.stanford_town.stanford_town_env import StanfordTownEnv
from metagpt.environment.software.software_env import SoftwareEnv
__all__ = ["AndroidEnv", "WerewolfEnv", "StanfordTownEnv", "SoftwareEnv", "Environment"]

View file

@ -4,7 +4,7 @@
from pydantic import Field
from metagpt.environment.android_env.android_ext_env import AndroidExtEnv
from metagpt.environment.android.android_ext_env import AndroidExtEnv
from metagpt.environment.base_env import Environment

View file

@ -8,8 +8,9 @@ from typing import Any, Optional
from pydantic import Field
from metagpt.environment.android_env.const import ADB_EXEC_FAIL
from metagpt.environment.android.const import ADB_EXEC_FAIL
from metagpt.environment.base_env import ExtEnv, mark_as_readable, mark_as_writeable
from metagpt.environment.base_env_space import BaseEnvAction, BaseEnvObsParams
class AndroidExtEnv(ExtEnv):
@ -19,6 +20,20 @@ class AndroidExtEnv(ExtEnv):
width: int = Field(default=720, description="device screen width")
height: int = Field(default=1080, description="device screen height")
def reset(
self,
*,
seed: Optional[int] = None,
options: Optional[dict[str, Any]] = None,
) -> tuple[dict[str, Any], dict[str, Any]]:
pass
def observe(self, obs_params: Optional[BaseEnvObsParams] = None) -> Any:
pass
def step(self, action: BaseEnvAction) -> tuple[dict[str, Any], float, bool, bool, dict[str, Any]]:
pass
def __init__(self, **data: Any):
super().__init__(**data)
if data.get("device_id"):

View file

@ -3,9 +3,12 @@
# @Desc : base env of executing environment
import asyncio
from abc import abstractmethod
from enum import Enum
from typing import TYPE_CHECKING, Any, Dict, Iterable, Optional, Set, Union
from gymnasium import spaces
from gymnasium.core import ActType, ObsType
from pydantic import BaseModel, ConfigDict, Field, SerializeAsAny, model_validator
from metagpt.context import Context
@ -14,6 +17,7 @@ from metagpt.environment.api.env_api import (
ReadAPIRegistry,
WriteAPIRegistry,
)
from metagpt.environment.base_env_space import BaseEnvAction, BaseEnvObsParams
from metagpt.logs import logger
from metagpt.schema import Message
from metagpt.utils.common import get_function_schema, is_coroutine_func, is_send_to
@ -49,6 +53,11 @@ def mark_as_writeable(func):
class ExtEnv(BaseModel):
"""External Env to integrate actual game environment"""
model_config = ConfigDict(arbitrary_types_allowed=True)
action_space: spaces.Space[ActType] = Field(default_factory=spaces.Space, exclude=True)
observation_space: spaces.Space[ObsType] = Field(default_factory=spaces.Space, exclude=True)
def _check_api_exist(self, rw_api: Optional[str] = None):
if not rw_api:
raise ValueError(f"{rw_api} not exists")
@ -61,39 +70,56 @@ class ExtEnv(BaseModel):
else:
return env_write_api_registry.get_apis()
async def observe(self, env_action: Union[str, EnvAPIAbstract]):
async def read_from_api(self, env_action: Union[str, EnvAPIAbstract]):
"""get observation from particular api of ExtEnv"""
if isinstance(env_action, str):
read_api = env_read_api_registry.get(api_name=env_action)["func"]
self._check_api_exist(read_api)
if is_coroutine_func(read_api):
res = await read_api(self)
env_read_api = env_read_api_registry.get(api_name=env_action)["func"]
self._check_api_exist(env_read_api)
if is_coroutine_func(env_read_api):
res = await env_read_api(self)
else:
res = read_api(self)
res = env_read_api(self)
elif isinstance(env_action, EnvAPIAbstract):
read_api = env_read_api_registry.get(api_name=env_action.api_name)["func"]
self._check_api_exist(read_api)
if is_coroutine_func(read_api):
res = await read_api(self, *env_action.args, **env_action.kwargs)
env_read_api = env_read_api_registry.get(api_name=env_action.api_name)["func"]
self._check_api_exist(env_read_api)
if is_coroutine_func(env_read_api):
res = await env_read_api(self, *env_action.args, **env_action.kwargs)
else:
res = read_api(self, *env_action.args, **env_action.kwargs)
res = env_read_api(self, *env_action.args, **env_action.kwargs)
return res
async def step(self, env_action: Union[str, Message, EnvAPIAbstract, list[EnvAPIAbstract]]):
async def write_thru_api(self, env_action: Union[str, Message, EnvAPIAbstract, list[EnvAPIAbstract]]):
"""execute through particular api of ExtEnv"""
res = None
if isinstance(env_action, Message):
self.publish_message(env_action)
elif isinstance(env_action, EnvAPIAbstract):
write_api = env_write_api_registry.get(env_action.api_name)["func"]
self._check_api_exist(write_api)
if is_coroutine_func(write_api):
res = await write_api(self, *env_action.args, **env_action.kwargs)
env_write_api = env_write_api_registry.get(env_action.api_name)["func"]
self._check_api_exist(env_write_api)
if is_coroutine_func(env_write_api):
res = await env_write_api(self, *env_action.args, **env_action.kwargs)
else:
res = write_api(self, *env_action.args, **env_action.kwargs)
res = env_write_api(self, *env_action.args, **env_action.kwargs)
return res
@abstractmethod
def reset(
self,
*,
seed: Optional[int] = None,
options: Optional[dict[str, Any]] = None,
) -> tuple[dict[str, Any], dict[str, Any]]:
"""Implement this to get init observation"""
@abstractmethod
def observe(self, obs_params: Optional[BaseEnvObsParams] = None) -> Any:
"""Implement this if you want to get partial observation from the env"""
@abstractmethod
def step(self, action: BaseEnvAction) -> tuple[dict[str, Any], float, bool, bool, dict[str, Any]]:
"""Implement this to feed a action and then get new observation from the env"""
class Environment(ExtEnv):
"""环境,承载一批角色,角色可以向环境发布消息,可以被其他角色观察到
@ -108,6 +134,20 @@ class Environment(ExtEnv):
history: str = "" # For debug
context: Context = Field(default_factory=Context, exclude=True)
def reset(
self,
*,
seed: Optional[int] = None,
options: Optional[dict[str, Any]] = None,
) -> tuple[dict[str, Any], dict[str, Any]]:
pass
def observe(self, obs_params: Optional[BaseEnvObsParams] = None) -> Any:
pass
def step(self, action: BaseEnvAction) -> tuple[dict[str, Any], float, bool, bool, dict[str, Any]]:
pass
@model_validator(mode="after")
def init_roles(self):
self.add_roles(self.roles.values())

View file

@ -0,0 +1,33 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc :
from enum import IntEnum
from pydantic import BaseModel, ConfigDict, Field
class BaseEnvActionType(IntEnum):
    """Base enum for env action types; subclasses define concrete members, e.g. NONE = 0 (no action, just observe)."""
    pass
class BaseEnvAction(BaseModel):
    """env action type and its related params of action functions/apis"""
    model_config = ConfigDict(arbitrary_types_allowed=True)
    # integer action type; subclasses narrow the default to a concrete action-type member
    action_type: int = Field(default=0, description="action type")
class BaseEnvObsType(IntEnum):
    """Base enum for observation types; subclasses define concrete members, e.g. NONE = 0 (whole observation)."""
    pass
class BaseEnvObsParams(BaseModel):
    """observation params for different EnvObsType to get its observe result"""
    model_config = ConfigDict(arbitrary_types_allowed=True)
    # integer observation type; subclasses narrow the default to a concrete obs-type member
    obs_type: int = Field(default=0, description="observation type")

View file

@ -8,14 +8,14 @@ import re
import time
from typing import Any, Iterable
from llama_index.vector_stores.chroma import ChromaVectorStore
from pydantic import ConfigDict, Field
from metagpt.config2 import config as CONFIG
from metagpt.environment.base_env import Environment
from metagpt.environment.minecraft_env.const import MC_CKPT_DIR
from metagpt.environment.minecraft_env.minecraft_ext_env import MinecraftExtEnv
from metagpt.environment.minecraft.const import MC_CKPT_DIR
from metagpt.environment.minecraft.minecraft_ext_env import MinecraftExtEnv
from metagpt.logs import logger
from metagpt.rag.vector_stores.chroma import ChromaVectorStore
from metagpt.utils.common import load_mc_skills_code, read_json_file, write_json_file
@ -282,7 +282,7 @@ class MinecraftEnv(Environment, MinecraftExtEnv):
position = event["status"]["position"]
blocks.append(block)
positions.append(position)
new_events = self.step(
new_events = self._step(
f"await givePlacedItemBack(bot, {json.dumps(blocks)}, {json.dumps(positions)})",
programs=self.programs,
)
@ -323,7 +323,7 @@ class MinecraftEnv(Environment, MinecraftExtEnv):
Exception: If there is an issue retrieving events.
"""
try:
self.reset(
self._reset(
options={
"mode": "soft",
"wait_ticks": 20,
@ -332,13 +332,13 @@ class MinecraftEnv(Environment, MinecraftExtEnv):
# difficulty = "easy" if len(self.completed_tasks) > 15 else "peaceful"
difficulty = "peaceful"
events = self.step("bot.chat(`/time set ${getNextTime()}`);\n" + f"bot.chat('/difficulty {difficulty}');")
events = self._step("bot.chat(`/time set ${getNextTime()}`);\n" + f"bot.chat('/difficulty {difficulty}');")
self.update_event(events)
return events
except Exception as e:
time.sleep(3) # wait for mineflayer to exit
# reset bot status here
events = self.reset(
events = self._reset(
options={
"mode": "hard",
"wait_ticks": 20,
@ -365,7 +365,7 @@ class MinecraftEnv(Environment, MinecraftExtEnv):
Exception: If there is an issue retrieving events.
"""
try:
events = self.step(
events = self._step(
code=self.code,
programs=self.programs,
)
@ -374,7 +374,7 @@ class MinecraftEnv(Environment, MinecraftExtEnv):
except Exception as e:
time.sleep(3) # wait for mineflayer to exit
# reset bot status here
events = self.reset(
events = self._reset(
options={
"mode": "hard",
"wait_ticks": 20,

View file

@ -5,20 +5,21 @@
import json
import time
from typing import Optional
from typing import Any, Optional
import requests
from pydantic import ConfigDict, Field, model_validator
from metagpt.environment.base_env import ExtEnv, mark_as_writeable
from metagpt.environment.minecraft_env.const import (
from metagpt.environment.base_env_space import BaseEnvAction, BaseEnvObsParams
from metagpt.environment.minecraft.const import (
MC_CKPT_DIR,
MC_CORE_INVENTORY_ITEMS,
MC_CURRICULUM_OB,
MC_DEFAULT_WARMUP,
METAGPT_ROOT,
)
from metagpt.environment.minecraft_env.process_monitor import SubprocessMonitor
from metagpt.environment.minecraft.process_monitor import SubprocessMonitor
from metagpt.logs import logger
@ -38,6 +39,20 @@ class MinecraftExtEnv(ExtEnv):
server_paused: bool = Field(default=False)
warm_up: dict = Field(default=dict())
def reset(
self,
*,
seed: Optional[int] = None,
options: Optional[dict[str, Any]] = None,
) -> tuple[dict[str, Any], dict[str, Any]]:
pass
def observe(self, obs_params: Optional[BaseEnvObsParams] = None) -> Any:
pass
def step(self, action: BaseEnvAction) -> tuple[dict[str, Any], float, bool, bool, dict[str, Any]]:
pass
@property
def server(self) -> str:
return f"{self.server_host}:{self.server_port}"
@ -48,7 +63,7 @@ class MinecraftExtEnv(ExtEnv):
self.mineflayer = SubprocessMonitor(
commands=[
"node",
METAGPT_ROOT.joinpath("metagpt", "environment", "minecraft_env", "mineflayer", "index.js"),
METAGPT_ROOT.joinpath("metagpt", "environment", "minecraft", "mineflayer", "index.js"),
str(self.server_port),
],
name="mineflayer",
@ -115,7 +130,7 @@ class MinecraftExtEnv(ExtEnv):
return res.json()
@mark_as_writeable
def reset(self, *, seed=None, options=None) -> dict:
def _reset(self, *, seed=None, options=None) -> dict:
if options is None:
options = {}
if options.get("inventory", {}) and options.get("mode", "hard") != "hard":
@ -145,7 +160,7 @@ class MinecraftExtEnv(ExtEnv):
return json.loads(returned_data)
@mark_as_writeable
def step(self, code: str, programs: str = "") -> dict:
def _step(self, code: str, programs: str = "") -> dict:
if not self.has_reset:
raise RuntimeError("Environment has not been reset yet")
self.check_process()

View file

@ -0,0 +1,105 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc :
from typing import Any, Optional, Union
import numpy as np
import numpy.typing as npt
from gymnasium import spaces
from pydantic import ConfigDict, Field, field_validator
from metagpt.environment.base_env_space import (
BaseEnvAction,
BaseEnvActionType,
BaseEnvObsParams,
BaseEnvObsType,
)
class EnvActionType(BaseEnvActionType):
    """Concrete action types of the Stanford Town env; each maps to one tile-event operation."""
    NONE = 0  # no action to run, just get observation
    ADD_TILE_EVENT = 1  # Add an event triple to a tile
    RM_TILE_EVENT = 2  # Remove an event triple from a tile
    TURN_TILE_EVENT_IDLE = 3  # Turn an event triple from a tile into idle
    RM_TITLE_SUB_EVENT = 4  # Remove an event triple that has the input subject from a tile
class EnvAction(BaseEnvAction):
    """env action type and its related params of action functions/apis"""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    action_type: int = Field(default=EnvActionType.NONE, description="action type")
    coord: npt.NDArray[np.int64] = Field(
        default_factory=lambda: np.zeros(2, dtype=np.int64), description="tile coordinate"
    )
    subject: str = Field(default="", description="subject name of first element in event")
    # default is a tuple to match the declared tuple type (a list default was coerced anyway)
    event: tuple[str, Optional[str], Optional[str], Optional[str]] = Field(
        default=("", None, None, None), description="tile event"
    )

    @field_validator("coord", mode="before")
    @classmethod
    def check_coord(cls, coord) -> npt.NDArray[np.int64]:
        """Coerce list/tuple input into an ndarray.

        Bug fix: the original returned None (implicitly) when `coord` already
        was an ndarray, which made pydantic store None instead of the value.
        """
        if not isinstance(coord, np.ndarray):
            return np.array(coord)
        return coord
class EnvObsType(BaseEnvObsType):
    """get part observation with specific params"""

    NONE = 0  # get whole observation from env
    GET_TITLE = 1  # get the tile detail dictionary with given tile coord
    TILE_PATH = 2  # get the tile address with given tile coord
    TILE_NBR = 3  # get the neighbors of given tile coord and its vision radius
class EnvObsParams(BaseEnvObsParams):
    """observation params for different EnvObsType"""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    obs_type: int = Field(default=EnvObsType.NONE, description="observation type")
    coord: npt.NDArray[np.int64] = Field(
        default_factory=lambda: np.zeros(2, dtype=np.int64), description="tile coordinate"
    )
    level: str = Field(default="", description="different level of title")
    vision_radius: int = Field(default=0, description="the vision radius of current tile")

    @field_validator("coord", mode="before")
    @classmethod
    def check_coord(cls, coord) -> npt.NDArray[np.int64]:
        """Coerce list/tuple input into an ndarray.

        Bug fix: the original returned None (implicitly) when `coord` already
        was an ndarray, which made pydantic store None instead of the value.
        """
        if not isinstance(coord, np.ndarray):
            return np.array(coord)
        return coord
# union of observation payload shapes: collision-maze rows, address -> tile-coordinate sets, or the tile detail grid
EnvObsValType = Union[list[list[str]], dict[str, set[tuple[int, int]]], list[list[dict[str, Any]]]]
def get_observation_space() -> spaces.Dict:
    """Build the env observation space declaration.

    NOTE(review): the values are placeholder `Discrete(2)` spaces, not the real
    shapes of the observations — presumably only a nominal declaration; confirm.
    """
    space = spaces.Dict(
        {"collision_maze": spaces.Discrete(2), "tiles": spaces.Discrete(2), "address_tiles": spaces.Discrete(2)}
    )
    return space
def get_action_space(maze_shape: tuple[int, int]) -> spaces.Dict:
    """The fields defined by the space correspond to the input parameters of the action except `action_type`"""
    # tile coordinates are bounded by the maze dimensions
    coord_low = np.array([0, 0], dtype=np.int64)
    coord_high = np.array([maze_shape[0], maze_shape[1]], dtype=np.int64)
    fields = {
        "action_type": spaces.Discrete(len(EnvActionType)),
        "coord": spaces.Box(coord_low, coord_high),  # coord of the tile
        "subject": spaces.Text(256),  # the first element of an tile event
        # event is a tuple of four str
        "event": spaces.Tuple((spaces.Text(256), spaces.Text(256), spaces.Text(256), spaces.Text(256))),
    }
    return spaces.Dict(fields)

View file

@ -0,0 +1,10 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : MG StanfordTown Env
from metagpt.environment.base_env import Environment
from metagpt.environment.stanford_town.stanford_town_ext_env import StanfordTownExtEnv
class StanfordTownEnv(StanfordTownExtEnv, Environment):
    """Stanford Town env: combines the external game env with the MetaGPT role Environment via multiple inheritance."""
    pass

View file

@ -5,11 +5,20 @@
import math
from pathlib import Path
from typing import Optional, Tuple
from typing import Any, Optional
from pydantic import ConfigDict, Field, model_validator
from metagpt.environment.base_env import ExtEnv, mark_as_readable, mark_as_writeable
from metagpt.environment.stanford_town.env_space import (
EnvAction,
EnvActionType,
EnvObsParams,
EnvObsType,
EnvObsValType,
get_action_space,
get_observation_space,
)
from metagpt.utils.common import read_csv_to_list, read_json_file
@ -197,15 +206,82 @@ class StanfordTownExtEnv(ExtEnv):
else:
address_tiles[add] = set([(j, i)])
values["address_tiles"] = address_tiles
values["action_space"] = get_action_space((maze_width, maze_height))
values["observation_space"] = get_observation_space()
return values
def reset(
self,
*,
seed: Optional[int] = None,
options: Optional[dict[str, Any]] = None,
) -> tuple[dict[str, EnvObsValType], dict[str, Any]]:
"""reset env and get the init observation
Return results corresponding to `observation, info`
"""
super().reset(seed=seed, options=options)
obs = self._get_obs()
return obs, {}
def _get_obs(self) -> dict[str, EnvObsValType]:
"""Get observation"""
return {
"collision_maze": self.get_collision_maze(),
"tiles": self.tiles,
"address_tiles": self.get_address_tiles(),
}
def observe(self, obs_params: Optional[EnvObsParams] = None) -> Any:
"""Get partial or full observation from the env"""
obs_type = obs_params.obs_type if obs_params else EnvObsType.NONE
if obs_type == EnvObsType.NONE:
obs = self._get_obs()
elif obs_type == EnvObsType.GET_TITLE:
obs = self.access_tile(tile=obs_params.coord)
elif obs_type == EnvObsType.TILE_PATH:
obs = self.get_tile_path(tile=obs_params.coord, level=obs_params.level)
elif obs_type == EnvObsType.TILE_NBR:
obs = self.get_nearby_tiles(tile=obs_params.coord, vision_r=obs_params.vision_radius)
return obs
def step(self, action: EnvAction) -> tuple[dict[str, EnvObsValType], float, bool, bool, dict[str, Any]]:
"""Execute action and then return observation
Return results corresponding to `observation, reward, terminated, truncated, info`
"""
terminated = False
try:
self._execute_env_action(action)
except Exception:
terminated = True
obs = self._get_obs()
ret = (obs, 1.0, terminated, False, {})
return ret
def _execute_env_action(self, action: EnvAction):
action_type = action.action_type
if action_type == EnvActionType.NONE:
pass
elif action_type == EnvActionType.ADD_TILE_EVENT:
self.add_event_from_tile(curr_event=action.event, tile=action.coord)
elif action_type == EnvActionType.RM_TILE_EVENT:
self.remove_event_from_tile(curr_event=action.event, tile=action.coord)
elif action_type == EnvActionType.TURN_TILE_EVENT_IDLE:
self.turn_event_from_tile_idle(curr_event=action.event, tile=action.coord)
elif action_type == EnvActionType.RM_TITLE_SUB_EVENT:
self.remove_subject_events_from_tile(subject=action.subject, tile=action.coord)
def turn_coordinate_to_tile(self, px_coordinate: tuple[int, int]) -> tuple[int, int]:
"""
Turns a pixel coordinate to a tile coordinate.
"""
x = math.ceil(px_coordinate[0] / self.sq_tile_size)
y = math.ceil(px_coordinate[1] / self.sq_tile_size)
return (x, y)
return x, y
@mark_as_readable
def get_collision_maze(self) -> list:
@ -316,10 +392,6 @@ class StanfordTownExtEnv(ExtEnv):
nearby_tiles += [(i, j)]
return nearby_tiles
@mark_as_writeable
def add_tiles_event(self, pt_y: int, pt_x: int, event: Tuple[str, str, str, str]):
    """Add the 4-tuple `event` to the event set of the tile at row `pt_y`, column `pt_x`."""
    self.tiles[pt_y][pt_x]["events"].add(event)
@mark_as_writeable
def add_event_from_tile(self, curr_event: tuple[str], tile: tuple[int, int]) -> None:
"""

View file

@ -1,12 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : MG StanfordTown Env
from metagpt.environment.base_env import Environment
from metagpt.environment.stanford_town_env.stanford_town_ext_env import (
StanfordTownExtEnv,
)
class StanfordTownEnv(Environment, StanfordTownExtEnv):
    """Stanford Town environment: MetaGPT `Environment` combined with the tile-map ext env."""

    pass

View file

@ -0,0 +1,3 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc :

View file

@ -5,7 +5,7 @@
from pydantic import Field
from metagpt.environment.base_env import Environment
from metagpt.environment.werewolf_env.werewolf_ext_env import WerewolfExtEnv
from metagpt.environment.werewolf.werewolf_ext_env import WerewolfExtEnv
from metagpt.logs import logger
from metagpt.schema import Message

View file

@ -5,11 +5,12 @@
import random
from collections import Counter
from enum import Enum
from typing import Callable, Optional
from typing import Any, Callable, Optional
from pydantic import ConfigDict, Field
from metagpt.environment.base_env import ExtEnv, mark_as_readable, mark_as_writeable
from metagpt.environment.base_env_space import BaseEnvAction, BaseEnvObsParams
from metagpt.logs import logger
@ -128,6 +129,20 @@ class WerewolfExtEnv(ExtEnv):
player_poisoned: Optional[str] = Field(default=None)
player_current_dead: list[str] = Field(default=[])
def reset(
    self,
    *,
    seed: Optional[int] = None,
    options: Optional[dict[str, Any]] = None,
) -> tuple[dict[str, Any], dict[str, Any]]:
    """Gym-style reset hook; not implemented for the werewolf env (returns None)."""
    pass
def observe(self, obs_params: Optional[BaseEnvObsParams] = None) -> Any:
    """Observation hook required by ExtEnv; not implemented for werewolf (returns None)."""
    pass
def step(self, action: BaseEnvAction) -> tuple[dict[str, Any], float, bool, bool, dict[str, Any]]:
    """Step hook required by ExtEnv; not implemented for werewolf (returns None)."""
    pass
@property
def living_players(self) -> list[str]:
player_names = []

3
metagpt/ext/__init__.py Normal file
View file

@ -0,0 +1,3 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc :

View file

@ -0,0 +1,36 @@
## Stanford Town Game
### Pre-Description
To make it easy for the GA ( [generative_agents](https://github.com/joonspk-research/generative_agents) ) frontend to consume the simulation data (without changing the GA code), you can set `temp_storage_path` to the `temp_storage` directory of `generative_agents` when starting `run_st_game.py`, like
`python3 run_st_game.py --temp_storage_path path/to/ga/temp_storage xxx`
Or change the paths in `const.py` as below
```
STORAGE_PATH = EXAMPLE_PATH.joinpath("storage")
TEMP_STORAGE_PATH = EXAMPLE_PATH.joinpath("temp_storage")
# updated
STORAGE_PATH = Path("{path/to/ga/storage}")
TEMP_STORAGE_PATH = Path("{path/to/ga/temp_storage}")
```
This can be used to achieve docking of simulation data without changing the GA code. Otherwise, the GA code must be modified to adapt to the MG output path.
If you don't want to start from scratch, copy one of the simulation directories under `generative_agents/environment/frontend_server/storage/` to `examples/stanford_town/storage`, and use that directory's name as `fork_sim_code`.
### Backend service startup
The execution entry is `python3 run_st_game.py "Host a open lunch party at 13:00 pm" "base_the_ville_isabella_maria_klaus" "test_sim" 10`
or
`python3 run_st_game.py "Host a open lunch party at 13:00 pm" "base_the_ville_isabella_maria_klaus" "test_sim" 10 --temp_storage_path path/to/ga/temp_storage`
`idea` is the user's voice to the first Agent, and it is disseminated through this voice to see whether the final multi-agents achieve the goal of hosting or participating in the event.
### Frontend service startup
Enter project folder `generative_agents`
Enter `environment/frontend_server` and use `python3 manage.py runserver` to start the front-end service.
Visit `http://localhost:8000/simulator_home` to enter the current simulation interface.
## Appreciation
This reproduction is based on [generative_agents](https://github.com/joonspk-research/generative_agents); we gratefully acknowledge the original work.

View file

@ -0,0 +1,35 @@
## Stanford Town Game
### 前置
为了方便GA [generative_agents](https://github.com/joonspk-research/generative_agents) )的前端对接数据(避免改动它那块的代码),可在启动`run_st_game.py`加上`temp_storage_path`指向`generative_agents`对应的`temp_storage`路径。比如
`python3 run_st_game.py --temp_storage_path path/to/ga/temp_storage xxx`
或将`const.py`下的
```
STORAGE_PATH = EXAMPLE_PATH.joinpath("storage")
TEMP_STORAGE_PATH = EXAMPLE_PATH.joinpath("temp_storage")
# 更新为
STORAGE_PATH = Path("{path/to/ga/storage}")
TEMP_STORAGE_PATH = Path("{path/to/ga/temp_storage}")
```
这样可用实现不改变GA代码情况下实现仿真数据的对接。不然得修改GA的代码来适配MG的输出路径。
如果你不想从0开始启动拷贝`generative_agents/environment/frontend_server/storage/`下的其他仿真目录到`examples/stanford_town/storage`,并选择一个目录名作为`fork_sim_code`
### 后端服务启动
执行入口为:`python3 run_st_game.py "Host a open lunch party at 13:00 pm" "base_the_ville_isabella_maria_klaus" "test_sim" 10`
或者
`python3 run_st_game.py "Host a open lunch party at 13:00 pm" "base_the_ville_isabella_maria_klaus" "test_sim" 10 --temp_storage_path path/to/ga/temp_storage`
`idea`为用户给第一个Agent的用户心声并通过这个心声进行传播看最后多智能体是否达到举办、参加活动的目标。
### 前端服务启动
进入`generative_agents`项目目录
进入`environment/frontend_server`,使用`python3 manage.py runserver`启动前端服务。
访问`http://localhost:8000/simulator_home` 进入当前的仿真界面。
## Appreciation
This reproduction is based on [generative_agents](https://github.com/joonspk-research/generative_agents); we gratefully acknowledge the original work.

View file

@ -0,0 +1,3 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : stanford town implement

View file

@ -0,0 +1,3 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc :

View file

@ -0,0 +1,39 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : summarize relationship in a agent chat
from metagpt.ext.stanford_town.actions.st_action import STAction
from metagpt.logs import logger
class AgentChatSumRel(STAction):
    """Summarize the relationship between two agents from chat statements."""

    name: str = "AgentChatSumRel"

    def _func_validate(self, llm_resp: str, prompt: str) -> bool:
        """A reply is acceptable when the cleanup transformation does not raise."""
        try:
            _ = llm_resp.split('"')[0].strip()
        except Exception:
            return False
        return True

    def _func_cleanup(self, llm_resp: str, prompt: str) -> str:
        """Keep only the text before the first double quote, trimmed."""
        return llm_resp.split('"')[0].strip()

    def _func_fail_default_resp(self) -> str:
        # NOTE(review): returns None despite the `-> str` annotation; unlike the
        # sibling actions, `run` never assigns self.fail_default_resp — confirm intent.
        pass

    async def run(self, init_role: "STRole", target_role: "STRole", statements: str) -> str:
        """Ask the LLM to characterize init_role's relationship with target_role."""
        prompt_input = [statements, init_role.name, target_role.name]
        prompt = self.generate_prompt_with_tmpl_filename(prompt_input, "summarize_chat_relationship_v2.txt")

        example_output = "Jane Doe is working on a project"
        special_instruction = "The output should be a string that responds to the question."
        summary = await self._run_gpt35(prompt, example_output, special_instruction)
        logger.info(f"Role: {init_role.name} Action: {self.cls_name} output: {summary}")
        return summary

View file

@ -0,0 +1,97 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : device to talk to another role, return yes or no
from metagpt.ext.stanford_town.actions.st_action import STAction
from metagpt.logs import logger
class DecideToTalk(STAction):
    """Decide whether `init_role` should start a conversation with `target_role`.

    The LLM answers a yes/no question built from both agents' current
    activities, their last chat, and retrieved memory context.
    """

    name: str = "DecideToTalk"

    def _func_validate(self, llm_resp: str, prompt: str) -> bool:
        """Accept the reply only when it ends with a literal yes/no answer."""
        resp = False
        try:
            if llm_resp.split("Answer in yes or no:")[-1].strip().lower() in ["yes", "no"]:
                resp = True
        except ValueError:
            pass
        return resp

    def _func_cleanup(self, llm_resp: str, prompt: str) -> str:
        """Extract the normalized 'yes'/'no' token from the raw reply."""
        return llm_resp.split("Answer in yes or no:")[-1].strip().lower()

    def _func_fail_default_resp(self) -> str:
        """Default to 'yes' so a failed LLM call still allows conversations."""
        return "yes"

    async def run(self, init_role: "STRole", target_role: "STRole", retrieved: dict, *args, **kwargs) -> bool:
        """Return True when the model answers 'yes' to starting a chat."""

        def create_prompt_input(init_role: "STRole", target_role: "STRole", retrieved: dict) -> str:
            scratch = init_role.rc.scratch
            target_scratch = target_role.rc.scratch
            last_chat = init_role.rc.memory.get_last_chat(target_role.name)
            last_chatted_time = ""
            last_chat_about = ""
            if last_chat:
                last_chatted_time = last_chat.created.strftime("%B %d, %Y, %H:%M:%S")
                last_chat_about = last_chat.description

            # Retrieved events (verbs forced to past tense) and thoughts form the context.
            context = ""
            for c_node in retrieved["events"]:
                curr_desc = c_node.description.split(" ")
                curr_desc[2:3] = ["was"]
                curr_desc = " ".join(curr_desc)
                context += f"{curr_desc}. "
            context += "\n"
            for c_node in retrieved["thoughts"]:
                context += f"{c_node.description}. "

            curr_time = scratch.curr_time.strftime("%B %d, %Y, %H:%M:%S %p")
            init_act_desc = scratch.act_description
            if "(" in init_act_desc:
                init_act_desc = init_act_desc.split("(")[-1][:-1]
            if len(scratch.planned_path) == 0 and "waiting" not in init_act_desc:
                init_p_desc = f"{init_role.name} is already {init_act_desc}"
            elif "waiting" in init_act_desc:
                init_p_desc = f"{init_role.name} is {init_act_desc}"
            else:
                init_p_desc = f"{init_role.name} is on the way to {init_act_desc}"

            # Fix: describe the *target* role from its own scratch. The original read
            # `scratch.act_description` (the initiator's activity) and tested the
            # initiator's "waiting" state here, so the prompt described the wrong agent.
            target_act_desc = target_scratch.act_description
            if "(" in target_act_desc:
                target_act_desc = target_act_desc.split("(")[-1][:-1]
            if len(target_scratch.planned_path) == 0 and "waiting" not in target_act_desc:
                target_p_desc = f"{target_role.name} is already {target_act_desc}"
            elif "waiting" in target_act_desc:
                target_p_desc = f"{target_role.name} is {target_act_desc}"
            else:
                target_p_desc = f"{target_role.name} is on the way to {target_act_desc}"

            prompt_input = []
            prompt_input += [context]
            prompt_input += [curr_time]
            prompt_input += [init_role.name]
            prompt_input += [target_role.name]
            prompt_input += [last_chatted_time]
            prompt_input += [last_chat_about]
            prompt_input += [init_p_desc]
            prompt_input += [target_p_desc]
            prompt_input += [init_role.name]
            prompt_input += [target_role.name]
            return prompt_input

        prompt_input = create_prompt_input(init_role, target_role, retrieved)
        prompt = self.generate_prompt_with_tmpl_filename(
            prompt_input=prompt_input, tmpl_filename="decide_to_talk_v2.txt"
        )
        self.fail_default_resp = self._func_fail_default_resp()
        output = await self._run_gpt35_max_tokens(prompt, max_tokens=20)  # yes or no
        result = True if output == "yes" else False
        logger.info(f"Role: {init_role.name} Action: {self.cls_name} output: {result}")
        return result

View file

@ -0,0 +1,20 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : dummy action to make every STRole can deal DummyMessage which is caused by DummyAction
from metagpt.actions import Action
from metagpt.schema import Message
class DummyAction(Action):
    """Placeholder action: exists only so DummyMessage has a cause_by type."""

    async def run(self, *args, **kwargs):
        """Never meant to be executed; always raises."""
        raise NotImplementedError
class DummyMessage(Message):
    """
    Dummy message passed to roles so that every role gets an execution each round.
    """

    # Fixed placeholder payload; carries no real instruction.
    content: str = "dummy"
    # Matches DummyAction by name so message routing attributes it to that action.
    cause_by: str = "DummyAction"

View file

@ -0,0 +1,401 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : gen_action_details
import random
from metagpt.environment.stanford_town.env_space import EnvObsParams, EnvObsType
from metagpt.ext.stanford_town.actions.st_action import STAction
from metagpt.logs import logger
class GenActionSector(STAction):
    """Choose the map sector in which the role's next action should take place."""

    name: str = "GenActionSector"

    def _func_cleanup(self, llm_resp: str, prompt: str):
        """Keep only the text before the first closing brace."""
        cleaned_response = llm_resp.split("}")[0]
        return cleaned_response

    def _func_validate(self, llm_resp: str, prompt: str):
        """A reply must be non-blank, contain '}', and hold a single (comma-free) value."""
        if len(llm_resp.strip()) < 1:
            return False
        if "}" not in llm_resp:
            return False
        if "," in llm_resp:
            return False
        return True

    def _func_fail_default_resp(self):
        """Fallback sector name used when the LLM response is unusable."""
        fs = "kitchen"
        return fs

    async def run(self, role: "STRole", access_tile: dict[str, str], act_desp: str):
        """Ask the LLM for a sector; fall back to the role's living area when
        the answer is not one of the accessible sectors."""

        def create_prompt_input(role, access_tile: dict[str, str], act_desp):
            act_world = f"{access_tile['world']}"

            prompt_input = []

            prompt_input += [role.scratch.get_str_name()]
            prompt_input += [role.scratch.living_area.split(":")[1]]
            x = f"{act_world}:{role.scratch.living_area.split(':')[1]}"
            prompt_input += [role.s_mem.get_str_accessible_sector_arenas(x)]

            prompt_input += [role.scratch.get_str_name()]
            prompt_input += [f"{access_tile['sector']}"]
            x = f"{act_world}:{access_tile['sector']}"
            prompt_input += [role.s_mem.get_str_accessible_sector_arenas(x)]

            if role.scratch.get_str_daily_plan_req() != "":
                prompt_input += [f"\n{role.scratch.get_str_daily_plan_req()}"]
            else:
                prompt_input += [""]

            # MAR 11 TEMP
            # NOTE(review): everything appended above is discarded here — the list
            # is rebuilt from scratch. Kept as-is (mirrors upstream), but the block
            # above is effectively dead code.
            prompt_input = []
            act_world = access_tile["world"]

            accessible_sector_str = role.s_mem.get_str_accessible_sectors(act_world)
            curr = accessible_sector_str.split(", ")
            fin_accessible_sectors = []
            for i in curr:
                # Exclude other residents' houses; keep the role's own.
                if "'s house" in i:
                    if role.scratch.last_name in i:
                        fin_accessible_sectors += [i]
                else:
                    fin_accessible_sectors += [i]
            accessible_sector_str = ", ".join(fin_accessible_sectors)
            # END MAR 11 TEMP

            prompt_input += [accessible_sector_str]

            # Split "verb (detail)" descriptions into verb part and detail part.
            act_desp_1 = act_desp
            act_desp_2 = act_desp
            if "(" in act_desp:
                act_desp_1 = act_desp.split("(")[0].strip()
                act_desp_2 = act_desp.split("(")[-1][:-1]
            prompt_input += [role.scratch.get_str_name()]
            prompt_input += [act_desp_1]

            prompt_input += [act_desp_2]
            prompt_input += [role.scratch.get_str_name()]
            return prompt_input

        prompt_template = "action_location_sector_v1.txt"
        prompt_input = create_prompt_input(role, access_tile, act_desp)
        prompt = self.generate_prompt_with_tmpl_filename(prompt_input, prompt_template)
        self.fail_default_resp = self._func_fail_default_resp()
        output = await self._run_gpt35_max_tokens(prompt, max_tokens=15)

        # Reject hallucinated sectors: default to the role's living-area sector.
        y = f"{access_tile['world']}"
        x = [i.strip() for i in role.s_mem.get_str_accessible_sectors(y).split(",")]
        if output not in x:
            # output = random.choice(x)
            output = role.scratch.living_area.split(":")[1]

        logger.info(f"Role: {role.name} Action: {self.cls_name} output: {output}")
        return output
class GenActionArena(STAction):
    """Choose the arena (room) inside a sector where the action should happen."""

    name: str = "GenActionArena"

    def _func_cleanup(self, llm_resp: str, prompt: str):
        """Keep only the text before the first closing brace."""
        cleaned_response = llm_resp.split("}")[0]
        return cleaned_response

    def _func_validate(self, llm_resp: str, prompt: str):
        """A reply must be non-blank, contain '}', and hold a single (comma-free) value."""
        if len(llm_resp.strip()) < 1:
            return False
        if "}" not in llm_resp:
            return False
        if "," in llm_resp:
            return False
        return True

    def _func_fail_default_resp(self):
        """Fallback arena name used when the LLM response is unusable."""
        fs = "kitchen"
        return fs

    async def run(self, role: "STRole", act_desp: str, act_world: str, act_sector: str):
        """Ask the LLM for an arena within `act_world`:`act_sector` for `act_desp`."""

        def create_prompt_input(role, act_desp, act_world, act_sector):
            prompt_input = []
            prompt_input += [role.scratch.get_str_name()]
            x = f"{act_world}:{act_sector}"
            prompt_input += [act_sector]

            # MAR 11 TEMP
            accessible_arena_str = role.s_mem.get_str_accessible_sector_arenas(x)
            curr = accessible_arena_str.split(", ")
            fin_accessible_arenas = []
            for i in curr:
                # Exclude other residents' rooms; keep the role's own.
                if "'s room" in i:
                    if role.scratch.last_name in i:
                        fin_accessible_arenas += [i]
                else:
                    fin_accessible_arenas += [i]
            accessible_arena_str = ", ".join(fin_accessible_arenas)
            # END MAR 11 TEMP

            prompt_input += [accessible_arena_str]

            # Split "verb (detail)" descriptions into verb part and detail part.
            act_desp_1 = act_desp
            act_desp_2 = act_desp
            if "(" in act_desp:
                act_desp_1 = act_desp.split("(")[0].strip()
                act_desp_2 = act_desp.split("(")[-1][:-1]
            prompt_input += [role.scratch.get_str_name()]
            prompt_input += [act_desp_1]

            prompt_input += [act_desp_2]
            prompt_input += [role.scratch.get_str_name()]

            prompt_input += [act_sector]
            prompt_input += [accessible_arena_str]
            return prompt_input

        prompt_template = "action_location_object_vMar11.txt"
        prompt_input = create_prompt_input(role, act_desp, act_world, act_sector)
        prompt = self.generate_prompt_with_tmpl_filename(prompt_input, prompt_template)
        self.fail_default_resp = self._func_fail_default_resp()
        output = await self._run_gpt35_max_tokens(prompt, max_tokens=15)
        logger.info(f"Role: {role.name} Action: {self.cls_name} output: {output}")
        return output
class GenActionObject(STAction):
    """Pick the game object inside an arena that the action should target."""

    name: str = "GenActionObject"

    def _func_validate(self, llm_resp: str, prompt: str):
        """Any non-blank reply is usable."""
        return len(llm_resp.strip()) >= 1

    def _func_cleanup(self, llm_resp: str, prompt: str):
        """Trim surrounding whitespace from the reply."""
        return llm_resp.strip()

    def _func_fail_default_resp(self):
        """Fallback object used when the LLM response is unusable."""
        return "bed"

    async def run(self, role: "STRole", act_desp: str, temp_address: str):
        """Ask the LLM for an object at `temp_address`; pick a random accessible
        object when the answer is not in the arena's object list."""
        desc = act_desp.split("(")[-1][:-1] if "(" in act_desp else act_desp
        prompt_input = [desc, role.s_mem.get_str_accessible_arena_game_objects(temp_address)]

        prompt = self.generate_prompt_with_tmpl_filename(prompt_input, "action_object_v2.txt")
        self.fail_default_resp = self._func_fail_default_resp()
        output = await self._run_gpt35_max_tokens(prompt, max_tokens=15)

        candidates = [i.strip() for i in role.s_mem.get_str_accessible_arena_game_objects(temp_address).split(",")]
        if output not in candidates:
            output = random.choice(candidates)
        logger.info(f"Role: {role.name} Action: {self.cls_name} output: {output}")
        return output
class GenPronunciatio(STAction):
    """Generate an emoji 'pronunciatio' summarizing an action description."""

    name: str = "GenPronunciatio"

    def _func_cleanup(self, llm_resp: str, prompt: str):
        # NOTE(review): truncating to 3 characters can split a multi-codepoint
        # emoji sequence — confirm acceptable for the frontend.
        cr = llm_resp.strip()
        if len(cr) > 3:
            cr = cr[:3]
        return cr

    def _func_validate(self, llm_resp: str, prompt: str):
        """Valid when cleanup does not raise and the raw reply is non-empty."""
        try:
            self._func_cleanup(llm_resp, prompt="")
            if len(llm_resp) == 0:
                return False
        except Exception:
            return False
        return True

    def _func_fail_default_resp(self):
        """Fallback emoji used when the LLM response is unusable."""
        fs = "😋"
        return fs

    async def run(self, role: "STRole", act_desp: str):
        """Ask the LLM for an emoji representation of `act_desp`."""

        def create_prompt_input(act_desp):
            # Use only the parenthesized detail part if present.
            if "(" in act_desp:
                act_desp = act_desp.split("(")[-1].split(")")[0]
            prompt_input = [act_desp]
            return prompt_input

        prompt_template = "generate_pronunciatio_v1.txt"
        prompt_input = create_prompt_input(act_desp)
        prompt = self.generate_prompt_with_tmpl_filename(prompt_input, prompt_template)
        example_output = "🛁🧖‍♀️"
        special_instruction = "The value for the output must ONLY contain the emojis."
        self.fail_default_resp = self._func_fail_default_resp()
        output = await self._run_gpt35(prompt, example_output, special_instruction)
        logger.info(f"Role: {role.name} Action: {self.cls_name} output: {output}")
        return output
class GenEventTriple(STAction):
    """Generate a (subject, predicate, object) triple for the role's action."""

    name: str = "GenEventTriple"

    def _func_cleanup(self, llm_resp: str, prompt: str):
        """Parse the reply's '(pred, obj)' portion into a list of trimmed parts."""
        cr = llm_resp.strip()
        cr = [i.strip() for i in cr.split(")")[0].split(",")]
        return cr

    def _func_validate(self, llm_resp: str, prompt: str):
        """Valid when cleanup yields exactly two parts (predicate and object)."""
        try:
            llm_resp = self._func_cleanup(llm_resp, prompt="")
            if len(llm_resp) != 2:
                return False
        except Exception:
            return False
        return True

    def _func_fail_default_resp(self, role):
        """Fallback triple: the role is idle."""
        fs = (role.name, "is", "idle")
        return fs

    async def run(self, role: "STRole", act_desp: str):
        """Build the event triple (role, predicate, object) for `act_desp`."""

        def create_prompt_input(role, act_desp):
            # Use only the parenthesized detail part if present.
            if "(" in act_desp:
                act_desp = act_desp.split("(")[-1].split(")")[0]
            prompt_input = [role.name, act_desp, role.name]
            return prompt_input

        prompt_template = "generate_event_triple_v1.txt"
        prompt_input = create_prompt_input(role, act_desp)
        prompt = self.generate_prompt_with_tmpl_filename(prompt_input, prompt_template)
        self.fail_default_resp = self._func_fail_default_resp(role)
        output = await self._run_gpt35_max_tokens(prompt, max_tokens=30)
        output = (role.name, output[0], output[1])
        logger.info(f"Role: {role.name} Action: {self.cls_name} output: {output}")
        return output
class GenActObjDescription(STAction):
    """Describe the state of the game object affected by the role's action."""

    name: str = "GenActObjDescription"

    def _func_cleanup(self, llm_resp: str, prompt: str):
        """Trim the reply and drop a trailing period (raises on empty input,
        which _func_validate interprets as invalid)."""
        cr = llm_resp.strip()
        if cr[-1] == ".":
            cr = cr[:-1]
        return cr

    def _func_validate(self, llm_resp: str, prompt: str):
        """Valid when cleanup does not raise."""
        try:
            llm_resp = self._func_cleanup(llm_resp, prompt="")
        except Exception:
            return False
        return True

    def _func_fail_default_resp(self, act_game_object):
        """Fallback description: the object is idle."""
        fs = f"{act_game_object} is idle"
        return fs

    async def run(self, role: "STRole", act_game_object: str, act_desp: str):
        """Ask the LLM how `act_game_object` is affected by the role doing `act_desp`."""

        def create_prompt_input(act_game_object, act_desp, role):
            prompt_input = [act_game_object, role.name, act_desp, act_game_object, act_game_object]
            return prompt_input

        prompt_template = "generate_obj_event_v1.txt"
        prompt_input = create_prompt_input(act_game_object, act_desp, role)
        prompt = self.generate_prompt_with_tmpl_filename(prompt_input, prompt_template)
        example_output = "being fixed"
        special_instruction = "The output should ONLY contain the phrase that should go in <fill in>."
        self.fail_default_resp = self._func_fail_default_resp(act_game_object)
        output = await self._run_gpt35(prompt, example_output, special_instruction)
        logger.info(f"Role: {role.name} Action: {self.cls_name} output: {output}")
        return output
class GenObjEventTriple(STAction):
    """Generate a (object, predicate, object') triple for a game object's state."""

    name: str = "GenObjEventTriple"

    def _func_cleanup(self, llm_resp: str, prompt: str):
        """Parse the reply's '(pred, obj)' portion into a list of trimmed parts."""
        cr = llm_resp.strip()
        cr = [i.strip() for i in cr.split(")")[0].split(",")]
        return cr

    def _func_validate(self, llm_resp: str, prompt: str):
        """Valid when cleanup yields exactly two parts (predicate and object)."""
        try:
            llm_resp = self._func_cleanup(llm_resp, prompt="")
            if len(llm_resp) != 2:
                return False
        except Exception:
            return False
        return True

    def _func_fail_default_resp(self, act_game_object: str):
        """Fallback triple: the object is idle."""
        fs = (act_game_object, "is", "idle")
        return fs

    async def run(self, role: "STRole", act_game_object, act_obj_desp):
        """Build the event triple for `act_game_object` given its description."""

        def create_prompt_input(act_game_object, act_obj_desp):
            prompt_input = [act_game_object, act_obj_desp, act_game_object]
            return prompt_input

        prompt_template = "generate_event_triple_v1.txt"
        prompt_input = create_prompt_input(act_game_object, act_obj_desp)
        prompt = self.generate_prompt_with_tmpl_filename(prompt_input, prompt_template)
        self.fail_default_resp = self._func_fail_default_resp(act_game_object)
        output = await self._run_gpt35_max_tokens(prompt, max_tokens=30)
        output = (act_game_object, output[0], output[1])
        logger.info(f"Role: {role.name} Action: {self.cls_name} output: {output}")
        return output
class GenActionDetails(STAction):
    """Compose the full action record (address, duration, emoji, event triples)
    for a task by chaining the Gen* sub-actions above."""

    name: str = "GenActionDetails"

    def _func_cleanup(self, llm_resp: str, prompt: str) -> list:
        # NOTE(review): intentionally a no-op upstream (returns None); `run`
        # drives the sub-actions directly and never uses this cleanup.
        pass

    def _func_validate(self, llm_resp: str, prompt: str) -> bool:
        # TODO -- this sometimes generates error
        try:
            # Fix: forward `prompt`. The original called `self._func_cleanup(llm_resp)`,
            # which raised TypeError (missing positional argument) and made this
            # validator reject every response.
            self._func_cleanup(llm_resp, prompt)
        except Exception:
            return False
        return True

    def _func_fail_default_resp(self):
        """Empty dict fallback when generation fails."""
        fs = {}
        return fs

    async def run(self, role: "STRole", act_desp: str, act_dura):
        """Resolve where/what/how-long details for `act_desp` lasting `act_dura` minutes."""
        # The role's current tile determines the world/sector context.
        access_tile = role.rc.env.observe(
            obs_params=EnvObsParams(obs_type=EnvObsType.GET_TITLE, coord=role.scratch.curr_tile)
        )
        act_world = access_tile["world"]
        act_sector = await GenActionSector().run(role, access_tile, act_desp)
        act_arena = await GenActionArena().run(role, act_desp, act_world, act_sector)
        act_address = f"{act_world}:{act_sector}:{act_arena}"
        if not role.s_mem.get_str_accessible_arena_game_objects(act_address):
            # No known objects in this arena: defer the choice with a placeholder.
            act_game_object = "<random>"
        else:
            act_game_object = await GenActionObject().run(role, act_desp, act_address)
        new_address = f"{act_world}:{act_sector}:{act_arena}:{act_game_object}"
        act_pron = await GenPronunciatio().run(role, act_desp)
        act_event = await GenEventTriple().run(role, act_desp)
        # Persona's actions also influence the object states. We set those up here.
        act_obj_desp = await GenActObjDescription().run(role, act_game_object, act_desp)
        act_obj_pron = await GenPronunciatio().run(role, act_obj_desp)
        act_obj_event = await GenObjEventTriple().run(role, act_game_object, act_obj_desp)
        result_dict = {
            "action_address": new_address,
            "action_duration": int(act_dura),
            "action_description": act_desp,
            "action_pronunciatio": act_pron,
            "action_event": act_event,
            "chatting_with": None,
            "chat": None,
            "chatting_with_buffer": None,
            "chatting_end_time": None,
            "act_obj_description": act_obj_desp,
            "act_obj_pronunciatio": act_obj_pron,
            "act_obj_event": act_obj_event,
        }
        logger.info(f"Role: {role.name} Action: {self.cls_name} output: {result_dict}")
        return result_dict

View file

@ -0,0 +1,60 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : gen_daily_schedule
from metagpt.ext.stanford_town.actions.st_action import STAction
from metagpt.logs import logger
class GenDailySchedule(STAction):
    """Generate a role's high-level daily schedule as a list of activity strings."""

    name: str = "GenDailySchedule"

    def _func_validate(self, llm_resp: str, prompt: str) -> bool:
        """Valid when cleanup does not raise."""
        try:
            self._func_cleanup(llm_resp, prompt="")
        except Exception:
            return False
        return True

    def _func_cleanup(self, llm_resp: str, prompt: str) -> list:
        """Split numbered ')'-delimited items into bare activity strings.

        NOTE(review): items not ending in a digit, '.' or ',' are silently
        dropped — confirm that matches the prompt template's output format.
        """
        cr = []
        _cr = llm_resp.split(")")
        for i in _cr:
            if i[-1].isdigit():
                i = i[:-1].strip()
                if i[-1] == "." or i[-1] == ",":
                    cr += [i[:-1].strip()]
        return cr

    def _func_fail_default_resp(self) -> list[str]:
        # Annotation fixed: this returns a list of activities, not an int.
        fs = [
            "wake up and complete the morning routine at 6:00 am",
            "eat breakfast at 7:00 am",
            "read a book from 8:00 am to 12:00 pm",
            "have lunch at 12:00 pm",
            "take a nap from 1:00 pm to 4:00 pm",
            "relax and watch TV from 7:00 pm to 8:00 pm",
            "go to bed at 11:00 pm",
        ]
        return fs

    async def run(self, role: "STRole", wake_up_hour: str):
        """Ask the LLM for the day's plan, prefixed with the wake-up activity."""

        def create_prompt_input(role, wake_up_hour):
            prompt_input = []
            prompt_input += [role.scratch.get_str_iss()]
            prompt_input += [role.scratch.get_str_lifestyle()]
            prompt_input += [role.scratch.get_str_curr_date_str()]
            prompt_input += [role.scratch.get_str_firstname()]
            prompt_input += [f"{str(wake_up_hour)}:00 am"]
            return prompt_input

        wake_up_hour = int(wake_up_hour)
        prompt_template = "daily_planning_v6.txt"
        prompt_input = create_prompt_input(role, wake_up_hour)
        prompt = self.generate_prompt_with_tmpl_filename(prompt_input, prompt_template)
        self.fail_default_resp = self._func_fail_default_resp()
        output = await self._run_gpt35_max_tokens(prompt, max_tokens=500)
        output = [f"wake up and complete the morning routine at {wake_up_hour}:00 am"] + output
        logger.info(f"Role: {role.name} Action: {self.cls_name} output: {output}")
        return output

View file

@ -0,0 +1,181 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : gen_hourly_schedule
import random
import string
from metagpt.logs import logger
from .st_action import STAction
def get_random_alphanumeric(i=6, j=6):
    """Return a random alphanumeric string whose length is drawn uniformly
    from the inclusive range [i, j].

    Args:
        i: minimum length.
        j: maximum length.

    Returns:
        A string of ASCII letters and digits.
    """
    length = random.randint(i, j)
    return "".join(random.choices(string.ascii_letters + string.digits, k=length))
class GenHourlySchedule(STAction):
    """Expand a role's daily plan into a per-hour schedule, then compress it
    into [activity, duration-in-minutes] pairs."""

    name: str = "GenHourlySchedule"

    def _func_validate(self, llm_resp: str, prompt: str) -> bool:
        """Valid when cleanup does not raise."""
        try:
            self._func_cleanup(llm_resp, prompt="")
        except Exception:
            return False
        return True

    def _func_cleanup(self, llm_resp: str, prompt: str) -> str:
        # Annotation fixed: this returns a single activity string, not a list.
        cr = llm_resp.strip()
        if cr[-1] == ".":
            cr = cr[:-1]
        # to only use the first line of output
        cr = cr.split("\n")[0]
        return cr

    def _func_fail_default_resp(self) -> str:
        # Annotation fixed: the fallback activity is a string, not an int.
        fs = "asleep"
        return fs

    async def _generate_schedule_for_given_hour(
        self, role: "STRole", curr_hour_str, p_f_ds_hourly_org, hour_str, intermission2=None
    ):
        """Ask the LLM what the role does during `curr_hour_str`, given the
        hours already filled in (`p_f_ds_hourly_org`)."""

        def create_prompt_input(persona, curr_hour_str, p_f_ds_hourly_org, hour_str, intermission2=None):
            # Template of blank hourly slots the model should fill in.
            schedule_format = ""
            for i in hour_str:
                schedule_format += f"[{persona.scratch.get_str_curr_date_str()} -- {i}]"
                schedule_format += " Activity: [Fill in]\n"
            schedule_format = schedule_format[:-1]

            # Reminder of the high-level daily plan.
            intermission_str = "Here the originally intended hourly breakdown of"
            intermission_str += f" {persona.scratch.get_str_firstname()}'s schedule today: "
            for count, i in enumerate(persona.scratch.daily_req):
                intermission_str += f"{str(count + 1)}) {i}, "
            intermission_str = intermission_str[:-2]

            # Hours already scheduled, rendered with random IDs to discourage copying.
            prior_schedule = ""
            if p_f_ds_hourly_org:
                prior_schedule = "\n"
                for count, i in enumerate(p_f_ds_hourly_org):
                    prior_schedule += f"[(ID:{get_random_alphanumeric()})"
                    prior_schedule += f" {persona.scratch.get_str_curr_date_str()} --"
                    prior_schedule += f" {hour_str[count]}] Activity:"
                    prior_schedule += f" {persona.scratch.get_str_firstname()}"
                    prior_schedule += f" is {i}\n"

            prompt_ending = f"[(ID:{get_random_alphanumeric()})"
            prompt_ending += f" {persona.scratch.get_str_curr_date_str()}"
            prompt_ending += f" -- {curr_hour_str}] Activity:"
            prompt_ending += f" {persona.scratch.get_str_firstname()} is"

            if intermission2:
                intermission2 = f"\n{intermission2}"

            prompt_input = []
            prompt_input += [schedule_format]
            prompt_input += [persona.scratch.get_str_iss()]
            prompt_input += [prior_schedule + "\n"]
            prompt_input += [intermission_str]
            if intermission2:
                prompt_input += [intermission2]
            else:
                prompt_input += [""]
            prompt_input += [prompt_ending]
            return prompt_input

        prompt_template = "generate_hourly_schedule_v2.txt"
        prompt_input = create_prompt_input(role, curr_hour_str, p_f_ds_hourly_org, hour_str, intermission2)
        prompt_input_str = "\n".join(prompt_input)
        prompt = self.generate_prompt_with_tmpl_filename(prompt_input, prompt_template)
        self.fail_default_resp = self._func_fail_default_resp()
        output = await self._run_gpt35_max_tokens(prompt, max_tokens=50)
        logger.info(
            f"Role: {role.name} _generate_schedule_for_given_hour prompt_input: {prompt_input_str}, "
            f"output: {output}"
        )
        return output

    async def run(self, role: "STRole", wake_up_hour: int):
        """Build the whole day hour-by-hour, then return [activity, minutes] pairs."""
        hour_str = [
            "00:00 AM",
            "01:00 AM",
            "02:00 AM",
            "03:00 AM",
            "04:00 AM",
            "05:00 AM",
            "06:00 AM",
            "07:00 AM",
            "08:00 AM",
            "09:00 AM",
            "10:00 AM",
            "11:00 AM",
            "12:00 PM",
            "01:00 PM",
            "02:00 PM",
            "03:00 PM",
            "04:00 PM",
            "05:00 PM",
            "06:00 PM",
            "07:00 PM",
            "08:00 PM",
            "09:00 PM",
            "10:00 PM",
            "11:00 PM",
        ]
        n_m1_activity = []
        diversity_repeat_count = 1  # TODO mg 1->3
        for i in range(diversity_repeat_count):
            logger.info(f"diversity_repeat_count idx: {i}")
            n_m1_activity_set = set(n_m1_activity)
            # Regenerate if the day looks too monotonous (< 5 distinct activities).
            if len(n_m1_activity_set) < 5:
                n_m1_activity = []
                for count, curr_hour_str in enumerate(hour_str):
                    # Hours before the wake-up hour are filled with sleep.
                    if wake_up_hour > 0:
                        n_m1_activity += ["sleeping"]
                        wake_up_hour -= 1
                    else:
                        logger.info(f"_generate_schedule_for_given_hour idx: {count}, n_m1_activity: {n_m1_activity}")
                        n_m1_activity += [
                            await self._generate_schedule_for_given_hour(role, curr_hour_str, n_m1_activity, hour_str)
                        ]

        # Step 1. Compressing the hourly schedule to the following format:
        # The integer indicates the number of hours. They should add up to 24.
        # [['sleeping', 6], ['waking up and starting her morning routine', 1],
        # ['eating breakfast', 1], ['getting ready for the day', 1],
        # ['working on her painting', 2], ['taking a break', 1],
        # ['having lunch', 1], ['working on her painting', 3],
        # ['taking a break', 2], ['working on her painting', 2],
        # ['relaxing and watching TV', 1], ['going to bed', 1], ['sleeping', 2]]
        _n_m1_hourly_compressed = []
        prev = None
        prev_count = 0
        for i in n_m1_activity:
            if i != prev:
                prev_count = 1
                _n_m1_hourly_compressed += [[i, prev_count]]
                prev = i
            elif _n_m1_hourly_compressed:
                _n_m1_hourly_compressed[-1][1] += 1

        # Step 2. Expand to min scale (from hour scale)
        # [['sleeping', 360], ['waking up and starting her morning routine', 60],
        # ['eating breakfast', 60],..
        n_m1_hourly_compressed = []
        for task, duration in _n_m1_hourly_compressed:
            n_m1_hourly_compressed += [[task, duration * 60]]
        logger.info(f"Role: {role.name} Action: {self.cls_name} output: {n_m1_hourly_compressed}")
        return n_m1_hourly_compressed

View file

@ -0,0 +1,125 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : generate_iterative_chat_utt
from metagpt.environment.stanford_town.env_space import EnvObsParams, EnvObsType
from metagpt.ext.stanford_town.actions.st_action import STAction
from metagpt.ext.stanford_town.utils.utils import extract_first_json_dict
from metagpt.logs import logger
class GenIterChatUTT(STAction):
name: str = "GenIterChatUTT"
def _func_validate(self, llm_resp: str, prompt: str) -> bool:
    """Valid iff a JSON dict can be extracted from the raw LLM reply."""
    resp = False
    try:
        _ = extract_first_json_dict(llm_resp)
        resp = True
    except Exception:
        pass
    return resp
def _func_cleanup(self, llm_resp: str, prompt: str) -> dict:
    """Convert the model's JSON reply into {"utterance": str, "end": bool}.

    NOTE(review): assumes the extracted dict holds at least two values
    (utterance first, then an end-of-conversation flag); with fewer values
    this raises IndexError even though `_func_validate` passed — confirm.
    """
    gpt_response = extract_first_json_dict(llm_resp)

    cleaned_dict = dict()
    cleaned = []
    for key, val in gpt_response.items():
        cleaned += [val]
    cleaned_dict["utterance"] = cleaned[0]
    cleaned_dict["end"] = True
    # Any 'f'/'F' in the second value (e.g. "false") means the chat continues.
    if "f" in str(cleaned[1]) or "F" in str(cleaned[1]):
        cleaned_dict["end"] = False
    return cleaned_dict
def _func_fail_default_resp(self) -> dict:
    """Fallback utterance: a non-committal '...' that keeps the chat open."""
    cleaned_dict = dict()
    cleaned_dict["utterance"] = "..."
    cleaned_dict["end"] = False
    return cleaned_dict
async def run(
self,
init_role: "STRole",
target_role: "STRole",
retrieved: dict,
curr_context: str,
curr_chat: list[str],
*args,
**kwargs,
) -> dict:
def create_prompt_input(
access_tile: dict[str, str],
init_role: "STRole",
target_role: "STRole",
retrieved: dict,
curr_context: str,
curr_chat: list[str],
):
role = init_role
scratch = role.rc.scratch
target_scratch = target_role.rc.scratch
prev_convo_insert = "\n"
if role.rc.memory.chat_list:
for i in role.rc.memory.chat_list:
if i.object == target_role.name:
v1 = int((scratch.curr_time - i.created).total_seconds() / 60)
prev_convo_insert += (
f"{str(v1)} minutes ago, {scratch.name} and "
f"{target_scratch.name} were already {i.description} "
f"This context takes place after that conversation."
)
break
if prev_convo_insert == "\n":
prev_convo_insert = ""
if role.rc.memory.chat_list:
if int((scratch.curr_time - role.rc.memory.chat_list[-1].created).total_seconds() / 60) > 480:
prev_convo_insert = ""
logger.info(f"prev_convo_insert: {prev_convo_insert}")
curr_sector = f"{access_tile['sector']}"
curr_arena = f"{access_tile['arena']}"
curr_location = f"{curr_arena} in {curr_sector}"
retrieved_str = ""
for key, vals in retrieved.items():
for v in vals:
retrieved_str += f"- {v.description}\n"
convo_str = ""
for i in curr_chat:
convo_str += ": ".join(i) + "\n"
if convo_str == "":
convo_str = "[The conversation has not started yet -- start it!]"
init_iss = f"Here is Here is a brief description of {scratch.name}.\n{scratch.get_str_iss()}"
prompt_input = [
init_iss,
scratch.name,
retrieved_str,
prev_convo_insert,
curr_location,
curr_context,
scratch.name,
target_scratch.name,
convo_str,
scratch.name,
target_scratch.name,
scratch.name,
scratch.name,
scratch.name,
]
return prompt_input
access_tile = init_role.rc.env.observe(
obs_params=EnvObsParams(obs_type=EnvObsType.GET_TITLE, coord=init_role.scratch.curr_tile)
)
prompt_input = create_prompt_input(access_tile, init_role, target_role, retrieved, curr_context, curr_chat)
prompt = self.generate_prompt_with_tmpl_filename(prompt_input, "iterative_convo_v1.txt")
# original using `ChatGPT_safe_generate_response_OLD`
self.fail_default_resp = self._func_fail_default_resp()
output = await self._run_gpt35_wo_extra_prompt(prompt)
logger.info(f"Role: {init_role.name} Action: {self.cls_name} output: {output}")
return output

View file

@ -0,0 +1,35 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc :
from metagpt.ext.stanford_town.actions.st_action import STAction
from metagpt.logs import logger
class AgentWhisperThoughtAction(STAction):
    """Turn whispered statements into an inner thought for a role."""

    name: str = "AgentWhisperThoughtAction"

    def _func_validate(self, llm_resp: str, prompt: str) -> bool:
        """Valid whenever cleanup does not raise."""
        try:
            self._func_cleanup(llm_resp, prompt)
            return True
        except Exception:
            return False

    # NOTE: annotation corrected from `list` — this returns the text before
    # the first double quote, a str.
    def _func_cleanup(self, llm_resp: str, prompt: str = "") -> str:
        return llm_resp.split('"')[0].strip()

    def _func_fail_default_resp(self) -> str:
        # No fallback defined for this action.
        pass

    async def run(self, role: "STRole", statements: str, test_input=None, verbose=False) -> str:
        """Render `whisper_inner_thought_v1.txt` with (name, statements) and query the LLM."""

        def create_prompt_input(role: "STRole", statements, test_input=None):
            prompt_input = [role.scratch.name, statements]
            return prompt_input

        prompt_input = create_prompt_input(role, statements)
        prompt = self.generate_prompt_with_tmpl_filename(prompt_input, "whisper_inner_thought_v1.txt")
        output = await self._run_gpt35_max_tokens(prompt, max_tokens=50)
        logger.info(f"Role: {role.name} Action: {self.cls_name} output: {output}")
        return output

View file

@ -0,0 +1,154 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : new_decomp_schedule
import datetime
from metagpt.ext.stanford_town.actions.st_action import STAction
from metagpt.logs import logger
class NewDecompSchedule(STAction):
    """Re-decompose a role's schedule after an activity is inserted.

    Asks the LLM to revise an hour-range schedule so the inserted activity
    fits while the total duration of the window is preserved.
    """

    name: str = "NewDecompSchedule"

    def _func_validate(self, llm_resp: str, prompt: str) -> bool:
        """Valid when every entry is a (str task, int minutes) pair and the
        minutes sum to exactly the window parsed back out of the prompt.

        BUGFIX: the original type checks were inverted (`if isinstance(...)`
        returned False for *correct* types) and `resp` was never set to True,
        so validation always failed and the fallback schedule was always used.
        """
        resp = False
        try:
            llm_resp = self._func_cleanup(llm_resp, prompt)
            dur_sum = 0
            for act, dur in llm_resp:
                dur_sum += dur
                if not isinstance(act, str):
                    return False
                if not isinstance(dur, int):
                    return False
            # Recover the "HH:MM AM to HH:MM PM" window from the prompt's first line.
            x = prompt.split("\n")[0].split("originally planned schedule from")[-1].strip()[:-1]
            x = [datetime.datetime.strptime(i.strip(), "%H:%M %p") for i in x.split(" to ")]
            delta_min = int((x[1] - x[0]).total_seconds() / 60)
            if int(dur_sum) != int(delta_min):
                return False
            resp = True
        except Exception:
            pass
        return resp

    def _func_cleanup(self, llm_resp: str, prompt: str) -> list:
        """Parse "HH:MM ~ HH:MM -- action" lines into [[action, minutes], ...].

        The prompt (which already contains the schedule prefix) is prepended so
        the completed schedule can be split on "The revised schedule:".
        """
        new_schedule = prompt + " " + llm_resp.strip()
        new_schedule = new_schedule.split("The revised schedule:")[-1].strip()
        new_schedule = new_schedule.split("\n")
        ret_temp = []
        for i in new_schedule:
            ret_temp += [i.split(" -- ")]
        ret = []
        for time_str, action in ret_temp:
            start_time = time_str.split(" ~ ")[0].strip()
            end_time = time_str.split(" ~ ")[1].strip()
            delta = datetime.datetime.strptime(end_time, "%H:%M") - datetime.datetime.strptime(start_time, "%H:%M")
            delta_min = int(delta.total_seconds() / 60)
            if delta_min < 0:
                # Clamp ranges that wrap past midnight (end < start) to 0.
                delta_min = 0
            ret += [[action, delta_min]]
        return ret

    def _func_fail_default_resp(self, main_act_dur: list, truncated_act_dur: list) -> list:
        """Fallback: splice the truncated prefix back onto the original plan,
        then trim so the total duration matches the original schedule.

        NOTE: annotations corrected — both arguments are [[task, minutes], ...]
        lists and a list is returned, not ints.
        """
        dur_sum = 0
        for act, dur in main_act_dur:
            dur_sum += dur
        ret = truncated_act_dur[:]
        ret += main_act_dur[len(ret) - 1 :]
        # If there are access, we need to trim...
        ret_dur_sum = 0
        count = 0
        over = None
        for act, dur in ret:
            ret_dur_sum += dur
            if ret_dur_sum == dur_sum:
                break
            if ret_dur_sum > dur_sum:
                over = ret_dur_sum - dur_sum
                break
            count += 1
        if over:
            ret = ret[: count + 1]
            ret[-1][1] -= over
        return ret

    async def run(
        self,
        role: "STRole",
        main_act_dur: list,
        truncated_act_dur: list,
        start_time_hour: datetime.datetime,
        end_time_hour: datetime.datetime,
        inserted_act: str,
        inserted_act_dur: int,
        *args,
        **kwargs,
    ):
        """Build the revision prompt and return the validated/cleaned schedule.

        Args:
            role: the role whose schedule is revised.
            main_act_dur: original [[task, minutes], ...] plan for the window.
            truncated_act_dur: the already-elapsed prefix of that plan.
            start_time_hour / end_time_hour: window boundaries.
            inserted_act: description of the newly inserted activity.
            inserted_act_dur: its duration in minutes.
        """

        def create_prompt_input(
            role: "STRole",
            main_act_dur: list,
            truncated_act_dur: list,
            start_time_hour: datetime.datetime,
            end_time_hour: datetime.datetime,
            inserted_act: str,
            inserted_act_dur: int,
        ):
            persona_name = role.name
            start_hour_str = start_time_hour.strftime("%H:%M %p")
            end_hour_str = end_time_hour.strftime("%H:%M %p")
            # Render the original plan as "HH:MM ~ HH:MM -- task" lines.
            original_plan = ""
            for_time = start_time_hour
            for i in main_act_dur:
                original_plan += (
                    f'{for_time.strftime("%H:%M")} ~ '
                    f'{(for_time + datetime.timedelta(minutes=int(i[1]))).strftime("%H:%M")} -- ' + i[0]
                )
                original_plan += "\n"
                for_time += datetime.timedelta(minutes=int(i[1]))
            # Render the truncated prefix, ending with a dangling "HH:MM ~"
            # for the LLM to complete.
            new_plan_init = ""
            for_time = start_time_hour
            for count, i in enumerate(truncated_act_dur):
                new_plan_init += (
                    f'{for_time.strftime("%H:%M")} ~ '
                    f'{(for_time + datetime.timedelta(minutes=int(i[1]))).strftime("%H:%M")} -- ' + i[0]
                )
                new_plan_init += "\n"
                if count < len(truncated_act_dur) - 1:
                    for_time += datetime.timedelta(minutes=int(i[1]))
            new_plan_init += (for_time + datetime.timedelta(minutes=int(i[1]))).strftime("%H:%M") + " ~"
            prompt_input = [
                persona_name,
                start_hour_str,
                end_hour_str,
                original_plan,
                persona_name,
                inserted_act,
                inserted_act_dur,
                persona_name,
                start_hour_str,
                end_hour_str,
                end_hour_str,
                new_plan_init,
            ]
            return prompt_input

        prompt_input = create_prompt_input(
            role, main_act_dur, truncated_act_dur, start_time_hour, end_time_hour, inserted_act, inserted_act_dur
        )
        prompt = self.generate_prompt_with_tmpl_filename(prompt_input, "new_decomp_schedule_v1.txt")
        self.fail_default_resp = self._func_fail_default_resp(main_act_dur, truncated_act_dur)
        output = await self._run_gpt35_max_tokens(prompt, max_tokens=1000)
        logger.info(f"Role: {role.name} Action: {self.cls_name} output: {output}")
        return output

View file

@ -0,0 +1,277 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : Integration Reflect Action
import re
from metagpt.ext.stanford_town.actions.st_action import STAction
from metagpt.logs import logger
# Run GPT Prompt Focal Point method
class AgentFocusPt(STAction):
    """Ask the LLM for focal-point questions grounded in a set of statements."""

    name: str = "AgentFocusPt"

    def _func_validate(self, llm_resp: str, prompt: str) -> bool:
        """The response is valid whenever cleanup does not raise."""
        try:
            self._func_cleanup(llm_resp, prompt)
        except Exception:
            return False
        return True

    def _func_cleanup(self, llm_resp: str, prompt: str = "") -> str:
        """Pass the response through unchanged: run_v2 responses arrive pre-cleaned."""
        try:
            return llm_resp
        except Exception as exp:
            logger.error(f"{self.cls_name} with error {exp}")

    def _func_fail_default_resp(self) -> str:
        """No fallback value is defined for this action."""
        pass

    async def run(self, role: "STRole", statements: str, n: int, test_input=None) -> str:
        """Request `n` focal points for `statements` and return the LLM output."""
        prompt = self.generate_prompt_with_tmpl_filename([statements, str(n)], "generate_focal_pt_v1.txt")
        example_output = '["What should Jane do for lunch", "Does Jane like strawberry", "Who is Jane"]'
        special_instruction = "Output must be a list of str."
        output = await self._run_gpt35(prompt, example_output, special_instruction)
        logger.info(f"Role: {role.name} Action: {self.cls_name} output: {output}")
        return output
# Run GPT Prompt Insight and Guidance
class AgentInsightAndGuidance(STAction):
    """Extract high-level insights (with evidence indices) from statements."""

    name: str = "AgentInsightAndGuidance"

    def _func_validate(self, llm_resp: str, prompt: str) -> bool:
        """Valid whenever cleanup does not raise."""
        try:
            self._func_cleanup(llm_resp, prompt)
            return True
        except Exception:
            return False

    def _func_cleanup(self, llm_resp: str, prompt: str = "") -> dict:
        """Parse "N. <thought> (because of <ids>)" lines into {thought: [int ids]}.

        Lines without a well-formed "(because of ...)" clause are skipped.
        """
        try:
            llm_resp = "1. " + llm_resp.strip()
            ret = dict()
            for i in llm_resp.split("\n"):
                row = " ".join(i.split(". ")[1:])
                if "(because of " not in row:
                    continue
                thought = row.split("(because of ")[0].strip()
                if ")" not in row.split("(because of ")[1]:
                    continue
                evi_raw = row.split("(because of ")[1].split(")")[0].strip()
                evi_raw = re.findall(r"\d+", evi_raw)
                evi_raw = [int(i.strip()) for i in evi_raw]
                ret[thought] = evi_raw
            return ret
        except Exception as exp:
            logger.error(f"{self.cls_name} with error {exp}")

    # NOTE: annotation corrected from `str` — this returns a list.
    # NOTE(review): the fallback is a list while the normal path returns a
    # dict; callers appear to tolerate both — confirm before changing.
    def _func_fail_default_resp(self, n: int) -> list:
        return ["I am hungry"] * n

    async def run(self, role: "STRole", statements: str, n: int, test_input=None) -> dict:
        """Ask for `n` insights over `statements`; fall back to a canned list."""

        def create_prompt_input(role, statements, n, test_input=None):
            prompt_input = [statements, str(n)]
            return prompt_input

        prompt_input = create_prompt_input(role, statements, n)
        prompt = self.generate_prompt_with_tmpl_filename(prompt_input, "insight_and_evidence_v1.txt")
        self.fail_default_resp = self._func_fail_default_resp(n)
        output = await self._run_gpt35_max_tokens(prompt, max_tokens=150)
        logger.info(f"Role: {role.name} Action: {self.cls_name} output: {output}")
        return output
# Run GPT Prompt Event Triple
class AgentEventTriple(STAction):
    """Turn an event description into a (subject, predicate, object) triple."""

    name: str = "AgentEventTriple"

    def _func_validate(self, llm_resp: str, prompt: str) -> bool:
        """Valid when cleanup yields exactly the two non-subject elements."""
        try:
            llm_resp = self._func_cleanup(llm_resp, prompt="")
            if len(llm_resp) != 2:
                return False
        except Exception:
            return False
        return True

    def _func_cleanup(self, llm_resp: str, prompt: str = "") -> list:
        """Split the "(predicate, object)" completion into its two parts.

        If splitting produces a different count, keep only the last two
        elements so the triple assembly below still has two slots.
        """
        try:
            cr = llm_resp.strip()
            cr = [i.strip() for i in cr.split(")")[0].split(",")]
            if len(cr) != 2:
                return cr[-2:]
            return cr
        except Exception as exp:
            logger.error(f"{self.cls_name} with error {exp}")

    # NOTE: annotation corrected — no fallback is defined, None is returned.
    def _func_fail_default_resp(self) -> None:
        pass

    async def run(self, statements: str, role: "STRole", verbose=False) -> tuple:
        """Return (role_name, predicate, object) for `statements`.

        NOTE(review): `output[0], output[1]` assumes the LLM call succeeded and
        returned a 2-element list; on failure `_run_gpt35_max_tokens` returns
        `self.fail_default_resp` (None here) and this would raise — confirm
        callers handle that.
        """

        def create_prompt_input(statements, role):
            # If the statement embeds "(...)", extract the parenthesized core.
            if "(" in statements:
                statements = statements.split("(")[-1].split(")")[0]
            prompt_input = [role.scratch.name, statements, role.scratch.name]
            return prompt_input

        prompt_input = create_prompt_input(statements, role)
        prompt = self.generate_prompt_with_tmpl_filename(prompt_input, "generate_event_triple_v1.txt")
        output = await self._run_gpt35_max_tokens(prompt, max_tokens=30)
        output = (role.scratch.name, output[0], output[1])
        logger.info(f"Role: {role.name} Action: {self.cls_name} output: {output}")
        return output
# Run GPT Prompt Event Poignancy
class AgentEventPoignancy(STAction):
    """Rate how poignant an event is for a role, on a 1-10 integer scale."""

    name: str = "AgentEventPoignancy"

    def _func_validate(self, llm_resp: str, prompt: str) -> bool:
        """Valid whenever cleanup does not raise."""
        try:
            self._func_cleanup(llm_resp, prompt)
        except Exception:
            return False
        return True

    def _func_cleanup(self, llm_resp: str, prompt: str = "") -> int:
        """Parse the reply into an int; log (and return None) on a bad reply."""
        try:
            return int(llm_resp.strip())
        except Exception as exp:
            logger.error(f"{self.cls_name} with error {exp}")

    def _func_fail_default_resp(self) -> str:
        """No fallback value is defined for this action."""
        pass

    async def run(self, role: "STRole", statements: str, test_input=None, verbose=False) -> str:
        """Ask the LLM to score the poignancy of `statements` for `role`."""
        scratch = role.scratch
        prompt_input = [scratch.name, scratch.get_str_iss(), scratch.name, statements]
        prompt = self.generate_prompt_with_tmpl_filename(prompt_input, "poignancy_event_v1.txt")
        example_output = "5"  # ########
        special_instruction = "The output should ONLY contain ONE integer value on the scale of 1 to 10."
        output = await self._run_gpt35(prompt, example_output, special_instruction)
        logger.info(f"Role: {role.name} Action: {self.cls_name} output: {output}")
        return output
# Run GPT Prompt Chat Poignancy
class AgentChatPoignancy(STAction):
    """Rate how poignant a conversation is for a role, on a 1-10 integer scale."""

    name: str = "AgentChatPoignancy"

    def _func_validate(self, llm_resp: str, prompt: str) -> bool:
        """Valid whenever cleanup does not raise."""
        try:
            self._func_cleanup(llm_resp, prompt)
        except Exception:
            return False
        return True

    def _func_cleanup(self, llm_resp: str, prompt: str = "") -> int:
        """Parse the reply into an int; log (and return None) on a bad reply."""
        try:
            return int(llm_resp.strip())
        except Exception as exp:
            logger.error(f"{self.cls_name} with error {exp}")

    def _func_fail_default_resp(self) -> str:
        """No fallback value is defined for this action."""
        pass

    async def run(self, role: "STRole", statements: str, test_input=None, verbose=False) -> str:
        """Ask the LLM to score the poignancy of the chat in `statements`."""
        scratch = role.scratch
        prompt_input = [scratch.name, scratch.get_str_iss(), scratch.name, statements]
        prompt = self.generate_prompt_with_tmpl_filename(prompt_input, "poignancy_chat_v1.txt")
        example_output = "5"  # ########
        special_instruction = "The output should ONLY contain ONE integer value on the scale of 1 to 10."
        output = await self._run_gpt35(prompt, example_output, special_instruction)
        logger.info(f"Role: {role.name} Action: {self.cls_name} output: {output}")
        return output
# Run GPT Prompt Planning Thought on Convo
class AgentPlanThoughtOnConvo(STAction):
    """Derive a planning thought for a role from a conversation transcript."""

    name: str = "AgentPlanThoughtOnConvo"

    def _func_validate(self, llm_resp: str, prompt: str) -> bool:
        """Valid whenever cleanup does not raise."""
        try:
            self._func_cleanup(llm_resp, prompt)
        except Exception:
            return False
        return True

    def _func_cleanup(self, llm_resp: str, prompt: str = "") -> str:
        """Keep only the text before the first double quote."""
        try:
            return llm_resp.split('"')[0].strip()
        except Exception as exp:
            logger.error(f"{self.cls_name} with error {exp}")

    def _func_fail_default_resp(self) -> str:
        """No fallback value is defined for this action."""
        pass

    async def run(self, role: "STRole", statements: str, test_input=None, verbose=False) -> str:
        """Ask the LLM what `role` should plan, given the conversation."""
        name = role.scratch.name
        prompt_input = [statements, name, name, name]
        prompt = self.generate_prompt_with_tmpl_filename(prompt_input, "planning_thought_on_convo_v1.txt")
        output = await self._run_gpt35_max_tokens(prompt, max_tokens=50)
        logger.info(f"Role: {role.name} Action: {self.cls_name} output: {output}")
        return output
# Run GPT Prompt Memory on Convo
class AgentMemoryOnConvo(STAction):
    """Summarize what a role found memorable about a conversation."""

    name: str = "AgentMemoryOnConvo"

    def _func_validate(self, llm_resp: str, prompt: str) -> bool:
        """Valid whenever cleanup does not raise."""
        try:
            self._func_cleanup(llm_resp, prompt)
        except Exception:
            return False
        return True

    def _func_cleanup(self, llm_resp: str, prompt: str = "") -> str:
        """Keep only the text before the first double quote."""
        try:
            return llm_resp.split('"')[0].strip()
        except Exception as exp:
            logger.error(f"{self.cls_name} with error {exp}")

    def _func_fail_default_resp(self) -> str:
        """No fallback value is defined for this action."""
        pass

    async def run(self, role: "STRole", statements: str, test_input=None, verbose=False) -> str:
        """Ask the LLM for a one-line memory of the conversation in `statements`."""
        name = role.scratch.name
        prompt_input = [statements, name, name, name]
        prompt = self.generate_prompt_with_tmpl_filename(prompt_input, "memo_on_convo_v1.txt")
        example_output = "Jane Doe was interesting to talk to."
        special_instruction = (
            "The output should ONLY contain a string that summarizes anything interesting "
            "that the agent may have noticed"
        )
        output = await self._run_gpt35(prompt, example_output, special_instruction)
        logger.info(f"Role: {role.name} Action: {self.cls_name} output: {output}")
        return output

View file

@ -0,0 +1,119 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : StanfordTown Action
import json
import time
from abc import abstractmethod
from pathlib import Path
from typing import Any, Optional, Union
from metagpt.actions.action import Action
from metagpt.config2 import config
from metagpt.ext.stanford_town.utils.const import PROMPTS_DIR
from metagpt.logs import logger
class STAction(Action):
    """Base action for Stanford Town: prompt-template rendering plus guarded
    LLM call helpers with validate/cleanup/fallback hooks.

    Subclasses implement `_func_validate` (is the raw reply usable?),
    `_func_cleanup` (parse the raw reply), and `_func_fail_default_resp`
    (what to return when all retries fail).
    """

    name: str = "STAction"
    prompt_dir: Path = PROMPTS_DIR
    # Set by subclasses before calling a `_run_gpt35*` helper that may fail.
    fail_default_resp: Optional[str] = None

    @property
    def cls_name(self):
        return self.__class__.__name__

    @abstractmethod
    def _func_validate(self, llm_resp: str, prompt: str):
        raise NotImplementedError

    @abstractmethod
    def _func_cleanup(self, llm_resp: str, prompt: str):
        raise NotImplementedError

    @abstractmethod
    def _func_fail_default_resp(self):
        raise NotImplementedError

    def generate_prompt_with_tmpl_filename(self, prompt_input: Union[str, list], tmpl_filename) -> str:
        """
        same with `generate_prompt`
        Args:
            prompt_input: the input we want to feed in (IF THERE ARE MORE THAN ONE INPUT, THIS CAN BE A LIST.)
            tmpl_filename: prompt template filename
        Returns:
            a str prompt that will be sent to LLM server.
        """
        if isinstance(prompt_input, str):
            prompt_input = [prompt_input]
        prompt_input = [str(i) for i in prompt_input]
        # BUGFIX: use a context manager so the handle is closed even if read() raises.
        with open(str(self.prompt_dir.joinpath(tmpl_filename)), "r") as f:
            prompt = f.read()
        # Substitute !<INPUT k>! placeholders positionally.
        for count, i in enumerate(prompt_input):
            prompt = prompt.replace(f"!<INPUT {count}>!", i)
        if "<commentblockmarker>###</commentblockmarker>" in prompt:
            prompt = prompt.split("<commentblockmarker>###</commentblockmarker>")[1]
        return prompt.strip()

    async def _aask(self, prompt: str) -> str:
        return await self.llm.aask(prompt)

    async def _run_gpt35_max_tokens(self, prompt: str, max_tokens: int = 50, retry: int = 3):
        """Query the LLM with a temporary `max_token` cap; validate/cleanup the
        reply, retrying up to `retry` times before returning `fail_default_resp`."""
        for idx in range(retry):
            try:
                tmp_max_tokens_rsp = getattr(config.llm, "max_token", 1500)
                # BUGFIX: restore the original max_token in `finally`; previously an
                # exception in `_aask` left the global config clamped to `max_tokens`.
                try:
                    setattr(config.llm, "max_token", max_tokens)
                    self.llm.use_system_prompt = False  # to make it behave like a non-chat completions
                    llm_resp = await self._aask(prompt)
                finally:
                    setattr(config.llm, "max_token", tmp_max_tokens_rsp)
                logger.info(f"Action: {self.cls_name} llm _run_gpt35_max_tokens raw resp: {llm_resp}")
                if self._func_validate(llm_resp, prompt):
                    return self._func_cleanup(llm_resp, prompt)
            except Exception as exp:
                logger.warning(f"Action: {self.cls_name} _run_gpt35_max_tokens exp: {exp}")
                time.sleep(5)
        return self.fail_default_resp

    async def _run_gpt35(
        self, prompt: str, example_output: str, special_instruction: str, retry: int = 3
    ) -> Union[bool, Any]:
        """same with `gpt_structure.ChatGPT_safe_generate_response`"""
        # Wrap the prompt so the model answers with {"output": ...} JSON.
        prompt = '"""\n' + prompt + '\n"""\n'
        prompt += f"Output the response to the prompt above in json. {special_instruction}\n"
        prompt += "Example output json:\n"
        prompt += '{"output": "' + str(example_output) + '"}'
        for idx in range(retry):
            try:
                llm_resp = await self._aask(prompt)
                logger.info(f"Action: {self.cls_name} llm _run_gpt35 raw resp: {llm_resp}")
                # Trim anything after the final closing brace before parsing.
                end_idx = llm_resp.strip().rfind("}") + 1
                llm_resp = llm_resp[:end_idx]
                llm_resp = json.loads(llm_resp)["output"]
                if self._func_validate(llm_resp, prompt):
                    return self._func_cleanup(llm_resp, prompt)
            except Exception as exp:
                logger.warning(f"Action: {self.cls_name} _run_gpt35 exp: {exp}")
                time.sleep(5)  # usually avoid `Rate limit`
        return False

    async def _run_gpt35_wo_extra_prompt(self, prompt: str, retry: int = 3) -> str:
        """Query the LLM with the prompt as-is; validate/cleanup with retries,
        falling back to `fail_default_resp`."""
        for idx in range(retry):
            try:
                llm_resp = await self._aask(prompt)
                llm_resp = llm_resp.strip()
                logger.info(f"Action: {self.cls_name} llm _run_gpt35_wo_extra_prompt raw resp: {llm_resp}")
                if self._func_validate(llm_resp, prompt):
                    return self._func_cleanup(llm_resp, prompt)
            except Exception as exp:
                logger.warning(f"Action: {self.cls_name} _run_gpt35_wo_extra_prompt exp: {exp}")
                time.sleep(5)  # usually avoid `Rate limit`
        return self.fail_default_resp

    async def run(self, *args, **kwargs):
        """Run action"""
        raise NotImplementedError("The run method should be implemented in a subclass.")

View file

@ -0,0 +1,47 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : summarize the content of agents' conversation
from metagpt.ext.stanford_town.actions.st_action import STAction
from metagpt.logs import logger
class SummarizeConv(STAction):
    """Summarize a conversation into a "conversing about ..." phrase."""

    name: str = "SummarizeConv"

    def _func_validate(self, llm_resp: str, prompt: str) -> bool:
        """Valid whenever cleanup does not raise."""
        try:
            self._func_cleanup(llm_resp, prompt)
        except Exception:
            return False
        return True

    def _func_cleanup(self, llm_resp: str, prompt: str) -> str:
        """Prefix the summary so it reads as an activity description."""
        return f"conversing about {llm_resp.strip()}"

    def _func_fail_default_resp(self) -> str:
        """Generic summary used when every retry fails."""
        return "conversing with a housemate about morning greetings"

    async def run(self, conv: list):
        """Summarize `conv`, a list of (speaker, utterance) pairs."""
        convo_str = "".join(f'{row[0]}: "{row[1]}"\n' for row in conv)
        prompt = self.generate_prompt_with_tmpl_filename([convo_str], "summarize_conversation_v1.txt")
        example_output = "conversing about what to eat for lunch"
        special_instruction = (
            "The output must continue the sentence above by filling in the <fill in> tag. "
            "Don't start with 'this is a conversation about...' Just finish the sentence "
            "but do not miss any important details (including who are chatting)."
        )
        output = await self._run_gpt35(prompt, example_output, special_instruction)
        logger.info(f"Action: {self.cls_name} output: {output}")
        return output

View file

@ -0,0 +1,173 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : task_decomp
import datetime
from metagpt.ext.stanford_town.actions.st_action import STAction
from metagpt.logs import logger
class TaskDecomp(STAction):
    """Decompose an hour-scale scheduled task into minute-scale subtasks."""

    name: str = "TaskDecomp"

    def _func_cleanup(self, llm_resp: str, prompt: str) -> list:
        """Parse "(duration in minutes: N, ...)" lines into [[task, minutes], ...]
        and rebalance so the durations sum to the window stated in the prompt.

        The rebalancing expands each subtask into per-minute slots (rounded
        down to 5-minute multiples), pads or trims to the expected total, then
        re-compresses consecutive identical slots.
        """
        # TODO SOMETHING HERE sometimes fails... See screenshot
        temp = [i.strip() for i in llm_resp.split("\n")]
        _cr = []
        cr = []
        for count, i in enumerate(temp):
            if count != 0:
                # Drop the leading "N) <name> is" tokens on continuation lines.
                _cr += [" ".join([j.strip() for j in i.split(" ")][3:])]
            else:
                _cr += [i]
        for count, i in enumerate(_cr):
            k = [j.strip() for j in i.split("(duration in minutes:")]
            task = k[0]
            if task[-1] == ".":
                task = task[:-1]
            duration = int(k[1].split(",")[0].strip())
            cr += [[task, duration]]
        # The expected window length is parsed back out of the prompt itself.
        total_expected_min = int(prompt.split("(total duration in minutes")[-1].split("):")[0].strip())
        # TODO -- now, you need to make sure that this is the same as the sum of
        # the current action sequence.
        curr_min_slot = [
            ["dummy", -1],
        ]  # (task_name, task_index)
        for count, i in enumerate(cr):
            i_task = i[0]
            i_duration = i[1]
            i_duration -= i_duration % 5
            if i_duration > 0:
                for j in range(i_duration):
                    curr_min_slot += [(i_task, count)]
        curr_min_slot = curr_min_slot[1:]
        if len(curr_min_slot) > total_expected_min:
            # NOTE(review): index 60 is inherited from the original Generative
            # Agents code; it assumes at least 61 slots and looks arbitrary —
            # confirm against upstream before changing.
            last_task = curr_min_slot[60]
            for i in range(1, 6):
                curr_min_slot[-1 * i] = last_task
        elif len(curr_min_slot) < total_expected_min:
            last_task = curr_min_slot[-1]
            for i in range(total_expected_min - len(curr_min_slot)):
                curr_min_slot += [last_task]
        # Re-compress runs of identical minute slots back into [task, minutes].
        cr_ret = [
            ["dummy", -1],
        ]
        for task, task_index in curr_min_slot:
            if task != cr_ret[-1][0]:
                cr_ret += [[task, 1]]
            else:
                cr_ret[-1][1] += 1
        cr = cr_ret[1:]
        return cr

    def _func_validate(self, llm_resp: str, prompt: str) -> bool:
        """Valid whenever cleanup does not raise."""
        # TODO -- this sometimes generates error
        try:
            self._func_cleanup(llm_resp, prompt)
        except Exception:
            return False
        return True

    # NOTE: annotation corrected — the fallback is a [[task, minutes]] list.
    def _func_fail_default_resp(self) -> list:
        fs = [["asleep", 0]]
        return fs

    # NOTE: `task_desc` is formatted into strings below, so it is a str;
    # `truncated_act_dur` is the task duration in minutes here.
    async def run(self, role: "STRole", task_desc: str, truncated_act_dur: int, *args, **kwargs):
        """Decompose `task_desc` (lasting `truncated_act_dur` minutes) into
        subtasks labelled "<task_desc> (<subtask>)"."""

        def create_prompt_input(role, task, duration):
            """
            Today is Saturday June 25. From 00:00 ~ 06:00am, Maeve is
            planning on sleeping, 06:00 ~ 07:00am, Maeve is
            planning on waking up and doing her morning routine,
            and from 07:00am ~08:00am, Maeve is planning on having breakfast.
            """
            # Summarize the current hourly slot plus up to two following ones.
            curr_f_org_index = role.scratch.get_f_daily_schedule_hourly_org_index()
            all_indices = []
            # if curr_f_org_index > 0:
            #   all_indices += [curr_f_org_index-1]
            all_indices += [curr_f_org_index]
            if curr_f_org_index + 1 <= len(role.scratch.f_daily_schedule_hourly_org):
                all_indices += [curr_f_org_index + 1]
            if curr_f_org_index + 2 <= len(role.scratch.f_daily_schedule_hourly_org):
                all_indices += [curr_f_org_index + 2]
            curr_time_range = ""
            logger.debug("DEBUG")
            logger.debug(role.scratch.f_daily_schedule_hourly_org)
            logger.debug(all_indices)
            summ_str = f'Today is {role.scratch.curr_time.strftime("%B %d, %Y")}. '
            summ_str += "From "
            for index in all_indices:
                logger.debug(f"index {index}")
                if index < len(role.scratch.f_daily_schedule_hourly_org):
                    # Minutes from midnight to this slot's start/end.
                    start_min = 0
                    for i in range(index):
                        start_min += role.scratch.f_daily_schedule_hourly_org[i][1]
                    end_min = start_min + role.scratch.f_daily_schedule_hourly_org[index][1]
                    start_time = datetime.datetime.strptime("00:00:00", "%H:%M:%S") + datetime.timedelta(
                        minutes=start_min
                    )
                    end_time = datetime.datetime.strptime("00:00:00", "%H:%M:%S") + datetime.timedelta(
                        minutes=end_min
                    )
                    start_time_str = start_time.strftime("%H:%M%p")
                    end_time_str = end_time.strftime("%H:%M%p")
                    summ_str += (
                        f"{start_time_str} ~ {end_time_str}, {role.name} is planning "
                        f"on {role.scratch.f_daily_schedule_hourly_org[index][0]}, "
                    )
                    if curr_f_org_index + 1 == index:
                        curr_time_range = f"{start_time_str} ~ {end_time_str}"
            summ_str = summ_str[:-2] + "."
            prompt_input = []
            prompt_input += [role.scratch.get_str_iss()]
            prompt_input += [summ_str]
            # prompt_input += [role.scratch.get_str_curr_date_str()]
            prompt_input += [role.scratch.get_str_firstname()]
            prompt_input += [role.scratch.get_str_firstname()]
            prompt_input += [task]
            prompt_input += [curr_time_range]
            prompt_input += [duration]
            prompt_input += [role.scratch.get_str_firstname()]
            return prompt_input

        prompt_input = create_prompt_input(role, task_desc, truncated_act_dur)
        prompt = self.generate_prompt_with_tmpl_filename(prompt_input, "task_decomp_v3.txt")
        self.fail_default_resp = self._func_fail_default_resp()
        output = await self._run_gpt35_max_tokens(prompt, max_tokens=1000)
        logger.info(f"Role: {role.name} {self.cls_name} output: {output}")
        # Keep only the prefix of subtasks that fits within the slot duration.
        fin_output = []
        time_sum = 0
        for i_task, i_duration in output:
            time_sum += i_duration
            # HM?????????
            # if time_sum < duration:
            if time_sum <= truncated_act_dur:
                fin_output += [[i_task, i_duration]]
            else:
                break
        # Stretch the last kept subtask so the total exactly fills the slot.
        ftime_sum = 0
        for fi_task, fi_duration in fin_output:
            ftime_sum += fi_duration
        fin_output[-1][1] += truncated_act_dur - ftime_sum
        output = fin_output
        task_decomp = output
        ret = []
        for decomp_task, duration in task_decomp:
            ret += [[f"{task_desc} ({decomp_task})", duration]]
        output = ret
        logger.info(f"Role: {role.name} Action: {self.cls_name} output: {output}")
        return output

View file

@ -0,0 +1,42 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : wake_up
from metagpt.ext.stanford_town.actions.st_action import STAction
from metagpt.logs import logger
class WakeUp(STAction):
    """Ask the LLM what hour a role wakes up, as an integer (e.g. 8 for "8am")."""

    name: str = "WakeUp"

    def _func_validate(self, llm_resp: str, prompt: str = None) -> bool:
        """Valid whenever cleanup can parse the reply."""
        try:
            self._func_cleanup(llm_resp, prompt="")
            return True
        except Exception:
            return False

    def _func_cleanup(self, llm_resp: str, prompt: str) -> int:
        """Parse replies like "8am" into the integer hour."""
        return int(llm_resp.strip().lower().split("am")[0])

    def _func_fail_default_resp(self) -> int:
        """Default wake-up hour when every retry fails."""
        return 8

    async def run(self, role: "STRole"):
        """Query the wake-up hour for `role` from its identity and lifestyle."""
        scratch = role.scratch
        prompt_input = [
            scratch.get_str_iss(),
            scratch.get_str_lifestyle(),
            scratch.get_str_firstname(),
        ]
        prompt = self.generate_prompt_with_tmpl_filename(prompt_input, "wake_up_hour_v1.txt")
        self.fail_default_resp = self._func_fail_default_resp()
        output = await self._run_gpt35_max_tokens(prompt, max_tokens=5)
        logger.info(f"Role: {role.name} Action: {self.cls_name} output: {output}")
        return output

View file

@ -0,0 +1,378 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : BasicMemory,AgentMemory实现
from datetime import datetime
from pathlib import Path
from typing import Optional
from pydantic import Field, field_serializer, model_validator
from metagpt.logs import logger
from metagpt.memory.memory import Memory
from metagpt.schema import Message
from metagpt.utils.common import read_json_file, write_json_file
class BasicMemory(Message):
    """A single memory node, extending MetaGPT's `Message`.

    `content` doubles as `description`. Following Generative Agents (GA),
    memories come in three types — event, thought, chat — stored as a
    (subject, predicate, object) triple plus metadata.
    """

    memory_id: Optional[str] = Field(default=None)  # memory ID
    memory_count: int = -1  # ordinal of this memory; mirrors the Memory count
    type_count: int = -1  # ordinal within its memory type
    memory_type: Optional[str] = Field(default=None)  # one of: event, thought, chat
    depth: int = -1  # memory depth
    created: Optional[datetime] = Field(default=None)  # creation time
    expiration: Optional[datetime] = Field(default=None)  # expiry time; None means no expiry
    last_accessed: Optional[datetime] = Field(default=None)  # last access time; initialized to `created`
    subject: Optional[str] = Field(default=None)  # triple: subject
    predicate: Optional[str] = Field(default=None)  # triple: predicate
    object: Optional[str] = Field(default=None)  # triple: object
    description: Optional[str] = Field(default=None)
    embedding_key: Optional[str] = Field(default=None)  # equals `content`
    poignancy: int = -1  # importance score
    keywords: list[str] = Field(default=[])  # keywords
    filling: list = Field(default=[])  # ids of associated memories
    __hash__ = object.__hash__  # support hash in AgentMemory

    @model_validator(mode="before")
    @classmethod
    def check_values(cls, values):
        # Mirror created -> last_accessed and content -> description on load.
        if "created" in values:
            values["last_accessed"] = values["created"]
        if "content" in values:
            values["description"] = values["content"]
        if "filling" in values:
            values["filling"] = values["filling"] or []
        return values

    @field_serializer("created", "expiration")
    def transform_time_field(self, time_field: Optional[datetime]) -> str:
        # Serialize datetimes as "YYYY-mm-dd HH:MM:SS"; pass None through.
        if time_field:
            time_field = time_field.strftime("%Y-%m-%d %H:%M:%S")
        return time_field

    def summary(self):
        """Return the (subject, predicate, object) triple."""
        return self.subject, self.predicate, self.object

    def save_to_dict(self) -> dict:
        """Serialize this node as {memory_id: fields} for the GA nodes.json file.

        `cause_by` is included because the GA format is not compatible with it
        as-is and needs the conversion done here.

        NOTE(review): several names in `include` ("node_count", "type") do not
        match any declared field (fields are `memory_count`, `memory_type`),
        so those keys are silently dropped from the dump — verify against the
        GA nodes.json schema before relying on round-tripping.
        """
        memory_dict = dict()
        node_id = self.memory_id
        basic_mem_obj = self.model_dump(
            include=[
                "node_count",
                "type_count",
                "type",
                "depth",
                "created",
                "expiration",
                "subject",
                "predicate",
                "object",
                "description",
                "embedding_key",
                "poignancy",
                "keywords",
                "filling",
                "cause_by",
            ]
        )
        memory_dict[node_id] = basic_mem_obj
        return memory_dict
class AgentMemory(Memory):
    """Agent long-term memory mirroring the three JSON stores of Generative Agents (GA):

    1. embeddings.json  (dict: embedding_key -> embedding vector)
    2. nodes.json       (dict: node_id -> serialized BasicMemory node)
    3. kw_strength.json (keyword -> strength counters, for events and thoughts)
    """

    # Overrides Memory.storage: every BasicMemory node, in insertion (oldest-first) order.
    storage: list[BasicMemory] = []
    # Per-type views over storage; the newest node of each type is kept at index 0.
    event_list: list[BasicMemory] = []
    thought_list: list[BasicMemory] = []
    chat_list: list[BasicMemory] = []

    # Lowercased keyword -> nodes (newest first), one index per memory type.
    event_keywords: dict[str, list[BasicMemory]] = dict()
    thought_keywords: dict[str, list[BasicMemory]] = dict()
    chat_keywords: dict[str, list[BasicMemory]] = dict()

    # Keyword "strength" counters, persisted as GA's kw_strength.json.
    kw_strength_event: dict[str, int] = dict()
    kw_strength_thought: dict[str, int] = dict()

    memory_saved: Optional[Path] = Field(default=None)  # directory the memory was last loaded from
    embeddings: dict[str, list[float]] = dict()  # embedding_key -> embedding vector

    def set_mem_path(self, memory_saved: Path):
        """Remember the storage directory and load the GA JSON files from it."""
        self.memory_saved = memory_saved
        self.load(memory_saved)

    def save(self, memory_saved: Path):
        """Persist memory as GA-compatible nodes.json / embeddings.json / kw_strength.json.

        Nodes are dumped in reverse storage order (newest first), matching the
        original implementation.
        TODO: verify the reverse-order dump against test_memory (carried over
        from the original Chinese TODO).
        """
        memory_json = dict()
        for memory_node in reversed(self.storage):
            memory_json.update(memory_node.save_to_dict())
        write_json_file(memory_saved.joinpath("nodes.json"), memory_json)
        write_json_file(memory_saved.joinpath("embeddings.json"), self.embeddings)

        strength_json = {
            "kw_strength_event": self.kw_strength_event,
            "kw_strength_thought": self.kw_strength_thought,
        }
        write_json_file(memory_saved.joinpath("kw_strength.json"), strength_json)

    def load(self, memory_saved: Path):
        """Parse GA's JSON files from `memory_saved` and fill this AgentMemory."""
        self.embeddings = read_json_file(memory_saved.joinpath("embeddings.json"))
        memory_load = read_json_file(memory_saved.joinpath("nodes.json"))
        # GA node ids are sequential ("node_1", "node_2", ...), so replay them in order.
        for count in range(len(memory_load.keys())):
            node_id = f"node_{str(count + 1)}"
            node_details = memory_load[node_id]
            node_type = node_details["type"]
            created = datetime.strptime(node_details["created"], "%Y-%m-%d %H:%M:%S")
            expiration = None
            if node_details["expiration"]:
                expiration = datetime.strptime(node_details["expiration"], "%Y-%m-%d %H:%M:%S")

            s = node_details["subject"]
            p = node_details["predicate"]
            o = node_details["object"]
            description = node_details["description"]
            embedding_pair = (node_details["embedding_key"], self.embeddings[node_details["embedding_key"]])
            poignancy = node_details["poignancy"]
            keywords = set(node_details["keywords"])
            filling = node_details["filling"]
            if node_type == "thought":
                self.add_thought(
                    created, expiration, s, p, o, description, keywords, poignancy, embedding_pair, filling
                )
            if node_type == "event":
                self.add_event(created, expiration, s, p, o, description, keywords, poignancy, embedding_pair, filling)
            if node_type == "chat":
                self.add_chat(created, expiration, s, p, o, description, keywords, poignancy, embedding_pair, filling)

        strength_keywords_load = read_json_file(memory_saved.joinpath("kw_strength.json"))
        # .get(): tolerate strength files that miss a section instead of raising KeyError.
        if strength_keywords_load.get("kw_strength_event"):
            self.kw_strength_event = strength_keywords_load["kw_strength_event"]
        if strength_keywords_load.get("kw_strength_thought"):
            self.kw_strength_thought = strength_keywords_load["kw_strength_thought"]

    def add(self, memory_basic: BasicMemory):
        """Add a new BasicMemory node to storage and to the list matching its memory_type.

        Overrides Memory.add: stores BasicMemory instead of Message, and prepends the
        node to its per-type list so index 0 is always the newest memory.
        Fix: the original deduplicated with `memory_id in self.storage`, comparing a
        str id against BasicMemory objects — that never matched; compare ids instead.
        """
        if any(node.memory_id == memory_basic.memory_id for node in self.storage):
            return
        self.storage.append(memory_basic)
        if memory_basic.memory_type == "chat":
            self.chat_list[0:0] = [memory_basic]
            return
        if memory_basic.memory_type == "thought":
            self.thought_list[0:0] = [memory_basic]
            return
        if memory_basic.memory_type == "event":
            self.event_list[0:0] = [memory_basic]
            return

    def add_chat(
        self, created, expiration, s, p, o, content, keywords, poignancy, embedding_pair, filling, cause_by=""
    ):
        """Build and register a chat node via add(); the embedding must already be computed.

        Fix: the original computed `type_count` from thought_list (copy/paste from
        add_thought); a chat node's per-type counter must come from chat_list.
        """
        memory_count = len(self.storage) + 1
        type_count = len(self.chat_list) + 1
        memory_type = "chat"
        memory_id = f"node_{str(memory_count)}"
        depth = 1

        memory_node = BasicMemory(
            memory_id=memory_id,
            memory_count=memory_count,
            type_count=type_count,
            memory_type=memory_type,
            depth=depth,
            created=created,
            expiration=expiration,
            subject=s,
            predicate=p,
            object=o,
            description=content,
            embedding_key=embedding_pair[0],
            poignancy=poignancy,
            keywords=keywords,
            filling=filling,
            cause_by=cause_by,
        )

        # Keyword index is keyed on lowercase; newest node goes to the front.
        keywords = [i.lower() for i in keywords]
        for kw in keywords:
            if kw in self.chat_keywords:
                self.chat_keywords[kw][0:0] = [memory_node]
            else:
                self.chat_keywords[kw] = [memory_node]

        self.add(memory_node)

        self.embeddings[embedding_pair[0]] = embedding_pair[1]
        return memory_node

    def add_thought(self, created, expiration, s, p, o, content, keywords, poignancy, embedding_pair, filling):
        """Build and register a thought node via add().

        A thought's depth is 1 plus the max depth of the nodes it was filled from.
        """
        memory_count = len(self.storage) + 1
        type_count = len(self.thought_list) + 1
        memory_type = "thought"
        memory_id = f"node_{str(memory_count)}"
        depth = 1

        try:
            if filling:
                depth_list = [memory_node.depth for memory_node in self.storage if memory_node.memory_id in filling]
                depth += max(depth_list)
        except Exception as exp:
            # Best effort: a malformed filling list falls back to depth 1.
            logger.warning(f"filling init occur {exp}")
            pass

        memory_node = BasicMemory(
            memory_id=memory_id,
            memory_count=memory_count,
            type_count=type_count,
            memory_type=memory_type,
            depth=depth,
            created=created,
            expiration=expiration,
            subject=s,
            predicate=p,
            object=o,
            description=content,
            embedding_key=embedding_pair[0],
            poignancy=poignancy,
            keywords=keywords,
            filling=filling,
        )

        # Keyword index is keyed on lowercase; newest node goes to the front.
        keywords = [i.lower() for i in keywords]
        for kw in keywords:
            if kw in self.thought_keywords:
                self.thought_keywords[kw][0:0] = [memory_node]
            else:
                self.thought_keywords[kw] = [memory_node]

        self.add(memory_node)

        # "is idle" thoughts carry no signal, so they don't strengthen keywords.
        if f"{p} {o}" != "is idle":
            for kw in keywords:
                if kw in self.kw_strength_thought:
                    self.kw_strength_thought[kw] += 1
                else:
                    self.kw_strength_thought[kw] = 1

        self.embeddings[embedding_pair[0]] = embedding_pair[1]
        return memory_node

    def add_event(self, created, expiration, s, p, o, content, keywords, poignancy, embedding_pair, filling):
        """Build and register an event node via add()."""
        memory_count = len(self.storage) + 1
        type_count = len(self.event_list) + 1
        memory_type = "event"
        memory_id = f"node_{str(memory_count)}"
        depth = 0

        # GA quirk: compress "(...)" descriptions to the first three words
        # plus the content of the last parenthesis.
        if "(" in content:
            content = " ".join(content.split()[:3]) + " " + content.split("(")[-1][:-1]

        memory_node = BasicMemory(
            memory_id=memory_id,
            memory_count=memory_count,
            type_count=type_count,
            memory_type=memory_type,
            depth=depth,
            created=created,
            expiration=expiration,
            subject=s,
            predicate=p,
            object=o,
            description=content,
            embedding_key=embedding_pair[0],
            poignancy=poignancy,
            keywords=keywords,
            filling=filling,
        )

        # Keyword index is keyed on lowercase; newest node goes to the front.
        keywords = [i.lower() for i in keywords]
        for kw in keywords:
            if kw in self.event_keywords:
                self.event_keywords[kw][0:0] = [memory_node]
            else:
                self.event_keywords[kw] = [memory_node]

        self.add(memory_node)

        # "is idle" events carry no signal, so they don't strengthen keywords.
        if f"{p} {o}" != "is idle":
            for kw in keywords:
                if kw in self.kw_strength_event:
                    self.kw_strength_event[kw] += 1
                else:
                    self.kw_strength_event[kw] = 1

        self.embeddings[embedding_pair[0]] = embedding_pair[1]
        return memory_node

    def get_summarized_latest_events(self, retention):
        """Return the set of summaries of the `retention` most recent events."""
        ret_set = set()
        for e_node in self.event_list[:retention]:
            ret_set.add(e_node.summary())
        return ret_set

    def get_last_chat(self, target_role_name: str):
        """Return the most recent chat node involving `target_role_name`, or False if none."""
        if target_role_name.lower() in self.chat_keywords:
            return self.chat_keywords[target_role_name.lower()][0]
        else:
            return False

    def retrieve_relevant_thoughts(self, s_content: str, p_content: str, o_content: str) -> set:
        """Return thoughts whose keyword index matches subject, predicate, or object.

        Keyword buckets are stored lowercased (see add_thought), so the lookup is
        lowercased too. Fix: the original tested the raw string but indexed the
        lowercased one, silently missing matches for non-lowercase content.
        """
        ret = []
        for content in (s_content, p_content, o_content):
            key = content.lower()
            if key in self.thought_keywords:
                ret += self.thought_keywords[key]
        return set(ret)

    def retrieve_relevant_events(self, s_content: str, p_content: str, o_content: str) -> set:
        """Return events whose keyword index matches subject, predicate, or object.

        Fix: keyword buckets are stored lowercased (see add_event), but the original
        looked up the raw string, so any non-lowercase content never matched.
        """
        ret = []
        for content in (s_content, p_content, o_content):
            key = content.lower()
            if key in self.event_keywords:
                ret += self.event_keywords[key]
        return set(ret)

Some files were not shown because too many files have changed in this diff Show more