Merge pull request #1175 from better629/feat_werewolf

Feat add werewolf game
This commit is contained in:
Alexander Wu 2024-04-11 10:11:04 +08:00 committed by GitHub
commit 4c16d4aee4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
40 changed files with 2118 additions and 172 deletions

View file

@ -0,0 +1,218 @@
"""
Filename: MetaGPT/examples/werewolf_game/evals/eval.py
Created Date: Oct 18, 2023
Updated Date: Oct 24, 2023
Author: [Aria](https://github.com/ariafyy)
Info: eval the Voting Accuracy Rate of non_werewolves and Vote Difficulity
"""
import glob
import os
import re
from pathlib import Path
import numpy as np
import pandas as pd
from tqdm import tqdm
from utils import Utils
from metagpt.const import DEFAULT_WORKSPACE_ROOT, METAGPT_ROOT
from metagpt.environment.werewolf.const import RoleType
class Vote:
"""Vote Evaluation"""
def __init__(self):
self.OUT_PATH = DEFAULT_WORKSPACE_ROOT / "outputs"
os.makedirs(self.OUT_PATH, exist_ok=True)
self.SUB_FOLDER_LIST = ["01-10", "11-20", "21-30"]
def _get_log_fileslist(self, IN_PATH) -> list[str]:
files_list = []
for SUB_FOLDER in self.SUB_FOLDER_LIST:
files_list.extend(glob.glob(str(IN_PATH / SUB_FOLDER / "*.txt")))
return files_list
def extract_votes_from_logs(self, files_list: list):
for in_logfile in tqdm(files_list):
SUB_FOLDER = (Path(in_logfile).parent).stem
out_txtfile = self.OUT_PATH / "# {0}_{1}.txt".format(SUB_FOLDER, Path(in_logfile).stem)
Utils().pick_vote_log(in_logfile, out_txtfile)
votefiles_list = Utils().get_file_list(self.OUT_PATH)
return votefiles_list
@staticmethod
def parse_vote_text2chunks(text: str):
"""
parse each game vote log into text chunks
one chunk example:
['Player1', 'Player2', 'Player3', 'Player5', 'Player6']. Say ONLY: I vote to eliminate ...
Player1(Witch): 49 | I vote to eliminate Player5
Player2(Villager): 49 | I vote to eliminate Player5
Player3(Villager): 49 | I vote to eliminate Player5
Player5(Werewolf): 49 | I vote to eliminate Player6
Player6(Seer): 49 | I vote to eliminate Player5
"""
pattern = re.compile(r"""\[([^\]]+)\]. Say ONLY: I vote to eliminate ...""")
chunks = {}
chunk_id = 0
last_end = 0
for match in pattern.finditer(text):
start = match.start()
chunk = text[last_end:start]
chunks[f"vote_{chunk_id}"] = chunk.strip()
last_end = match.end()
chunk_id += 1
final_chunk = text[last_end:].strip()
if final_chunk:
chunks[f"vote_{chunk_id}"] = final_chunk
return chunks
def _vote_rate_players(self, text: str):
"""
# calculate the rate of goodteam vote werewolves
:example:
input:
['Player1', 'Player2', 'Player3', 'Player5', 'Player6']. Say ONLY: I vote to eliminate ...
Player1(Witch): 49 | I vote to eliminate Player5
Player2(Villager): 49 | I vote to eliminate Player5
Player3(Villager): 49 | I vote to eliminate Player5
Player5(Werewolf): 49 | I vote to eliminate Player6
Player6(Seer): 49 | I vote to eliminate Player5
output:
werewolves: ['Player5']
non_werewolves: ['Player1', 'Player2', 'Player3', 'Player6']
as you can see :Player2(Villager) and Player3(Villager) vote to eliminate Player5(Werewolf)
:return goodteam vote rateability: 100.00%
"""
pattern = re.compile(r"(\w+)\(([^\)]+)\): \d+ \| I vote to eliminate (\w+)")
# find all werewolves
werewolves = []
for match in pattern.finditer(text):
if match.group(2) == RoleType.WEREWOLF.value:
werewolves.append(match.group(1))
# find all non_werewolves
non_werewolves = []
for match in pattern.finditer(text):
if match.group(2) != RoleType.WEREWOLF.value:
non_werewolves.append(match.group(1))
num_non_werewolves = len(non_werewolves)
# count players other than werewolves made the correct votes
correct_votes = 0
for match in pattern.finditer(text):
if match.group(2) != RoleType.WEREWOLF.value and match.group(3) in werewolves:
correct_votes += 1
# cal the rateability of non_werewolves
rate = correct_votes / num_non_werewolves
good_vote_rate = round(rate, 2)
return {"good_vote_rate": good_vote_rate, "werewolves": werewolves, "non_werewolves": non_werewolves}
def get_goodteam_vote_rate(self, text: str) -> float:
goodteam_vote_rate = self._vote_rate_players(text)["good_vote_rate"]
return goodteam_vote_rate
def get_werewolves(self, text: str) -> list:
werewolves_list = self._vote_rate_players(text)["werewolves"]
return werewolves_list
def get_non_werewolves(self, text: str) -> list:
non_werewolves_list = self._vote_rate_players(text)["non_werewolves"]
return non_werewolves_list
def get_votewolf_difficulty(self, werewolves: list, non_werewolves: list) -> str:
num_living_wolfs = len(werewolves)
num_living_players = len(werewolves) + len(non_werewolves)
votewolf_difficulty = "_{0} / {1}".format(num_living_wolfs, num_living_players)
return votewolf_difficulty
def get_result_df(self, out_txtfile: str) -> pd.DataFrame:
"""
folder: sub folders for evals
file: evaluation file, each file represents one game
votes: the number of votes, eg. vote_1 represents the first vote of this game,
good_vote_rate:the rateability of a good person voting against a werewolf,
correct_votes / the total number of players other than werewolves
total_votes:the total number of votes cast
"""
with open(out_txtfile, "r") as out_file:
text = out_file.read()
chunks = self.parse_vote_text2chunks(text)
res = []
for k, v in chunks.items():
if v != "":
chunks_list = list(chunks.keys())
total_votes = len(chunks_list) - 1
werewolves = self.get_werewolves(v)
non_werewolves = self.get_non_werewolves(v)
good_vote_rate = self.get_goodteam_vote_rate(v)
votewolf_difficulty = self.get_votewolf_difficulty(werewolves, non_werewolves)
folder = Utils().filename_to_foldername(out_txtfile)
result = {
"folder": folder,
"file": Path(out_txtfile).stem + ".txt",
"vote_round": k,
"good_vote_rate": good_vote_rate,
"total_votes": total_votes,
"votewolf_difficulty": votewolf_difficulty,
}
res.append(result)
df = pd.DataFrame(res)
return df
def calc_avg_rate(self, IN_PATH) -> pd.DataFrame:
"""
get avg_rate for each game
avg_rate : the good_rate/total number of votes in the game
vote1_rate: First Round Voting Accuracy Rate
"""
infiles_list = self._get_log_fileslist(IN_PATH)
votefiles_list = self.extract_votes_from_logs(infiles_list)
df_list = [self._load_df_from_file(file) for file in votefiles_list]
combined_df = pd.concat(df_list, ignore_index=True)
# calculate the average good_vote_rate for each file
mean_rates = self._calculate_mean_rates(combined_df)
combined_df["avg_rate"] = combined_df["file"].map(mean_rates)
# calculate vote1 rate
vote1_rates = self._calc_vote1_rates(combined_df)
combined_df["vote1_rate"] = combined_df["folder"].map(vote1_rates.set_index("folder")["good_vote_rate"])
combined_df.loc[combined_df["vote_round"] != "vote_1", "vote1_rate"] = np.nan
combined_df["vote1_rate"] = combined_df["vote1_rate"].apply(self._format_rates)
combined_df["good_vote_rate"] = combined_df["good_vote_rate"].apply(self._format_rates)
combined_df["avg_rate"] = combined_df["avg_rate"].apply(self._format_rates)
combined_df.sort_values(["file"], ascending=True, inplace=True)
return combined_df
def _calc_vote1_rates(self, df):
df_vote1 = df[df["vote_round"] == "vote_1"]
vote1_rates = df_vote1.groupby("folder")["good_vote_rate"].mean().reset_index()
return vote1_rates
def _load_df_from_file(self, file):
return self.get_result_df(file)
def _calculate_mean_rates(self, df):
return df.groupby("file")["good_vote_rate"].mean()
def _format_rates(self, s):
return Utils().float_to_percent(s)
def get_eval_csv(self, IN_PATH, EVAL_RESULT):
"""
IN_PATH : parent folder of ["01-10", "11-20", "21-30"]
EVAL_RESULT : output csv file path
"""
combined_df = self.calc_avg_rate(IN_PATH)
combined_df.to_csv(EVAL_RESULT, index=False)
if __name__ == "__main__":
IN_PATH = METAGPT_ROOT / "examples/werewolf_game/evals"
EVAL_RESULT = DEFAULT_WORKSPACE_ROOT / "outputs" / "goodteam_vote_rate.csv"
Vote().get_eval_csv(IN_PATH, EVAL_RESULT)

View file

@ -0,0 +1,134 @@
"""
Filename: MetaGPT/examples/werewolf_game/evals/utils.py
Created Date: Oct 11, 2023
Revised Date: Oct 20, 2023
Author: [Aria](https://github.com/ariafyy)
"""
import glob
import os
import re
from pathlib import Path
from metagpt.const import METAGPT_ROOT
class Utils:
"""Utils: utils of logs"""
@staticmethod
def polish_log(in_logfile, out_txtfile):
"""polish logs for evaluation"""
pattern_text = r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}) \| (\w+) +\| ([\w\.]+:\w+:\d+) - (.*\S)"
pattern_player = r"(Player(\d{1}): \w+)"
pattern_start = False
json_start = False
with open(in_logfile, "r") as f, open(out_txtfile, "w") as out:
for line in f.readlines():
matches = re.match(pattern_text, line)
if matches:
message = matches.group(4).strip()
pattern_start = True
json_start = False
if (
"Moderator(Moderator) ready to InstructSpeak" not in message
and "Moderator(Moderator) ready to ParseSpeak" not in message
and "Total running cost:" not in message
):
out.write("- " + message + "\n")
else:
out.write("\n")
elif pattern_start and not matches:
if "gpt-4 may update over time" in line:
line = ""
out.write(line)
elif line.strip().startswith("{"):
out.write(line.strip())
json_start = True
elif json_start and not line.strip().endswith("}"):
out.write(line.strip())
elif json_start and line.strip().endswith("}"):
out.write(line.strip())
json_start = False
elif (
line.startswith("(User):") or line.startswith("********** STEP:") or re.search(pattern_player, line)
):
out.write(line)
else:
out.write("\n")
@staticmethod
def pick_vote_log(in_logfile, out_txtfile):
"""
pick the vote log from the log file.
ready to AnnounceGameResult serves as the 'HINT_TEXT ' which indicates the end of the game.
based on bservation and reflection, then discuss is not in vote session.
"""
pattern_vote = r"(Player\d+)\(([A-Za-z]+)\): (\d+) \| (I vote to eliminate Player\d+)"
ignore_text = """reflection"""
HINT_TEXT = r"ready to AnnounceGameResult"
pattern_moderator = r"\[([^\]]+)\]\. Say ONLY: I vote to eliminate ..."
in_valid_block = False
with open(in_logfile, "r") as f:
lines = f.read()
split_lines = lines.split(HINT_TEXT)
if len(split_lines) < 2:
print(f"Key text :{HINT_TEXT} not found in {in_logfile}")
return
relevant_lines = split_lines[1].split("\n")
with open(out_txtfile, "w") as out:
for line in relevant_lines:
if re.search(pattern_moderator, line):
in_valid_block = True
out.write(line.lstrip() + "\n")
elif in_valid_block and re.search(pattern_vote, line):
out.write(line + "\n")
elif ignore_text in line:
in_valid_block = False
@staticmethod
def get_file_list(path: str) -> list:
file_pattern = os.path.join(path, "*.txt")
files_list = glob.glob(file_pattern)
return files_list
@staticmethod
def filename_to_foldername(out_txtfile: str):
"""
convert filename into its parent folder name
input:"....../# 01-10_10132100.txt"
output:# 01-10
"""
s = Path(out_txtfile).stem
pattern_folder = r"([^_]*)_"
match = re.match(pattern_folder, s)
if match:
folder = match.group(1)
return folder
@staticmethod
def float_to_percent(decimal: float) -> str:
"""
input: 1.00
output: 100.00%
"""
percent = decimal * 100
return f"{percent:.2f}%"
if __name__ == "__main__":
in_logfile = METAGPT_ROOT / "logs/log.txt"
out_txtfile = "input your wish path"
# Utils().polish_log(in_logfile, out_txtfile)
Utils().pick_vote_log(in_logfile, out_txtfile)

View file

@ -0,0 +1,66 @@
import asyncio
import fire
from metagpt.ext.werewolf.roles import Guard, Moderator, Seer, Villager, Werewolf, Witch
from metagpt.ext.werewolf.werewolf_game import WerewolfGame
from metagpt.logs import logger
async def start_game(
investment: float = 3.0,
n_round: int = 5,
shuffle: bool = True,
add_human: bool = False,
use_reflection: bool = True,
use_experience: bool = False,
use_memory_selection: bool = False,
new_experience_version: str = "",
):
game = WerewolfGame()
game_setup, players = game.env.init_game_setup(
role_uniq_objs=[Villager, Werewolf, Guard, Seer, Witch],
num_werewolf=2,
num_villager=2,
shuffle=shuffle,
add_human=add_human,
use_reflection=use_reflection,
use_experience=use_experience,
use_memory_selection=use_memory_selection,
new_experience_version=new_experience_version,
)
logger.info(f"{game_setup}")
players = [Moderator()] + players
game.hire(players)
game.invest(investment)
game.run_project(game_setup)
await game.run(n_round=n_round)
def main(
investment: float = 20.0,
n_round: int = 100,
shuffle: bool = True,
add_human: bool = False,
use_reflection: bool = True,
use_experience: bool = False,
use_memory_selection: bool = False,
new_experience_version: str = "",
):
asyncio.run(
start_game(
investment,
n_round,
shuffle,
add_human,
use_reflection,
use_experience,
use_memory_selection,
new_experience_version,
)
)
if __name__ == "__main__":
fire.Fire(main)

View file

@ -19,7 +19,7 @@ from metagpt.logs import logger
from metagpt.utils.common import load_mc_skills_code, read_json_file, write_json_file
class MinecraftEnv(Environment, MinecraftExtEnv):
class MinecraftEnv(MinecraftExtEnv, Environment):
"""MinecraftEnv, including shared memory of cache and information between roles"""
model_config = ConfigDict(arbitrary_types_allowed=True)

View file

@ -0,0 +1,121 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc :
from enum import Enum
from metagpt.const import MESSAGE_ROUTE_TO_ALL
class RoleType(Enum):
VILLAGER = "Villager"
WEREWOLF = "Werewolf"
GUARD = "Guard"
SEER = "Seer"
WITCH = "Witch"
MODERATOR = "Moderator"
class RoleState(Enum):
ALIVE = "alive" # the role is alive
DEAD = "dead" # killed or poisoned
KILLED = "killed" # killed by werewolf or voting
POISONED = "poisoned" # killed by poison
SAVED = "saved" # saved by antidote
PROTECTED = "projected" # projected by guard
class RoleActionRes(Enum):
SAVE = "save"
PASS = "pass" # ignore current action output
empty_set = set()
# the ordered rules by the moderator to announce to everyone each step
STEP_INSTRUCTIONS = {
0: {
"content": "Its dark, everyone close your eyes. I will talk with you/your team secretly at night.",
"send_to": {RoleType.MODERATOR.value}, # for moderator to continue speaking
"restricted_to": empty_set,
},
1: {
"content": "Guard, please open your eyes!",
"send_to": {RoleType.MODERATOR.value}, # for moderator to continue speaking
"restricted_to": empty_set,
},
2: {
"content": """Guard, now tell me who you protect tonight?
You only choose one from the following living options please: {living_players}.
Or you can pass. For example: Protect ...""",
"send_to": {RoleType.GUARD.value},
"restricted_to": {RoleType.MODERATOR.value, RoleType.GUARD.value},
},
3: {"content": "Guard, close your eyes", "send_to": {RoleType.MODERATOR.value}, "restricted_to": empty_set},
4: {
"content": "Werewolves, please open your eyes!",
"send_to": {RoleType.MODERATOR.value},
"restricted_to": empty_set,
},
5: {
"content": """Werewolves, I secretly tell you that {werewolf_players} are
all of the {werewolf_num} werewolves! Keep in mind you are teammates. The rest players are not werewolves.
choose one from the following living options please:
{living_players}. For example: Kill ...""",
"send_to": {RoleType.WEREWOLF.value},
"restricted_to": {RoleType.MODERATOR.value, RoleType.WEREWOLF.value},
},
6: {"content": "Werewolves, close your eyes", "send_to": {RoleType.MODERATOR.value}, "restricted_to": empty_set},
7: {"content": "Witch, please open your eyes!", "send_to": {RoleType.MODERATOR.value}, "restricted_to": empty_set},
8: {
"content": """Witch, tonight {player_hunted} has been killed by the werewolves.
You have a bottle of antidote, would you like to save him/her? If so, say "Save", else, say "Pass".""",
"send_to": {RoleType.WITCH.value},
"restricted_to": {RoleType.MODERATOR.value, RoleType.WITCH.value},
}, # 要先判断女巫是否有解药,再去询问女巫是否使用解药救人
9: {
"content": """Witch, you also have a bottle of poison, would you like to use it to kill one of the living players?
Choose one from the following living options: {living_players}.
If so, say ONLY "Poison PlayerX", replace PlayerX with the actual player name, else, say "Pass".""",
"send_to": {RoleType.WITCH.value},
"restricted_to": {RoleType.MODERATOR.value, RoleType.WITCH.value},
}, #
10: {"content": "Witch, close your eyes", "send_to": {RoleType.MODERATOR.value}, "restricted_to": empty_set},
11: {"content": "Seer, please open your eyes!", "send_to": {RoleType.MODERATOR.value}, "restricted_to": empty_set},
12: {
"content": """Seer, you can check one player's identity. Who are you going to verify its identity tonight?
Choose only one from the following living options:{living_players}.""",
"send_to": {RoleType.SEER.value},
"restricted_to": {RoleType.MODERATOR.value, RoleType.SEER.value},
},
13: {"content": "Seer, close your eyes", "send_to": {RoleType.MODERATOR.value}, "restricted_to": empty_set},
# The 1-st daytime
14: {
"content": """It's daytime. Everyone woke up except those who had been killed.""",
"send_to": {RoleType.MODERATOR.value},
"restricted_to": empty_set,
},
15: {
"content": "{player_current_dead} was killed last night!",
"send_to": {RoleType.MODERATOR.value},
"restricted_to": empty_set,
},
16: {
"content": """Living players: {living_players}, now freely talk about the current situation based on your observation and
reflection with a few sentences. Decide whether to reveal your identity based on your reflection.""",
"send_to": {MESSAGE_ROUTE_TO_ALL}, # send to all to speak in daytime
"restricted_to": empty_set,
},
17: {
"content": """Now vote and tell me who you think is the werewolf. Dont mention your role.
You only choose one from the following living options please:
{living_players}. Say ONLY: I vote to eliminate ...""",
"send_to": {MESSAGE_ROUTE_TO_ALL},
"restricted_to": empty_set,
},
18: {
"content": """{player_current_dead} was eliminated.""",
"send_to": {RoleType.MODERATOR.value},
"restricted_to": empty_set,
},
}

View file

@ -0,0 +1,60 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : werewolf observation/action space and its action definition
from gymnasium import spaces
from pydantic import ConfigDict, Field
from metagpt.environment.base_env_space import BaseEnvAction, BaseEnvActionType
from metagpt.environment.werewolf.const import STEP_INSTRUCTIONS
class EnvActionType(BaseEnvActionType):
NONE = 0 # no action to run, just get observation
WOLF_KILL = 1 # wolf kill someone
VOTE_KILL = 2 # vote kill someone
WITCH_POISON = 3 # witch poison someone
WITCH_SAVE = 4 # witch save someone
GUARD_PROTECT = 5 # guard protect someone
PROGRESS_STEP = 6 # step increment
class EnvAction(BaseEnvAction):
model_config = ConfigDict(arbitrary_types_allowed=True)
action_type: int = Field(default=EnvActionType.NONE, description="action type")
player_name: str = Field(default="", description="the name of the player to do the action")
target_player_name: str = Field(default="", description="the name of the player who take the action")
def get_observation_space() -> spaces.Dict:
space = spaces.Dict(
{
"game_setup": spaces.Text(256),
"step_idx": spaces.Discrete(len(STEP_INSTRUCTIONS)),
"living_players": spaces.Tuple(
(spaces.Text(16), spaces.Text(16))
), # TODO should be tuple of variable length
"werewolf_players": spaces.Tuple(
(spaces.Text(16), spaces.Text(16))
), # TODO should be tuple of variable length
"player_hunted": spaces.Text(16),
"player_current_dead": spaces.Tuple((spaces.Text(16))), # TODO should be tuple of variable length
"witch_poison_left": spaces.Discrete(2),
"witch_antidote_left": spaces.Discrete(2),
"winner": spaces.Text(16),
"win_reason": spaces.Text(64),
}
)
return space
def get_action_space() -> spaces.Dict:
space = spaces.Dict(
{
"action_type": spaces.Discrete(len(EnvActionType)),
"player_name": spaces.Text(16), # the player to do the action
"target_player_name": spaces.Text(16), # the target player who take the action
}
)
return space

View file

@ -2,30 +2,40 @@
# -*- coding: utf-8 -*-
# @Desc : MG Werewolf Env
from typing import Iterable
from pydantic import Field
from metagpt.environment.base_env import Environment
from metagpt.environment.werewolf.werewolf_ext_env import WerewolfExtEnv
from metagpt.logs import logger
from metagpt.schema import Message
class WerewolfEnv(Environment, WerewolfExtEnv):
timestamp: int = Field(default=0)
class WerewolfEnv(WerewolfExtEnv, Environment):
round_cnt: int = Field(default=0)
def add_roles(self, roles: Iterable["Role"]):
"""增加一批在当前环境的角色
Add a batch of characters in the current environment
"""
for role in roles:
self.roles[role.name] = role # use name as key here, due to multi-player can have same profile
for role in roles: # setup system message with roles
role.context = self.context
role.set_env(self)
def publish_message(self, message: Message, add_timestamp: bool = True):
"""Post information to the current environment"""
logger.debug(f"publish_message: {message.dump()}")
if add_timestamp:
# Because the content of the message may be repeated, for example, killing the same person in two nights
# Therefore, a unique timestamp prefix needs to be added so that the same message will not be automatically deduplicated when added to the memory.
message.content = f"{self.timestamp} | " + message.content
self.memory.add(message)
self.history += f"\n{message}"
# Therefore, a unique round_cnt prefix needs to be added so that the same message will not be automatically deduplicated when added to the memory.
message.content = f"{self.round_cnt} | " + message.content
super().publish_message(message)
async def run(self, k=1):
"""Process all Role runs by order"""
for _ in range(k):
for role in self.roles.values():
await role.run()
self.timestamp += 1
self.round_cnt += 1

View file

@ -4,110 +4,27 @@
import random
from collections import Counter
from enum import Enum
from typing import Any, Callable, Optional
from pydantic import ConfigDict, Field
from metagpt.environment.base_env import ExtEnv, mark_as_readable, mark_as_writeable
from metagpt.environment.base_env_space import BaseEnvAction, BaseEnvObsParams
from metagpt.environment.base_env_space import BaseEnvObsParams
from metagpt.environment.werewolf.const import STEP_INSTRUCTIONS, RoleState, RoleType
from metagpt.environment.werewolf.env_space import EnvAction, EnvActionType
from metagpt.logs import logger
class RoleState(Enum):
ALIVE = "alive" # the role is alive
KILLED = "killed" # the role is killed by werewolf or voting
POISONED = "poisoned" # the role is killed by posion
SAVED = "saved" # the role is saved by antidote
# the ordered rules by the moderator to announce to everyone each step
STEP_INSTRUCTIONS = {
0: {
"content": "Its dark, everyone close your eyes. I will talk with you/your team secretly at night.",
"send_to": "Moderator", # for moderator to continuen speaking
"restricted_to": "",
},
1: {
"content": "Guard, please open your eyes!",
"send_to": "Moderator", # for moderator to continuen speaking
"restricted_to": "",
},
2: {
"content": """Guard, now tell me who you protect tonight?
You only choose one from the following living options please: {living_players}.
Or you can pass. For example: Protect ...""",
"send_to": "Guard",
"restricted_to": "Moderator,Guard",
},
3: {"content": "Guard, close your eyes", "send_to": "Moderator", "restricted_to": ""},
4: {"content": "Werewolves, please open your eyes!", "send_to": "Moderator", "restricted_to": ""},
5: {
"content": """Werewolves, I secretly tell you that {werewolf_players} are
all of the 2 werewolves! Keep in mind you are teammates. The rest players are not werewolves.
choose one from the following living options please:
{living_players}. For example: Kill ...""",
"send_to": "Werewolf",
"restricted_to": "Moderator,Werewolf",
},
6: {"content": "Werewolves, close your eyes", "send_to": "Moderator", "restricted_to": ""},
7: {"content": "Witch, please open your eyes!", "send_to": "Moderator", "restricted_to": ""},
8: {
"content": """Witch, tonight {player_hunted} has been killed by the werewolves.
You have a bottle of antidote, would you like to save him/her? If so, say "Save", else, say "Pass".""",
"send_to": "Witch",
"restricted_to": "Moderator,Witch",
}, # 要先判断女巫是否有解药,再去询问女巫是否使用解药救人
9: {
"content": """Witch, you also have a bottle of poison, would you like to use it to kill one of the living players?
Choose one from the following living options: {living_players}.
If so, say ONLY "Poison PlayerX", replace PlayerX with the actual player name, else, say "Pass".""",
"send_to": "Witch",
"restricted_to": "Moderator,Witch",
}, #
10: {"content": "Witch, close your eyes", "send_to": "Moderator", "restricted_to": ""},
11: {"content": "Seer, please open your eyes!", "send_to": "Moderator", "restricted_to": ""},
12: {
"content": """Seer, you can check one player's identity. Who are you going to verify its identity tonight?
Choose only one from the following living options:{living_players}.""",
"send_to": "Seer",
"restricted_to": "Moderator,Seer",
},
13: {"content": "Seer, close your eyes", "send_to": "Moderator", "restricted_to": ""},
# The 1-st daytime
14: {
"content": """It's daytime. Everyone woke up except those who had been killed.""",
"send_to": "Moderator",
"restricted_to": "",
},
15: {"content": "{player_current_dead} was killed last night!", "send_to": "Moderator", "restricted_to": ""},
16: {
"content": """Living players: {living_players}, now freely talk about the current situation based on your observation and
reflection with a few sentences. Decide whether to reveal your identity based on your reflection.""",
"send_to": "", # send to all to speak in daytime
"restricted_to": "",
},
17: {
"content": """Now vote and tell me who you think is the werewolf. Dont mention your role.
You only choose one from the following living options please:
{living_players}. Say ONLY: I vote to eliminate ...""",
"send_to": "",
"restricted_to": "",
},
18: {"content": """{player_current_dead} was eliminated.""", "send_to": "Moderator", "restricted_to": ""},
}
class WerewolfExtEnv(ExtEnv):
model_config = ConfigDict(arbitrary_types_allowed=True)
players_state: dict[str, tuple[str, RoleState]] = Field(
default=dict(), description="the player's role type and state by player_name"
default_factory=dict, description="the player's role type and state by player_name"
)
round_idx: int = Field(default=0) # the current round
step_idx: int = Field(default=0) # the current step of current round
eval_step_idx: int = Field(default=0)
eval_step_idx: list[int] = Field(default=[])
per_round_steps: int = Field(default=len(STEP_INSTRUCTIONS))
# game global states
@ -115,13 +32,13 @@ class WerewolfExtEnv(ExtEnv):
special_role_players: list[str] = Field(default=[])
winner: Optional[str] = Field(default=None)
win_reason: Optional[str] = Field(default=None)
witch_poison_left: int = Field(default=1)
witch_antidote_left: int = Field(default=1)
witch_poison_left: int = Field(default=1, description="should be 1 or 0")
witch_antidote_left: int = Field(default=1, description="should be 1 or 0")
# game current round states, a round is from closing your eyes to the next time you close your eyes
round_hunts: dict[str, str] = Field(default=dict(), description="nighttime wolf hunt result")
round_hunts: dict[str, str] = Field(default_factory=dict, description="nighttime wolf hunt result")
round_votes: dict[str, str] = Field(
default=dict(), description="daytime all players vote result, key=voteer, value=voted one"
default_factory=dict, description="daytime all players vote result, key=voter, value=voted one"
)
player_hunted: Optional[str] = Field(default=None)
player_protected: Optional[str] = Field(default=None)
@ -135,13 +52,69 @@ class WerewolfExtEnv(ExtEnv):
seed: Optional[int] = None,
options: Optional[dict[str, Any]] = None,
) -> tuple[dict[str, Any], dict[str, Any]]:
"""currently unused"""
pass
def observe(self, obs_params: Optional[BaseEnvObsParams] = None) -> Any:
"""currently unused"""
pass
def step(self, action: BaseEnvAction) -> tuple[dict[str, Any], float, bool, bool, dict[str, Any]]:
pass
def _get_obs(self):
return {
"game_setup": self.game_setup,
"step_idx": self.step_idx,
"living_players": self.living_players,
"werewolf_players": self.werewolf_players, # currently, lack observation isolation
"player_hunted": self.player_hunted,
"player_current_dead": self.player_current_dead,
"witch_poison_left": self.witch_poison_left,
"witch_antidote_left": self.witch_antidote_left,
"winner": self.winner,
"win_reason": self.win_reason,
}
def step(self, action: EnvAction) -> tuple[dict[str, Any], float, bool, bool, dict[str, Any]]:
action_type = action.action_type
player_name = action.player_name
target_player_name = action.target_player_name
if action_type == EnvActionType.WOLF_KILL:
self.wolf_kill_someone(wolf_name=player_name, player_name=target_player_name)
elif action_type == EnvActionType.VOTE_KILL:
self.vote_kill_someone(voter_name=player_name, player_name=target_player_name)
elif action_type == EnvActionType.WITCH_POISON:
self.witch_poison_someone(witch_name=player_name, player_name=target_player_name)
elif action_type == EnvActionType.WITCH_SAVE:
self.witch_save_someone(witch_name=player_name, player_name=target_player_name)
elif action_type == EnvActionType.GUARD_PROTECT:
self.guard_protect_someone(guard_name=player_name, player_name=target_player_name)
elif action_type == EnvActionType.PROGRESS_STEP:
self.progress_step()
elif action_type == EnvActionType.NONE:
pass
else:
raise ValueError(f"not supported action_type: {action_type}")
self.update_game_states()
terminated = self._check_game_finish()
obs = self._get_obs()
return obs, 1.0, terminated, False, {}
def _check_game_finish(self) -> bool:
"""return True if game finished else False"""
# game's termination condition
terminated = False
living_werewolf = [p for p in self.werewolf_players if p in self.living_players]
living_villagers = [p for p in self.villager_players if p in self.living_players]
living_special_roles = [p for p in self.special_role_players if p in self.living_players]
if not living_werewolf:
self.winner = "good guys"
self.win_reason = "werewolves all dead"
terminated = True
elif not living_villagers or not living_special_roles:
self.winner = "werewolf"
self.win_reason = "villagers all dead" if not living_villagers else "special roles all dead"
terminated = True
return terminated
@property
def living_players(self) -> list[str]:
@ -161,12 +134,12 @@ class WerewolfExtEnv(ExtEnv):
@property
def werewolf_players(self) -> list[str]:
player_names = self._role_type_players(role_type="Werewolf")
player_names = self._role_type_players(role_type=RoleType.WEREWOLF.value)
return player_names
@property
def villager_players(self) -> list[str]:
player_names = self._role_type_players(role_type="Villager")
player_names = self._role_type_players(role_type=RoleType.VILLAGER.value)
return player_names
def _init_players_state(self, players: list["Role"]):
@ -193,14 +166,14 @@ class WerewolfExtEnv(ExtEnv):
"""init players using different roles' num"""
role_objs = []
for role_obj in role_uniq_objs:
if str(role_obj) == "Villager":
if RoleType.VILLAGER.value in str(role_obj):
role_objs.extend([role_obj] * num_villager)
elif str(role_obj) == "Werewolf":
elif RoleType.WEREWOLF.value in str(role_obj):
role_objs.extend([role_obj] * num_werewolf)
else:
role_objs.append(role_obj)
if shuffle:
random.shuffle(len(role_objs))
random.shuffle(role_objs)
if add_human:
assigned_role_idx = random.randint(0, len(role_objs) - 1)
assigned_role = role_objs[assigned_role_idx]
@ -233,10 +206,12 @@ class WerewolfExtEnv(ExtEnv):
roletype_state = self.players_state[player_name]
self.players_state[player_name] = (roletype_state[0], state)
def _check_valid_role(self, player: "Role", role_type: str) -> bool:
return True if role_type in str(player) else False
def _check_valid_role(self, player_name: str, role_type: str) -> bool:
roletype_state = self.players_state.get(player_name)
return True if roletype_state and role_type in roletype_state[0] else False
def _check_player_continue(self, player_name: str, particular_step: int = -1) -> bool:
"""to check if can do the operation to the player"""
step_idx = self.step_idx % self.per_round_steps
if particular_step > 0 and step_idx != particular_step: # step no
# particular_step = 18, not daytime vote time, ignore
@ -253,6 +228,10 @@ class WerewolfExtEnv(ExtEnv):
self.step_idx += 1
return instruction
@mark_as_writeable
def progress_step(self):
self.step_idx += 1
@mark_as_readable
def get_players_state(self, player_names: list[str]) -> dict[str, RoleState]:
players_state = {
@ -263,57 +242,72 @@ class WerewolfExtEnv(ExtEnv):
return players_state
@mark_as_writeable
def vote_kill_someone(self, voteer: "Role", player_name: str = None):
def vote_kill_someone(self, voter_name: str, player_name: str = None):
"""player vote result at daytime
player_name: if it's None, regard as abstaining from voting
"""
if not self._check_player_continue(voteer.name, particular_step=18): # 18=step no
if not self._check_player_continue(voter_name, particular_step=18): # 18=step no
return
self.round_votes[voteer.name] = player_name
self.round_votes[voter_name] = player_name
# check if all living players finish voting, then get the dead one
if list(self.round_votes.keys()) == self.living_players:
voted_all = list(self.round_votes.values()) # TODO in case of tie vote, check who was voted first
voted_all = [item for item in voted_all if item]
self.player_current_dead = Counter(voted_all).most_common()[0][0]
self._update_players_state([self.player_current_dead])
self.player_current_dead = [Counter(voted_all).most_common()[0][0]]
self._update_players_state(self.player_current_dead)
@mark_as_writeable
def wolf_kill_someone(self, wolf: "Role", player_name: str):
if not self._check_valid_role(wolf, "Werewolf"):
def wolf_kill_someone(self, wolf_name: str, player_name: str):
if not self._check_valid_role(wolf_name, RoleType.WEREWOLF.value):
return
if not self._check_player_continue(wolf.name, particular_step=5): # 5=step no
if not self._check_player_continue(wolf_name, particular_step=6): # 5=step no
return
self.round_hunts[wolf.name] = player_name
living_werewolf = [p for p in self.werewolf_players if p in self.living_players]
self.round_hunts[wolf_name] = player_name
# living_werewolf = [p for p in self.werewolf_players if p in self.living_players]
# check if all living wolfs finish hunting, then get the hunted one
if list(self.round_hunts.keys()) == living_werewolf:
hunted_all = list(self.round_hunts.values())
self.player_hunted = Counter(hunted_all).most_common()[0][0]
# if list(self.round_hunts.keys()) == living_werewolf:
# hunted_all = list(self.round_hunts.values())
# self.player_hunted = Counter(hunted_all).most_common()[0][0]
self.player_hunted = player_name
@mark_as_writeable
def witch_poison_someone(self, witch: "Role", player_name: str = None):
if not self._check_valid_role(witch, "Witch"):
def _witch_poison_or_save_someone(
self, witch_name: str, player_name: str = None, state: RoleState = RoleState.POISONED
):
if not self._check_valid_role(witch_name, RoleType.WITCH.value):
return
if not self._check_player_continue(player_name):
return
self._update_players_state([player_name], RoleState.POISONED)
self.player_poisoned = player_name
assert state in [RoleState.POISONED, RoleState.SAVED]
self._update_players_state([player_name], state)
if state == RoleState.POISONED:
self.player_poisoned = player_name
self.witch_poison_left -= 1
else:
# self.player_protected = player_name
self.is_hunted_player_saved = True
self.witch_antidote_left -= 1
@mark_as_writeable
def witch_save_someone(self, witch: "Role", player_name: str = None):
if not self._check_valid_role(witch, "Witch"):
def witch_poison_someone(self, witch_name: str, player_name: str = None):
self._witch_poison_or_save_someone(witch_name, player_name, RoleState.POISONED)
@mark_as_writeable
def witch_save_someone(self, witch_name: str, player_name: str = None):
self._witch_poison_or_save_someone(witch_name, player_name, RoleState.SAVED)
@mark_as_writeable
def guard_protect_someone(self, guard_name: str, player_name: str = None):
if not self._check_valid_role(guard_name, RoleType.GUARD.value):
return
if not self._check_player_continue(player_name):
return
self._update_players_state([player_name], RoleState.SAVED)
self.player_protected = player_name
@mark_as_writeable
def update_game_states(self, memories: list):
def update_game_states(self):
step_idx = self.step_idx % self.per_round_steps
if step_idx not in [15, 18] or self.step_idx in self.eval_step_idx:
return
@ -329,22 +323,12 @@ class WerewolfExtEnv(ExtEnv):
if self.player_poisoned:
self.player_current_dead.append(self.player_poisoned)
self._update_players_state([self.player_current_dead])
self._update_players_state(self.player_current_dead)
# reset
self.player_hunted = None
self.player_protected = None
self.is_hunted_player_saved = False
self.player_poisoned = None
# game's termination condition
living_werewolf = [p for p in self.werewolf_players if p in self.living_players]
living_villagers = [p for p in self.villager_players if p in self.living_players]
living_special_roles = [p for p in self.special_role_players if p in self.living_players]
if not living_werewolf:
self.winner = "good guys"
self.win_reason = "werewolves all dead"
elif not living_villagers or not living_special_roles:
self.winner = "werewolf"
self.win_reason = "villagers all dead" if not living_villagers else "special roles all dead"
if self.winner is not None:
self._record_all_experiences() # TODO
elif step_idx == 18:
# updated use vote_kill_someone
pass

View file

@ -49,6 +49,7 @@ def read_csv_to_list(curr_file: str, header=False, strip_trail=True):
def get_embedding(text, model: str = "text-embedding-ada-002"):
text = text.replace("\n", " ")
embedding = None
if not text:
text = "this is blank"
for idx in range(3):
@ -56,7 +57,8 @@ def get_embedding(text, model: str = "text-embedding-ada-002"):
embedding = (
OpenAI(api_key=config.llm.api_key).embeddings.create(input=[text], model=model).data[0].embedding
)
except Exception:
except Exception as exp:
logger.info(f"get_embedding failed, exp: {exp}, will retry.")
time.sleep(5)
if not embedding:
raise ValueError("get_embedding failed")

View file

@ -0,0 +1,3 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc :

View file

@ -0,0 +1,23 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc :
from metagpt.ext.werewolf.actions.werewolf_actions import Hunt, Impersonate
from metagpt.ext.werewolf.actions.guard_actions import Protect
from metagpt.ext.werewolf.actions.seer_actions import Verify
from metagpt.ext.werewolf.actions.witch_actions import Save, Poison
from metagpt.ext.werewolf.actions.common_actions import Speak, NighttimeWhispers, Reflect
from metagpt.ext.werewolf.actions.experience_operation import AddNewExperiences, RetrieveExperiences
from metagpt.ext.werewolf.actions.moderator_actions import InstructSpeak
ACTIONS = {
"Speak": Speak,
"Hunt": Hunt,
"Protect": Protect,
"Verify": Verify,
"Save": Save,
"Poison": Poison,
"Impersonate": Impersonate,
}
__all__ = ["NighttimeWhispers", "Reflect", "AddNewExperiences", "RetrieveExperiences", "InstructSpeak"]

View file

@ -0,0 +1,240 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc :
import json
from tenacity import retry, stop_after_attempt, wait_fixed
from metagpt.actions import Action
from metagpt.logs import logger
from metagpt.utils.common import parse_json_code_block
def log_and_parse_json(name: str, rsp: str) -> dict:
rsp = rsp.replace("\n", " ")
logger.debug(f"{name} result: {rsp}")
json_blocks = parse_json_code_block(rsp)
rsp_json = json.loads(json_blocks[0])
return rsp_json
class Speak(Action):
"""Action: Any speak action in a game"""
PROMPT_TEMPLATE: str = """
{
"BACKGROUND": "It's a Werewolf game, in this game, we have 2 werewolves, 2 villagers, 1 guard, 1 witch, 1 seer. You are __profile__. Note that villager, seer, guard and witch are all in villager side, they have the same objective. Werewolves can collectively hunt ONE player at night."
,"HISTORY": "You have knowledge to the following conversation: __context__"
,"ATTENTION": "You can NOT VOTE a player who is NOT ALIVE now!"
,"REFLECTION": "__reflection__"
,"STRATEGY": __strategy__
,"PAST_EXPERIENCES": "__experiences__"
,"MODERATOR_INSTRUCTION": __latest_instruction__,
,"RULE": "Please follow the moderator's latest instruction, figure out if you need to speak your opinion or directly to vote:
1. If the instruction is to SPEAK, speak in 200 words. Remember the goal of your role and try to achieve it using your speech;
2. If the instruction is to VOTE, you MUST vote and ONLY say 'I vote to eliminate PlayerX', replace PlayerX with the actual player name, DO NOT include any other words."
,"OUTPUT_FORMAT":
{
"ROLE": "Your role, in this case, __profile__"
,"PLAYER_NAME": "Your name, in this case, __name__"
,"LIVING_PLAYERS": "List living players based on MODERATOR_INSTRUCTION. Return a json LIST datatype."
,"THOUGHTS": "Based on `MODERATOR_INSTRUCTION` and `RULE`, carefully think about what to say or vote so that your chance of win as __profile__ maximizes.
If you find similar situation in `PAST_EXPERIENCES`, you may draw lessons from them to refine your strategy, take better vote action, or improve your speech.
Give your step-by-step thought process, you should think no more than 3 steps. For example: My step-by-step thought process:..."
,"RESPONSE": "Based on `MODERATOR_INSTRUCTION`, `RULE`, and the 'THOUGHTS' you had, express your opinion or cast a vote."
}
}
"""
STRATEGY: str = """
Decide whether to reveal your identity based on benefits vs. risks, provide useful information, and vote to eliminate the most suspicious.
If you have special abilities, pay attention to those who falsely claims your role, for they are probably werewolves.
"""
name: str = "Speak"
@retry(stop=stop_after_attempt(2), wait=wait_fixed(1))
async def run(
self,
profile: str,
name: str,
context: str,
latest_instruction: str,
reflection: str = "",
experiences: str = "",
):
prompt = (
self.PROMPT_TEMPLATE.replace("__context__", context)
.replace("__profile__", profile)
.replace("__name__", name)
.replace("__latest_instruction__", latest_instruction)
.replace("__strategy__", self.STRATEGY)
.replace("__reflection__", reflection)
.replace("__experiences__", experiences)
)
rsp = await self._aask(prompt)
rsp_json = log_and_parse_json(self.name, rsp)
return rsp_json["RESPONSE"]
class NighttimeWhispers(Action):
"""
Action: nighttime whispers with thinking processes
Usage Example:
class Hunt(NighttimeWhispers):
def __init__(self, name="Hunt", context=None, llm=None):
super().__init__(name, context, llm)
class Protect(NighttimeWhispers):
def __init__(self, name="Protect", context=None, llm=None):
super().__init__(name, context, llm)
class Verify(NighttimeWhispers):
def __init__(self, name="Verify", context=None, llm=None):
super().__init__(name, context, llm)
class Save(NighttimeWhispers):
def __init__(self, name="Save", context=None, llm=None):
super().__init__(name, context, llm)
def _update_prompt_json(self, prompt_json: dict, profile: str, name: str, context: str, **kwargs):
del prompt_json['ACTION']
del prompt_json['ATTENTION']
prompt_json["OUTPUT_FORMAT"]["THOUGHTS"] = "It is night time. Return the thinking steps of your decision of whether to save the player JUST be killed at this night."
prompt_json["OUTPUT_FORMAT"]["RESPONSE"] = "Follow the Moderator's instruction, decide whether you want to save that person or not. Return SAVE or PASS."
return prompt_json
class Poison(NighttimeWhispers):
def __init__(self, name="Poison", context=None, llm=None):
super().__init__(name, context, llm)
def _update_prompt_json(self, prompt_json: dict, profile: str, name: str, context: str, **kwargs):
prompt_json["OUTPUT_FORMAT"]["RESPONSE"] += "Or if you want to PASS, return PASS."
return prompt_json
"""
PROMPT_TEMPLATE: str = """
{
"BACKGROUND": "It's a Werewolf game, in this game, we have 2 werewolves, 2 villagers, 1 guard, 1 witch, 1 seer. You are __profile__. Note that villager, seer, guard and witch are all in villager side, they have the same objective. Werewolves can collectively hunt ONE player at night."
,"HISTORY": "You have knowledge to the following conversation: __context__"
,"ACTION": "Choose one living player to __action__."
,"ATTENTION": "1. You can only __action__ a player who is alive this night! And you can not __action__ a player who is dead this night! 2. `HISTORY` is all the information you observed, DONT hallucinate other player actions!"
,"REFLECTION": "__reflection__"
,"STRATEGY": "__strategy__"
,"PAST_EXPERIENCES": "__experiences__"
,"OUTPUT_FORMAT":
{
"ROLE": "Your role, in this case, __profile__"
,"PLAYER_NAME": "Your name, in this case, __name__"
,"LIVING_PLAYERS": "List the players who is alive based on moderator's latest instruction. Return a json LIST datatype."
,"THOUGHTS": "Choose one living player from `LIVING_PLAYERS` to __action__ this night. Return the reason why you choose to __action__ this player. If you observe nothing at first night, DONT imagine unexisting player actions! If you find similar situation in `PAST_EXPERIENCES`, you may draw lessons from them to refine your strategy and take better actions. Give your step-by-step thought process, you should think no more than 3 steps. For example: My step-by-step thought process:..."
,"RESPONSE": "As a __profile__, you should choose one living player from `LIVING_PLAYERS` to __action__ this night according to the THOUGHTS you have just now. Return the player name ONLY."
}
}
"""
STRATEGY: str = """
Decide which player is most threatening to you or most needs your support, take your action correspondingly.
"""
name: str = "NightTimeWhispers"
def _construct_prompt_json(
self, role_profile: str, role_name: str, context: str, reflection: str, experiences: str, **kwargs
):
prompt_template = self.PROMPT_TEMPLATE
def replace_string(prompt_json: dict):
k: str
for k in prompt_json.keys():
if isinstance(prompt_json[k], dict):
prompt_json[k] = replace_string(prompt_json[k])
continue
prompt_json[k] = prompt_json[k].replace("__profile__", role_profile)
prompt_json[k] = prompt_json[k].replace("__name__", role_name)
prompt_json[k] = prompt_json[k].replace("__context__", context)
prompt_json[k] = prompt_json[k].replace("__action__", self.name)
prompt_json[k] = prompt_json[k].replace("__strategy__", self.STRATEGY)
prompt_json[k] = prompt_json[k].replace("__reflection__", reflection)
prompt_json[k] = prompt_json[k].replace("__experiences__", experiences)
return prompt_json
prompt_json: dict = json.loads(prompt_template)
prompt_json = replace_string(prompt_json)
prompt_json: dict = self._update_prompt_json(
prompt_json, role_profile, role_name, context, reflection, experiences, **kwargs
)
assert isinstance(prompt_json, dict)
prompt: str = json.dumps(prompt_json, indent=4, ensure_ascii=False)
return prompt
def _update_prompt_json(
self, prompt_json: dict, role_profile: str, role_name: str, context: str, reflection: str, experiences: str
) -> dict:
# one can modify the prompt_json dictionary here
return prompt_json
@retry(stop=stop_after_attempt(2), wait=wait_fixed(1))
async def run(self, context: str, profile: str, name: str, reflection: str = "", experiences: str = ""):
prompt = self._construct_prompt_json(
role_profile=profile, role_name=name, context=context, reflection=reflection, experiences=experiences
)
rsp = await self._aask(prompt)
rsp_json = log_and_parse_json(self.name, rsp)
return f"{self.name} " + rsp_json["RESPONSE"]
class Reflect(Action):
PROMPT_TEMPLATE: str = """
{
"BACKGROUND": "It's a Werewolf game, in this game, we have 2 werewolves, 2 villagers, 1 guard, 1 witch, 1 seer. You are __profile__. Note that villager, seer, guard and witch are all in villager side, they have the same objective. Werewolves can collectively hunt ONE player at night."
,"HISTORY": "You have knowledge to the following conversation: __context__"
,"MODERATOR_INSTRUCTION": __latest_instruction__,
,"OUTPUT_FORMAT" (a json):
{
"ROLE": "Your role, in this case, __profile__"
,"PLAYER_NAME": "Your name, in this case, __name__"
"GAME_STATES": "You are about to follow `MODERATOR_INSTRUCTION`, but before taking any action, analyze each player, including the living and the dead, and summarize the game states.
For each player, your reflection should be a ONE-LINE json covering the following dimension, return a LIST of jsons (return an empty LIST for the first night):
[
{"TARGET": "the player you will analyze, if the player is yourself or your werewolf partner, indicate it" ,"STATUS": "living or dead, if dead, how was he/she possibly killed?", "CLAIMED_ROLE": "claims a role or not, if so, what role, any contradiction to others? If there is no claim, return 'None'", "SIDE_WITH": "sides with which players? If none, return 'None'", "ACCUSE": "accuses which players? If none, return 'None'"}
,{...}
,...
]"
,"REFLECTION": "Based on the whole `GAME_STATES`, return a json (return an empty string for the first night):
{
"Player1": "the true role (werewolf / special role / villager, living or dead) you infer about him/her, and why is this role? If the player is yourself or your werewolf partner, indicate it."
,...
,"Player7": "the true role (werewolf / special role / villager, living or dead) you infer about him/her, and why is this role? If the player is yourself or your werewolf partner, indicate it."
,"GAME_STATE_SUMMARIZATION": "summarize the current situation from your standpoint in one sentence, your summarization should catch the most important information from your reflection, such as conflicts, number of living werewolves, special roles, and villagers."
}"
}
}
"""
name: str = "Reflect"
@retry(stop=stop_after_attempt(2), wait=wait_fixed(1))
async def run(self, profile: str, name: str, context: str, latest_instruction: str):
prompt = (
self.PROMPT_TEMPLATE.replace("__context__", context)
.replace("__profile__", profile)
.replace("__name__", name)
.replace("__latest_instruction__", latest_instruction)
)
rsp = await self._aask(prompt)
rsp_json = log_and_parse_json(self.name, rsp)
return json.dumps(rsp_json["REFLECTION"])

View file

@ -0,0 +1,162 @@
import json
from typing import Optional
import chromadb
from pydantic import model_validator
from metagpt.actions import Action
from metagpt.const import DEFAULT_WORKSPACE_ROOT
from metagpt.environment.werewolf.const import RoleType
from metagpt.ext.werewolf.schema import RoleExperience
from metagpt.logs import logger
from metagpt.rag.engines.simple import SimpleEngine
from metagpt.rag.schema import ChromaIndexConfig, ChromaRetrieverConfig
from metagpt.utils.common import read_json_file, write_json_file
DEFAULT_COLLECTION_NAME = "role_reflection" # FIXME: some hard code for now
PERSIST_PATH = DEFAULT_WORKSPACE_ROOT.joinpath("werewolf_game/chroma")
PERSIST_PATH.mkdir(parents=True, exist_ok=True)
class AddNewExperiences(Action):
name: str = "AddNewExperience"
collection_name: str = DEFAULT_COLLECTION_NAME
delete_existing: bool = False
engine: Optional[SimpleEngine] = None
@model_validator(mode="after")
def validate_collection(self):
if self.engine:
return
if self.delete_existing:
try:
# implement engine `DELETE` method later
chromadb.PersistentClient(PERSIST_PATH.as_posix()).delete_collection(self.collection_name)
except Exception as exp:
logger.error(f"delete chroma collection: {self.collection_name} failed, exp: {exp}")
self.engine = SimpleEngine.from_objs(
retriever_configs=[
ChromaRetrieverConfig(
persist_path=PERSIST_PATH, collection_name=self.collection_name, metadata={"hnsw:space": "cosine"}
)
]
)
def run(self, experiences: list[RoleExperience]):
if not experiences:
return
for i, exp in enumerate(experiences):
exp.id = f"{exp.profile}-{exp.name}-step{i}-round_{exp.round_id}"
AddNewExperiences._record_experiences_local(experiences)
self.engine.add_objs(experiences)
def add_from_file(self, file_path):
experiences = read_json_file(file_path)
experiences = [RoleExperience.model_validate(item) for item in experiences]
experiences = [exp for exp in experiences if len(exp.reflection) > 2] # not "" or not '""'
self.engine.add_objs(experiences)
@staticmethod
def _record_experiences_local(experiences: list[RoleExperience]):
round_id = experiences[0].round_id
version = experiences[0].version
version = "test" if not version else version
experiences = [exp.model_dump() for exp in experiences]
experience_path = DEFAULT_WORKSPACE_ROOT.joinpath(f"werewolf_game/experiences/{version}")
experience_path.mkdir(parents=True, exist_ok=True)
save_path = f"{experience_path}/{round_id}.json"
write_json_file(save_path, experiences)
logger.info(f"experiences saved to {save_path}")
class RetrieveExperiences(Action):
name: str = "RetrieveExperiences"
collection_name: str = DEFAULT_COLLECTION_NAME
has_experiences: bool = True
engine: Optional[SimpleEngine] = None
topk: int = 10
@model_validator(mode="after")
def validate_collection(self):
if self.engine:
return
try:
self.engine = SimpleEngine.from_index(
index_config=ChromaIndexConfig(
persist_path=PERSIST_PATH, collection_name=self.collection_name, metadata={"hnsw:space": "cosine"}
),
retriever_configs=[
ChromaRetrieverConfig(
similarity_top_k=self.topk,
persist_path=PERSIST_PATH,
collection_name=self.collection_name,
metadata={"hnsw:space": "cosine"},
)
],
)
except Exception as exp:
logger.warning(f"No experience pool: {self.collection_name}, exp: {exp}")
def run(self, query: str, profile: str, excluded_version: str = "", verbose: bool = False) -> str:
"""_summary_
Args:
query (str): 用当前的reflection作为query去检索过去相似的reflection
profile (str): _description_
Returns:
_type_: _description_
"""
if not self.engine or len(query) <= 2: # not "" or not '""'
logger.warning("engine is None or query too short")
return ""
# ablation experiment logic
if profile == RoleType.WEREWOLF.value: # role werewolf as baseline, don't use experiences
logger.warning("Disable werewolves' experiences")
return ""
results = self.engine.retrieve(query)
logger.info(f"retrieve {profile}'s experiences")
experiences = [res.metadata["obj"] for res in results]
past_experiences = [] # currently use post-process to filter, and later add `filters` in rag
for exp in experiences:
if exp.profile == profile and exp.version != excluded_version:
past_experiences.append(exp)
if verbose and results:
logger.info("past_experiences: {}".format("\n\n".join(past_experiences)))
distances = results[0].score
logger.info(f"distances: {distances}")
template = """
{
"Situation __i__": "__situation__"
,"Moderator's instruction": "__instruction__"
,"Your action or speech during that time": "__response__"
,"Reality": "In fact, it turned out the true roles are __game_step__",
,"Outcome": "You __outcome__ in the end"
}
"""
past_experiences = [
(
template.replace("__i__", str(i))
.replace("__situation__", exp.reflection)
.replace("__instruction__", exp.instruction)
.replace("__response__", exp.response)
.replace("__game_step__", exp.game_setup.replace("0 | Game setup:\n", "").replace("\n", " "))
.replace("__outcome__", exp.outcome)
)
for i, exp in enumerate(past_experiences)
]
logger.info("past_experiences: {}".format("\n".join(past_experiences)))
logger.info("retrieval done")
return json.dumps(past_experiences)

View file

@ -0,0 +1,9 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc :
from metagpt.ext.werewolf.actions.common_actions import NighttimeWhispers
class Protect(NighttimeWhispers):
name: str = "Protect"

View file

@ -0,0 +1,39 @@
from metagpt.actions import Action
from metagpt.environment.werewolf.const import STEP_INSTRUCTIONS
class InstructSpeak(Action):
name: str = "InstructSpeak"
async def run(self, step_idx, living_players, werewolf_players, player_hunted, player_current_dead):
instruction_info = STEP_INSTRUCTIONS.get(
step_idx, {"content": "Unknown instruction.", "send_to": {}, "restricted_to": {}}
)
content = instruction_info["content"]
if "{living_players}" in content and "{werewolf_players}" in content:
content = content.format(
living_players=living_players, werewolf_players=werewolf_players, werewolf_num=len(werewolf_players)
)
if "{living_players}" in content:
content = content.format(living_players=living_players)
if "{werewolf_players}" in content:
content = content.format(werewolf_players=werewolf_players)
if "{player_hunted}" in content:
content = content.format(player_hunted=player_hunted)
if "{player_current_dead}" in content:
player_current_dead = "No one" if not player_current_dead else player_current_dead
content = content.format(player_current_dead=player_current_dead)
return content, instruction_info["send_to"], instruction_info["restricted_to"]
class ParseSpeak(Action):
name: str = "ParseSpeak"
async def run(self):
pass
class AnnounceGameResult(Action):
async def run(self, winner: str, win_reason: str):
return f"Game over! {win_reason}. The winner is the {winner}"

View file

@ -0,0 +1,5 @@
from metagpt.ext.werewolf.actions.common_actions import NighttimeWhispers
class Verify(NighttimeWhispers):
name: str = "Verify"

View file

@ -0,0 +1,17 @@
from metagpt.ext.werewolf.actions.common_actions import NighttimeWhispers, Speak
class Hunt(NighttimeWhispers):
name: str = "Hunt"
class Impersonate(Speak):
"""Action: werewolf impersonating a good guy in daytime speak"""
STRATEGY: str = """
Try continuously impersonating a role, such as Seer, Guard, Villager, etc., in order to mislead
other players, make them trust you, and thus hiding your werewolf identity. However, pay attention to what your werewolf partner said,
DONT claim the same role as your werewolf partner. Remmber NOT to reveal your real identity as a werewolf!
"""
name: str = "Impersonate"

View file

@ -0,0 +1,47 @@
from metagpt.environment.werewolf.const import RoleActionRes
from metagpt.ext.werewolf.actions.common_actions import NighttimeWhispers
class Save(NighttimeWhispers):
name: str = "Save"
def _update_prompt_json(
self, prompt_json: dict, role_profile: str, role_name: str, context: str, reflection: str, experiences: str
) -> dict:
del prompt_json["ACTION"]
del prompt_json["ATTENTION"]
prompt_json["OUTPUT_FORMAT"][
"THOUGHTS"
] = "It is night time. Return the thinking steps of your decision of whether to save the player JUST killed this night."
prompt_json["OUTPUT_FORMAT"][
"RESPONSE"
] = "Follow the Moderator's instruction, decide whether you want to save that person or not. Return SAVE or PASS."
return prompt_json
async def run(self, *args, **kwargs):
rsp = await super().run(*args, **kwargs)
action_name, rsp = rsp.split()
return rsp # 只需回复SAVE或PASS不需要带上action名
class Poison(NighttimeWhispers):
STRATEGY: str = """
Only poison a player if you are confident he/she is a werewolf. Don't poison a player randomly or at first night.
If someone claims to be the witch, poison him/her, because you are the only witch, he/she can only be a werewolf.
"""
name: str = "Poison"
def _update_prompt_json(
self, prompt_json: dict, role_profile: str, role_name: str, context: str, reflection: str, experiences: str
) -> dict:
prompt_json["OUTPUT_FORMAT"]["RESPONSE"] += "Or if you want to PASS, return PASS."
return prompt_json
async def run(self, *args, **kwargs):
rsp = await super().run(*args, **kwargs)
if RoleActionRes.PASS.value in rsp.lower():
action_name, rsp = rsp.split() # 带PASS只需回复PASS不需要带上action名否则是Poison PlayerX无需改动
return rsp

View file

@ -0,0 +1,13 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc :
from metagpt.ext.werewolf.roles.base_player import BasePlayer
from metagpt.ext.werewolf.roles.guard import Guard
from metagpt.ext.werewolf.roles.seer import Seer
from metagpt.ext.werewolf.roles.villager import Villager
from metagpt.ext.werewolf.roles.werewolf import Werewolf
from metagpt.ext.werewolf.roles.witch import Witch
from metagpt.ext.werewolf.roles.moderator import Moderator
__all__ = ["BasePlayer", "Guard", "Moderator", "Seer", "Villager", "Witch", "Werewolf"]

View file

@ -0,0 +1,176 @@
import re
from pydantic import Field, SerializeAsAny, model_validator
from metagpt.actions.action import Action
from metagpt.environment.werewolf.const import RoleState, RoleType
from metagpt.ext.werewolf.actions import (
ACTIONS,
AddNewExperiences,
InstructSpeak,
NighttimeWhispers,
Reflect,
RetrieveExperiences,
Speak,
)
from metagpt.ext.werewolf.schema import RoleExperience, WwMessage
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.utils.common import any_to_str
class BasePlayer(Role):
name: str = "PlayerXYZ"
profile: str = "BasePlayer"
special_action_names: list[str] = []
use_reflection: bool = True
use_experience: bool = False
use_memory_selection: bool = False
new_experience_version: str = ""
status: RoleState = RoleState.ALIVE
special_actions: list[SerializeAsAny[Action]] = Field(default=[], validate_default=True)
experiences: list[RoleExperience] = []
def __init__(self, **kwargs):
super().__init__(**kwargs)
# 技能和监听配置
self._watch([InstructSpeak]) # 监听Moderator的指令以做行动
special_actions = [ACTIONS[action_name] for action_name in self.special_action_names]
capable_actions = [Speak] + special_actions
self.set_actions(capable_actions) # 给角色赋予行动技能
self.special_actions = special_actions
if not self.use_reflection and self.use_experience:
logger.warning("You must enable use_reflection before using experience")
self.use_experience = False
@model_validator(mode="after")
def check_addresses(self):
if not self.addresses:
self.addresses = {any_to_str(self), self.name, self.profile} if self.name else {any_to_str(self)}
return self
async def _observe(self, ignore_memory=False) -> int:
if self.status != RoleState.ALIVE:
# 死者不再参与游戏
return 0
news = []
if not news:
news = self.rc.msg_buffer.pop_all()
old_messages = [] if ignore_memory else self.rc.memory.get()
for m in news:
if len(m.restricted_to) and self.profile not in m.restricted_to and self.name not in m.restricted_to:
# if the msg is not send to the whole audience ("") nor this role (self.profile or self.name),
# then this role should not be able to receive it and record it into its memory
continue
self.rc.memory.add(m)
self.rc.news = [
n for n in news if (n.cause_by in self.rc.watch or self.profile in n.send_to) and n not in old_messages
]
# TODO to delete
# await super()._observe()
# # 只有发给全体的(""或发给自己的self.profile消息需要走下面的_react流程
# # 其他的收听到即可,不用做动作
# self.rc.news = [msg for msg in self.rc.news if msg.send_to in ["", self.profile]]
return len(self.rc.news)
async def _think(self):
news = self.rc.news[0]
assert news.cause_by == any_to_str(InstructSpeak) # 消息为来自Moderator的指令时才去做动作
if not news.restricted_to:
# 消息接收范围为全体角色的,做公开发言(发表投票观点也算发言)
self.rc.todo = Speak()
elif self.profile in news.restricted_to:
# FIXME: hard code to split, restricted为"Moderator"或"Moderator, 角色profile"
# Moderator加密发给自己的意味着要执行角色的特殊动作
self.rc.todo = self.special_actions[0]()
async def _act(self):
# todo为_think时确定的有两种情况Speak或Protect
todo = self.rc.todo
logger.info(f"{self._setting}: ready to {str(todo)}")
# 可以用这个函数获取该角色的全部记忆和最新的instruction
memories = self.get_all_memories()
latest_instruction = self.get_latest_instruction()
reflection = (
await Reflect().run(
profile=self.profile, name=self.name, context=memories, latest_instruction=latest_instruction
)
if self.use_reflection
else ""
)
experiences = (
RetrieveExperiences().run(
query=reflection, profile=self.profile, excluded_version=self.new_experience_version
)
if self.use_experience
else ""
)
# 根据自己定义的角色Action对应地去runrun的入参可能不同
if isinstance(todo, Speak):
rsp = await todo.run(
profile=self.profile,
name=self.name,
context=memories,
latest_instruction=latest_instruction,
reflection=reflection,
experiences=experiences,
)
restricted_to = set()
elif isinstance(todo, NighttimeWhispers):
rsp = await todo.run(
profile=self.profile, name=self.name, context=memories, reflection=reflection, experiences=experiences
)
restricted_to = {RoleType.MODERATOR.value, self.profile} # 给Moderator发送使用特殊技能的加密消息
msg = WwMessage(
content=rsp,
role=self.profile,
sent_from=self.name,
cause_by=type(todo),
send_to={},
restricted_to=restricted_to,
)
self.experiences.append(
RoleExperience(
name=self.name,
profile=self.profile,
reflection=reflection,
instruction=latest_instruction,
response=rsp,
version=self.new_experience_version,
)
)
logger.info(f"{self._setting}: {rsp}")
return msg
def get_all_memories(self) -> str:
memories = self.rc.memory.get()
time_stamp_pattern = r"[0-9]+ \| "
# NOTE: 除Moderator外其他角色使用memory只能用m.sent_from玩家名不能用m.role玩家角色因为他们不知道说话者的身份
memories = [f"{m.sent_from}: {re.sub(time_stamp_pattern, '', m.content)}" for m in memories] # regex去掉时间戳
memories = "\n".join(memories)
return memories
def get_latest_instruction(self) -> str:
return self.rc.important_memory[-1].content # 角色监听着Moderator的InstructSpeak是其重要记忆直接获取即可
def set_status(self, new_status: RoleState):
self.status = new_status
def record_experiences(self, round_id: str, outcome: str, game_setup: str):
experiences = [exp for exp in self.experiences if len(exp.reflection) > 2] # not "" or not '""'
for exp in experiences:
exp.round_id = round_id
exp.outcome = outcome
exp.game_setup = game_setup
AddNewExperiences().run(experiences)

View file

@ -0,0 +1,8 @@
from metagpt.environment.werewolf.const import RoleType
from metagpt.ext.werewolf.roles.base_player import BasePlayer
class Guard(BasePlayer):
name: str = RoleType.GUARD.value
profile: str = RoleType.GUARD.value
special_action_names: list[str] = ["Protect"]

View file

@ -0,0 +1,45 @@
from metagpt.environment.werewolf.const import RoleType
from metagpt.ext.werewolf.actions import Speak
from metagpt.ext.werewolf.roles import BasePlayer
from metagpt.ext.werewolf.schema import WwMessage
from metagpt.logs import logger
async def _act(self):
todo = self.rc.todo
memories = self.get_all_memories()
input_instruction = f"""
## As a reminder, you have access to the following game history:
{memories}
## You are {self.name}({self.profile})
## Guidance:
1. If you are performing a special action or exercising a vote,
end your response with "PlayerX", replace PlayerX with the actual player name, e.g., "..., kill/protect/poison/.../vote Player1".
2. If it is a daytime free speech, you can speak in whatever format.
Now, please speak:
"""
rsp = input(input_instruction) # wait for human input
msg_cause_by = type(todo)
msg_restricted_to = {} if isinstance(todo, Speak) else {RoleType.MODERATOR.value, self.profile}
msg = WwMessage(
content=rsp,
role=self.profile,
sent_from=self.name,
cause_by=msg_cause_by,
send_to={},
restricted_to=msg_restricted_to, # 给Moderator及自身阵营发送加密消息
)
logger.info(f"{self._setting}: {rsp}")
return msg
def prepare_human_player(player_class: BasePlayer):
# Dynamically define a human player class that inherits from a certain role class
HumanPlayer = type("HumanPlayer", (player_class,), {"_act": _act})
return HumanPlayer

View file

@ -0,0 +1,251 @@
import re
from datetime import datetime
from typing import Union
from metagpt.actions.add_requirement import UserRequirement
from metagpt.const import DEFAULT_WORKSPACE_ROOT, MESSAGE_ROUTE_TO_ALL
from metagpt.environment.werewolf.const import (
STEP_INSTRUCTIONS,
RoleActionRes,
RoleState,
RoleType,
)
from metagpt.environment.werewolf.env_space import EnvAction, EnvActionType
from metagpt.ext.werewolf.actions import Hunt, Poison, Protect, Save, Verify
from metagpt.ext.werewolf.actions.moderator_actions import (
AnnounceGameResult,
InstructSpeak,
ParseSpeak,
)
from metagpt.ext.werewolf.roles.base_player import BasePlayer
from metagpt.ext.werewolf.schema import WwMessage
from metagpt.logs import logger
from metagpt.utils.common import any_to_str
class Moderator(BasePlayer):
name: str = RoleType.MODERATOR.value
profile: str = RoleType.MODERATOR.value
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._watch([UserRequirement, InstructSpeak, ParseSpeak])
self.set_actions([InstructSpeak, ParseSpeak, AnnounceGameResult])
# game states
self.step_idx = 0
self.game_setup = ""
self.werewolf_players = []
self.winner = None
self.win_reason = None
self.witch_poison_left = 1
self.witch_antidote_left = 1
def update_player_status(self, player_names: list[str]):
if not player_names:
return
roles_in_env = self.rc.env.get_roles()
for role_setting, role in roles_in_env.items():
for player_name in player_names:
if player_name in role_setting:
role.set_status(new_status=RoleState.DEAD) # 更新为死亡
def _record_all_experiences(self):
logger.info(f"The winner of the game: {self.winner}, start to record roles' experiences")
roles_in_env = self.rc.env.get_roles()
timestamp = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
for _, role in roles_in_env.items():
if role == self:
continue
if self.winner == "werewolf":
outcome = "won" if role.profile in RoleType.WEREWOLF.value else "lost"
else:
outcome = "won" if role.profile not in RoleType.WEREWOLF.value else "lost"
role.record_experiences(round_id=timestamp, outcome=outcome, game_setup=self.game_setup)
async def _parse_speak(self, memories):
latest_msg = memories[-1]
latest_msg_content = latest_msg.content
match = re.search(r"Player[0-9]+", latest_msg_content[-10:]) # FIXME: hard code truncation
target = match.group(0) if match else ""
# default return
msg_content = "Understood"
restricted_to = set()
msg_cause_by = latest_msg.cause_by
if msg_cause_by == any_to_str(Hunt):
self.rc.env.step(
EnvAction(
action_type=EnvActionType.WOLF_KILL, player_name=latest_msg.sent_from, target_player_name=target
)
)
elif msg_cause_by == any_to_str(Protect):
self.rc.env.step(
EnvAction(
action_type=EnvActionType.GUARD_PROTECT, player_name=latest_msg.sent_from, target_player_name=target
)
)
elif msg_cause_by == any_to_str(Verify):
if target in self.werewolf_players:
msg_content = f"{target} is a werewolf"
else:
msg_content = f"{target} is a good guy"
restricted_to = {RoleType.MODERATOR.value, RoleType.SEER.value}
elif msg_cause_by == any_to_str(Save):
if RoleActionRes.PASS.value in latest_msg_content.lower():
# the role ignore to response, answer `pass`
pass
elif not self.witch_antidote_left:
msg_content = "You have no antidote left and thus can not save the player"
restricted_to = {RoleType.MODERATOR.value, RoleType.WITCH.value}
else:
self.rc.env.step(
EnvAction(
action_type=EnvActionType.WITCH_SAVE,
player_name=latest_msg.sent_from,
target_player_name=target,
)
)
elif msg_cause_by == any_to_str(Poison):
if RoleActionRes.PASS.value in latest_msg_content.lower():
pass
elif not self.witch_poison_left:
msg_content = "You have no poison left and thus can not poison the player"
restricted_to = {RoleType.MODERATOR.value, RoleType.WITCH.value}
else:
self.rc.env.step(
EnvAction(
action_type=EnvActionType.WITCH_POISON,
player_name=latest_msg.sent_from,
target_player_name=target,
)
)
return msg_content, restricted_to
def _update_player_status(self, step_idx: int, player_current_dead: list[str]):
"""update dead player's status"""
if step_idx in [15, 18]:
self.update_player_status(player_current_dead)
def _record_game_history(self, step_idx: int):
if step_idx and step_idx % len(STEP_INSTRUCTIONS) == 0 or self.winner:
logger.info("a night and day cycle completed, examine all history")
logger.debug(f"all_memories: {self.get_all_memories()}")
with open(DEFAULT_WORKSPACE_ROOT / "werewolf_transcript.txt", "w") as f:
f.write(self.get_all_memories())
async def _observe(self, ignore_memory=False) -> int:
news = []
if not news:
news = self.rc.msg_buffer.pop_all()
old_messages = [] if ignore_memory else self.rc.memory.get()
for m in news:
if len(m.restricted_to) and self.profile not in m.restricted_to and self.name not in m.restricted_to:
# if the msg is not send to the whole audience ("") nor this role (self.profile or self.name),
# then this role should not be able to receive it and record it into its memory
continue
self.rc.memory.add(m)
# add `MESSAGE_ROUTE_TO_ALL in n.send_to` make it to run `ParseSpeak`
self.rc.news = [
n
for n in news
if (n.cause_by in self.rc.watch or self.profile in n.send_to or MESSAGE_ROUTE_TO_ALL in n.send_to)
and n not in old_messages
]
return len(self.rc.news)
async def _think(self):
if self.winner:
self.rc.todo = AnnounceGameResult()
return
latest_msg = self.rc.memory.get()[-1]
if latest_msg.role in ["User", "Human", self.profile]:
# 1. 上一轮消息是用户指令,解析用户指令,开始游戏
# 2.1. 上一轮消息是Moderator自己的指令继续发出指令一个事情可以分几条消息来说
# 2.2. 上一轮消息是Moderator自己的解析消息一个阶段结束发出新一个阶段的指令
self.rc.todo = InstructSpeak()
else:
# 上一轮消息是游戏角色的发言,解析角色的发言
self.rc.todo = ParseSpeak()
def _init_fields_from_obj(self, obs: dict[str, Union[int, str, list[str]]]):
self.game_setup = obs.get("game_setup", "")
self.step_idx = obs.get("step_idx", 0)
self.winner = obs.get("winner")
self.win_reason = obs.get("win_reason")
self.werewolf_players = obs.get("werewolf_players", [])
self.witch_poison_left = obs.get("witch_poison_left", 0)
self.witch_antidote_left = obs.get("witch_antidote_left", 0)
async def _act(self):
todo = self.rc.todo
logger.info(f"{self._setting} ready to {todo}")
memories = self.get_all_memories(mode="msg")
obs, _, _, _, _ = self.rc.env.step(action=EnvAction(action_type=EnvActionType.NONE))
living_players = obs["living_players"]
werewolf_players = obs["werewolf_players"]
player_hunted = obs["player_hunted"]
player_current_dead = obs["player_current_dead"]
self._init_fields_from_obj(obs)
# 若进行完一夜一日的循环,打印和记录一次完整发言历史
self._record_game_history(self.step_idx)
# 若一晚或一日周期结束,对当晚或当日的死者进行总结,并更新玩家状态
self._update_player_status(self.step_idx, player_current_dead)
if self.winner:
self._record_all_experiences()
# 根据_think的结果执行InstructSpeak还是ParseSpeak, 并将结果返回
if isinstance(todo, InstructSpeak):
msg_content, msg_to_send_to, msg_restricted_to = await InstructSpeak().run(
self.step_idx,
living_players=living_players,
werewolf_players=werewolf_players,
player_hunted=player_hunted,
player_current_dead=player_current_dead,
)
# msg_content = f"Step {self.step_idx}: {msg_content}" # HACK: 加一个unique的step_idx避免记忆的自动去重
msg = WwMessage(
content=msg_content,
role=self.profile,
sent_from=self.name,
cause_by=InstructSpeak,
send_to=msg_to_send_to,
restricted_to=msg_restricted_to,
)
logger.info(f"current step_idx: {self.step_idx}")
self.rc.env.step(EnvAction(action_type=EnvActionType.PROGRESS_STEP)) # to update step_idx
elif isinstance(todo, ParseSpeak):
msg_content, msg_restricted_to = await self._parse_speak(memories)
# msg_content = f"Step {self.step_idx}: {msg_content}" # HACK: 加一个unique的step_idx避免记忆的自动去重
msg = WwMessage(
content=msg_content,
role=self.profile,
sent_from=self.name,
cause_by=ParseSpeak,
send_to={},
restricted_to=msg_restricted_to,
)
elif isinstance(todo, AnnounceGameResult):
msg_content = await AnnounceGameResult().run(winner=self.winner, win_reason=self.win_reason)
msg = WwMessage(content=msg_content, role=self.profile, sent_from=self.name, cause_by=AnnounceGameResult)
logger.info(f"{self._setting}: {msg_content}")
return msg
def get_all_memories(self, mode="str") -> str:
memories = self.rc.memory.get()
if mode == "str":
memories = [f"{m.sent_from}({m.role}): {m.content}" for m in memories]
memories = "\n".join(memories)
return memories

View file

@ -0,0 +1,8 @@
from metagpt.environment.werewolf.const import RoleType
from metagpt.ext.werewolf.roles.base_player import BasePlayer
class Seer(BasePlayer):
name: str = RoleType.SEER.value
profile: str = RoleType.SEER.value
special_action_names: list[str] = ["Verify"]

View file

@ -0,0 +1,8 @@
from metagpt.environment.werewolf.const import RoleType
from metagpt.ext.werewolf.roles.base_player import BasePlayer
class Villager(BasePlayer):
name: str = RoleType.VILLAGER.value
profile: str = RoleType.VILLAGER.value
special_action_names: list[str] = []

View file

@ -0,0 +1,15 @@
from metagpt.environment.werewolf.const import RoleType
from metagpt.ext.werewolf.actions import Impersonate, Speak
from metagpt.ext.werewolf.roles.base_player import BasePlayer
class Werewolf(BasePlayer):
name: str = RoleType.WEREWOLF.value
profile: str = RoleType.WEREWOLF.value
special_action_names: list[str] = ["Hunt"]
async def _think(self):
"""狼人白天发言时需要伪装与其他角色不同因此需要重写_think"""
await super()._think()
if isinstance(self.rc.todo, Speak):
self.rc.todo = Impersonate()

View file

@ -0,0 +1,28 @@
from metagpt.environment.werewolf.const import RoleType
from metagpt.ext.werewolf.actions import InstructSpeak, Poison, Save, Speak
from metagpt.ext.werewolf.roles.base_player import BasePlayer
from metagpt.utils.common import any_to_str
class Witch(BasePlayer):
name: str = RoleType.WITCH.value
profile: str = RoleType.WITCH.value
special_action_names: list[str] = ["Save", "Poison"]
async def _think(self):
"""女巫涉及两个特殊技能因此在此需要改写_think进行路由"""
news = self.rc.news[0]
assert news.cause_by == any_to_str(InstructSpeak) # 消息为来自Moderator的指令时才去做动作
if not news.restricted_to:
# 消息接收范围为全体角色的,做公开发言(发表投票观点也算发言)
self.rc.todo = Speak()
elif self.profile in news.restricted_to:
# FIXME: hard code to split, restricted为"Moderator"或"Moderator,角色profile"
# Moderator加密发给自己的意味着要执行角色的特殊动作
# 这里用关键词进行动作的选择需要Moderator侧的指令进行配合
if "save" in news.content.lower():
self.rc.todo = Save()
elif "poison" in news.content.lower():
self.rc.todo = Poison()
else:
raise ValueError("Moderator's instructions must include save or poison keyword")

View file

@ -0,0 +1,33 @@
from typing import Any
from pydantic import BaseModel, Field, field_validator
from metagpt.schema import Message
from metagpt.utils.common import any_to_str_set
class RoleExperience(BaseModel):
id: str = ""
name: str = ""
profile: str
reflection: str
instruction: str = ""
response: str
outcome: str = ""
round_id: str = ""
game_setup: str = ""
version: str = ""
def rag_key(self) -> str:
"""For search"""
return self.reflection
class WwMessage(Message):
# Werewolf Message
restricted_to: set[str] = Field(default=set(), validate_default=True)
@field_validator("restricted_to", mode="before")
@classmethod
def check_restricted_to(cls, restricted_to: Any):
return any_to_str_set(restricted_to if restricted_to else set())

View file

@ -0,0 +1,28 @@
from typing import Any, Optional
from metagpt.actions.add_requirement import UserRequirement
from metagpt.context import Context
from metagpt.environment.werewolf.werewolf_env import WerewolfEnv
from metagpt.ext.werewolf.schema import WwMessage
from metagpt.team import Team
class WerewolfGame(Team):
"""Use the "software company paradigm" to hold a werewolf game"""
env: Optional[WerewolfEnv] = None
def __init__(self, context: Context = None, **data: Any):
super(Team, self).__init__(**data)
ctx = context or Context()
if not self.env:
self.env = WerewolfEnv(context=ctx)
else:
self.env.context = ctx # The `env` object is allocated by deserialization
def run_project(self, idea):
"""Run a project from user instruction."""
self.idea = idea
self.env.publish_message(
WwMessage(role="User", content=idea, cause_by=UserRequirement, restricted_to={"Moderator"})
)

View file

@ -161,6 +161,13 @@ class SimpleEngine(RetrieverQueryEngine):
"""Inplement tools.SearchInterface"""
return await self.aquery(content)
def retrieve(self, query: QueryType) -> list[NodeWithScore]:
query_bundle = QueryBundle(query) if isinstance(query, str) else query
nodes = super().retrieve(query_bundle)
self._try_reconstruct_obj(nodes)
return nodes
async def aretrieve(self, query: QueryType) -> list[NodeWithScore]:
"""Allow query to be str."""
query_bundle = QueryBundle(query) if isinstance(query, str) else query

View file

@ -48,7 +48,7 @@ class RAGIndexFactory(ConfigBasedFactory):
def _create_chroma(self, config: ChromaIndexConfig, **kwargs) -> VectorStoreIndex:
db = chromadb.PersistentClient(str(config.persist_path))
chroma_collection = db.get_or_create_collection(config.collection_name)
chroma_collection = db.get_or_create_collection(config.collection_name, metadata=config.metadata)
vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
return self._index_from_vector_store(vector_store=vector_store, config=config, **kwargs)

View file

@ -69,7 +69,7 @@ class RetrieverFactory(ConfigBasedFactory):
def _create_chroma_retriever(self, config: ChromaRetrieverConfig, **kwargs) -> ChromaRetriever:
db = chromadb.PersistentClient(path=str(config.persist_path))
chroma_collection = db.get_or_create_collection(config.collection_name)
chroma_collection = db.get_or_create_collection(config.collection_name, metadata=config.metadata)
vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
config.index = self._build_index_from_vector_store(config, vector_store, **kwargs)

View file

@ -1,8 +1,9 @@
"""RAG schemas."""
from pathlib import Path
from typing import Any, ClassVar, Literal, Union
from typing import Any, ClassVar, Literal, Optional, Union
from chromadb.api.types import CollectionMetadata
from llama_index.core.embeddings import BaseEmbedding
from llama_index.core.indices.base import BaseIndex
from llama_index.core.schema import TextNode
@ -59,6 +60,9 @@ class ChromaRetrieverConfig(IndexRetrieverConfig):
persist_path: Union[str, Path] = Field(default="./chroma_db", description="The directory to save data.")
collection_name: str = Field(default="metagpt", description="The name of the collection.")
metadata: Optional[CollectionMetadata] = Field(
default=None, description="Optional metadata to associate with the collection"
)
class ElasticsearchStoreConfig(BaseModel):
@ -144,6 +148,9 @@ class ChromaIndexConfig(VectorIndexConfig):
"""Config for chroma-based index."""
collection_name: str = Field(default="metagpt", description="The name of the collection.")
metadata: Optional[CollectionMetadata] = Field(
default=None, description="Optional metadata to associate with the collection"
)
class BM25IndexConfig(BaseIndexConfig):

View file

@ -335,6 +335,11 @@ class Role(SerializationMixin, ContextMixin, BaseModel):
self.llm.cost_manager = self.context.cost_manager
self.set_actions(self.actions) # reset actions to update llm and prefix
@property
def name(self):
"""Get the role name"""
return self._setting.name
def _get_prefix(self):
"""Get the role prefix"""
if self.desc:

View file

@ -722,7 +722,10 @@ def list_files(root: str | Path) -> List[Path]:
def parse_json_code_block(markdown_text: str) -> List[str]:
json_blocks = re.findall(r"```json(.*?)```", markdown_text, re.DOTALL)
json_blocks = (
re.findall(r"```json(.*?)```", markdown_text, re.DOTALL) if "```json" in markdown_text else [markdown_text]
)
return [v.strip() for v in json_blocks]

View file

@ -2,33 +2,34 @@
# -*- coding: utf-8 -*-
# @Desc : the unittest of WerewolfExtEnv
from metagpt.environment.werewolf.werewolf_ext_env import RoleState, WerewolfExtEnv
from metagpt.environment.werewolf.const import RoleState, RoleType
from metagpt.environment.werewolf.werewolf_ext_env import WerewolfExtEnv
from metagpt.roles.role import Role
class Werewolf(Role):
profile: str = "Werewolf"
profile: str = RoleType.WEREWOLF.value
class Villager(Role):
profile: str = "Villager"
profile: str = RoleType.VILLAGER.value
class Witch(Role):
profile: str = "Witch"
profile: str = RoleType.WITCH.value
class Guard(Role):
profile: str = "Guard"
profile: str = RoleType.GUARD.value
def test_werewolf_ext_env():
players_state = {
"Player0": ("Werewolf", RoleState.ALIVE),
"Player1": ("Werewolf", RoleState.ALIVE),
"Player2": ("Villager", RoleState.ALIVE),
"Player3": ("Witch", RoleState.ALIVE),
"Player4": ("Guard", RoleState.ALIVE),
"Player0": (RoleType.WEREWOLF.value, RoleState.ALIVE),
"Player1": (RoleType.WEREWOLF.value, RoleState.ALIVE),
"Player2": (RoleType.VILLAGER.value, RoleState.ALIVE),
"Player3": (RoleType.WITCH.value, RoleState.ALIVE),
"Player4": (RoleType.GUARD.value, RoleState.ALIVE),
}
ext_env = WerewolfExtEnv(players_state=players_state, step_idx=4, special_role_players=["Player3", "Player4"])
@ -41,9 +42,9 @@ def test_werewolf_ext_env():
assert "Werewolves, please open your eyes" in curr_instr["content"]
# current step_idx = 5
ext_env.wolf_kill_someone(wolf=Role(name="Player10"), player_name="Player4")
ext_env.wolf_kill_someone(wolf=Werewolf(name="Player0"), player_name="Player4")
ext_env.wolf_kill_someone(wolf=Werewolf(name="Player1"), player_name="Player4")
ext_env.wolf_kill_someone(wolf_name="Player10", player_name="Player4")
ext_env.wolf_kill_someone(wolf_name="Player0", player_name="Player4")
ext_env.wolf_kill_someone(wolf_name="Player1", player_name="Player4")
assert ext_env.player_hunted == "Player4"
assert len(ext_env.living_players) == 5 # hunted but can be saved by witch
@ -52,11 +53,11 @@ def test_werewolf_ext_env():
# current step_idx = 18
assert ext_env.step_idx == 18
ext_env.vote_kill_someone(voteer=Werewolf(name="Player0"), player_name="Player2")
ext_env.vote_kill_someone(voteer=Werewolf(name="Player1"), player_name="Player3")
ext_env.vote_kill_someone(voteer=Villager(name="Player2"), player_name="Player3")
ext_env.vote_kill_someone(voteer=Witch(name="Player3"), player_name="Player4")
ext_env.vote_kill_someone(voteer=Guard(name="Player4"), player_name="Player2")
ext_env.vote_kill_someone(voter_name="Player0", player_name="Player2")
ext_env.vote_kill_someone(voter_name="Player1", player_name="Player3")
ext_env.vote_kill_someone(voter_name="Player2", player_name="Player3")
ext_env.vote_kill_someone(voter_name="Player3", player_name="Player4")
ext_env.vote_kill_someone(voter_name="Player4", player_name="Player2")
assert ext_env.player_current_dead == "Player2"
assert len(ext_env.living_players) == 4

View file

@ -0,0 +1,3 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc :

View file

@ -0,0 +1,3 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc :

View file

@ -0,0 +1,164 @@
import json
import pytest
from metagpt.const import DEFAULT_WORKSPACE_ROOT
from metagpt.ext.werewolf.actions import AddNewExperiences, RetrieveExperiences
from metagpt.ext.werewolf.schema import RoleExperience
from metagpt.logs import logger
class TestExperiencesOperation:
collection_name = "test"
test_round_id = "test_01"
version = "test"
samples_to_add = [
RoleExperience(
profile="Witch",
reflection="The game is intense with two players claiming to be the Witch and one claiming to be the Seer. "
"Player4's behavior is suspicious.",
response="",
outcome="",
round_id=test_round_id,
version=version,
),
RoleExperience(
profile="Witch",
reflection="The game is in a critical state with only three players left, "
"and I need to make a wise decision to save Player7 or not.",
response="",
outcome="",
round_id=test_round_id,
version=version,
),
RoleExperience(
profile="Seer",
reflection="Player1, who is a werewolf, falsely claimed to be a Seer, and Player6, who might be a Witch, "
"sided with him. I, as the real Seer, am under suspicion.",
response="",
outcome="",
round_id=test_round_id,
version=version,
),
RoleExperience(
profile="TestRole",
reflection="Some test reflection1",
response="",
outcome="",
round_id=test_round_id,
version=version + "_01-10",
),
RoleExperience(
profile="TestRole",
reflection="Some test reflection2",
response="",
outcome="",
round_id=test_round_id,
version=version + "_11-20",
),
RoleExperience(
profile="TestRole",
reflection="Some test reflection3",
response="",
outcome="",
round_id=test_round_id,
version=version + "_21-30",
),
]
@pytest.mark.asyncio
async def test_add(self):
saved_file = DEFAULT_WORKSPACE_ROOT.joinpath(
f"werewolf_game/experiences/{self.version}/{self.test_round_id}.json"
)
if saved_file.exists():
saved_file.unlink()
action = AddNewExperiences(collection_name=self.collection_name, delete_existing=True)
action.run(self.samples_to_add)
# test insertion
inserted = action.engine.retriever._index._vector_store._collection.get()
assert len(inserted["documents"]) == len(self.samples_to_add)
# test if we record the samples correctly to local file
# & test if we could recover a embedding db from the file
action = AddNewExperiences(collection_name=self.collection_name, delete_existing=True)
action.add_from_file(saved_file)
inserted = action.engine.retriever._index._vector_store._collection.get()
assert len(inserted["documents"]) == len(self.samples_to_add)
@pytest.mark.asyncio
async def test_retrieve(self):
action = RetrieveExperiences(collection_name=self.collection_name)
query = "one player claimed to be Seer and the other Witch"
results = action.run(query, profile="Witch")
results = json.loads(results)
assert len(results) == 2, "Witch should have 2 experiences"
assert "The game is intense with two players" in results[0]
@pytest.mark.asyncio
async def test_retrieve_filtering(self):
action = RetrieveExperiences(collection_name=self.collection_name)
query = "some test query"
profile = "TestRole"
excluded_version = ""
results = action.run(query, profile=profile, excluded_version=excluded_version)
results = json.loads(results)
assert len(results) == 3
excluded_version = self.version + "_21-30"
results = action.run(query, profile=profile, excluded_version=excluded_version)
results = json.loads(results)
assert len(results) == 2
class TestActualRetrieve:
collection_name = "role_reflection"
@pytest.mark.asyncio
async def test_check_experience_pool(self):
logger.info("check experience pool")
action = RetrieveExperiences(collection_name=self.collection_name)
if action.engine:
all_experiences = action.engine.retriever._index._vector_store._collection.get()
logger.info(f"{len(all_experiences['metadatas'])=}")
@pytest.mark.asyncio
async def test_retrieve_werewolf_experience(self):
action = RetrieveExperiences(collection_name=self.collection_name)
query = "there are conflicts"
logger.info(f"test retrieval with {query=}")
action.run(query, "Werewolf")
@pytest.mark.asyncio
async def test_retrieve_villager_experience(self):
action = RetrieveExperiences(collection_name=self.collection_name)
query = "there are conflicts"
logger.info(f"test retrieval with {query=}")
results = action.run(query, "Seer")
assert "conflict" not in results # 相似局面应该需要包含conflict关键词
@pytest.mark.asyncio
async def test_retrieve_villager_experience_filtering(self):
action = RetrieveExperiences(collection_name=self.collection_name)
query = "there are conflicts"
excluded_version = "01-10"
logger.info(f"test retrieval with {excluded_version=}")
results_01_10 = action.run(query, profile="Seer", excluded_version=excluded_version, verbose=True)
excluded_version = "11-20"
logger.info(f"test retrieval with {excluded_version=}")
results_11_20 = action.run(query, profile="Seer", excluded_version=excluded_version, verbose=True)
assert results_01_10 == results_11_20

View file

@ -29,7 +29,7 @@ def test_add_role(env: Environment):
name="Alice", profile="product manager", goal="create a new product", constraints="limited resources"
)
env.add_role(role)
assert env.get_role(role.profile) == role
assert env.get_role(str(role._setting)) == role
def test_get_roles(env: Environment):