diff --git a/examples/werewolf_game/evals/eval.py b/examples/werewolf_game/evals/eval.py new file mode 100644 index 000000000..3093d80f2 --- /dev/null +++ b/examples/werewolf_game/evals/eval.py @@ -0,0 +1,217 @@ +""" +Filename: MetaGPT/examples/werewolf_game/evals/eval.py +Created Date: Oct 18, 2023 +Updated Date: Oct 24, 2023 +Author: [Aria](https://github.com/ariafyy) +Info: eval the Voting Accuracy Rate of non_werewolves and Vote Difficulity +""" + +import glob +import os +import re +from pathlib import Path + +import numpy as np +import pandas as pd +from tqdm import tqdm +from utils import Utils + +from metagpt.const import DEFAULT_WORKSPACE_ROOT, METAGPT_ROOT + + +class Vote: + """Vote Evaluation""" + + def __init__(self): + self.OUT_PATH = DEFAULT_WORKSPACE_ROOT / "outputs" + os.makedirs(self.OUT_PATH, exist_ok=True) + self.SUB_FOLDER_LIST = ["01-10", "11-20", "21-30"] + + def _get_log_fileslist(self, IN_PATH) -> list[str]: + files_list = [] + for SUB_FOLDER in self.SUB_FOLDER_LIST: + files_list.extend(glob.glob(str(IN_PATH / SUB_FOLDER / "*.txt"))) + return files_list + + def extract_votes_from_logs(self, files_list: list): + for in_logfile in tqdm(files_list): + SUB_FOLDER = (Path(in_logfile).parent).stem + out_txtfile = self.OUT_PATH / "# {0}_{1}.txt".format(SUB_FOLDER, Path(in_logfile).stem) + Utils().pick_vote_log(in_logfile, out_txtfile) + votefiles_list = Utils().get_file_list(self.OUT_PATH) + return votefiles_list + + @staticmethod + def parse_vote_text2chunks(text: str): + """ + parse each game vote log into text chunks + + one chunk example: + ['Player1', 'Player2', 'Player3', 'Player5', 'Player6']. Say ONLY: I vote to eliminate ... + Player1(Witch): 49 | I vote to eliminate Player5 + Player2(Villager): 49 | I vote to eliminate Player5 + Player3(Villager): 49 | I vote to eliminate Player5 + Player5(Werewolf): 49 | I vote to eliminate Player6 + Player6(Seer): 49 | I vote to eliminate Player5 + """ + pattern = re.compile(r"""\[([^\]]+)\]. Say ONLY: I vote to eliminate ...""") + chunks = {} + chunk_id = 0 + last_end = 0 + for match in pattern.finditer(text): + start = match.start() + chunk = text[last_end:start] + chunks[f"vote_{chunk_id}"] = chunk.strip() + last_end = match.end() + chunk_id += 1 + final_chunk = text[last_end:].strip() + if final_chunk: + chunks[f"vote_{chunk_id}"] = final_chunk + return chunks + + def _vote_rate_players(self, text: str): + """ + # calculate the rate of goodteam vote werewolves + :example: + + input: + ['Player1', 'Player2', 'Player3', 'Player5', 'Player6']. Say ONLY: I vote to eliminate ... + Player1(Witch): 49 | I vote to eliminate Player5 + Player2(Villager): 49 | I vote to eliminate Player5 + Player3(Villager): 49 | I vote to eliminate Player5 + Player5(Werewolf): 49 | I vote to eliminate Player6 + Player6(Seer): 49 | I vote to eliminate Player5 + + output: + werewolves: ['Player5'] + non_werewolves: ['Player1', 'Player2', 'Player3', 'Player6'] + as you can see :Player2(Villager) and Player3(Villager) vote to eliminate Player5(Werewolf) + :return goodteam vote rateability: 100.00% + """ + pattern = re.compile(r"(\w+)\(([^\)]+)\): \d+ \| I vote to eliminate (\w+)") + # find all werewolves + werewolves = [] + for match in pattern.finditer(text): + if match.group(2) == "Werewolf": + werewolves.append(match.group(1)) + + # find all non_werewolves + non_werewolves = [] + for match in pattern.finditer(text): + if match.group(2) != "Werewolf": + non_werewolves.append(match.group(1)) + num_non_werewolves = len(non_werewolves) + + # count players other than werewolves made the correct votes + correct_votes = 0 + for match in pattern.finditer(text): + if match.group(2) != "Werewolf" and match.group(3) in werewolves: + correct_votes += 1 + + # cal the rateability of non_werewolves + rate = correct_votes / num_non_werewolves + good_vote_rate = round(rate, 2) + return {"good_vote_rate": good_vote_rate, "werewolves": werewolves, "non_werewolves": non_werewolves} + + def get_goodteam_vote_rate(self, text: str) -> float: + goodteam_vote_rate = self._vote_rate_players(text)["good_vote_rate"] + return goodteam_vote_rate + + def get_werewolves(self, text: str) -> list: + werewolves_list = self._vote_rate_players(text)["werewolves"] + return werewolves_list + + def get_non_werewolves(self, text: str) -> list: + non_werewolves_list = self._vote_rate_players(text)["non_werewolves"] + return non_werewolves_list + + def get_votewolf_difficulty(self, werewolves: list, non_werewolves: list) -> str: + num_living_wolfs = len(werewolves) + num_living_players = len(werewolves) + len(non_werewolves) + votewolf_difficulty = "_{0} / {1}".format(num_living_wolfs, num_living_players) + return votewolf_difficulty + + def get_result_df(self, out_txtfile: str) -> pd.DataFrame: + """ + folder: sub folders for evals + file: evaluation file, each file represents one game + votes: the number of votes, eg. vote_1 represents the first vote of this game, + good_vote_rate:the rateability of a good person voting against a werewolf, + correct_votes / the total number of players other than werewolves + total_votes:the total number of votes cast + """ + with open(out_txtfile, "r") as out_file: + text = out_file.read() + chunks = self.parse_vote_text2chunks(text) + res = [] + for k, v in chunks.items(): + if v != "": + chunks_list = list(chunks.keys()) + total_votes = len(chunks_list) - 1 + werewolves = self.get_werewolves(v) + non_werewolves = self.get_non_werewolves(v) + good_vote_rate = self.get_goodteam_vote_rate(v) + votewolf_difficulty = self.get_votewolf_difficulty(werewolves, non_werewolves) + folder = Utils().filename_to_foldername(out_txtfile) + result = { + "folder": folder, + "file": Path(out_txtfile).stem + ".txt", + "vote_round": k, + "good_vote_rate": good_vote_rate, + "total_votes": total_votes, + "votewolf_difficulty": votewolf_difficulty, + } + res.append(result) + df = pd.DataFrame(res) + return df + + def calc_avg_rate(self, IN_PATH) -> pd.DataFrame: + """ + get avg_rate for each game + avg_rate : the good_rate/total number of votes in the game + vote1_rate: First Round Voting Accuracy Rate + """ + infiles_list = self._get_log_fileslist(IN_PATH) + votefiles_list = self.extract_votes_from_logs(infiles_list) + df_list = [self._load_df_from_file(file) for file in votefiles_list] + combined_df = pd.concat(df_list, ignore_index=True) + # calculate the average good_vote_rate for each file + mean_rates = self._calculate_mean_rates(combined_df) + combined_df["avg_rate"] = combined_df["file"].map(mean_rates) + # calculate vote1 rate + vote1_rates = self._calc_vote1_rates(combined_df) + combined_df["vote1_rate"] = combined_df["folder"].map(vote1_rates.set_index("folder")["good_vote_rate"]) + combined_df.loc[combined_df["vote_round"] != "vote_1", "vote1_rate"] = np.nan + combined_df["vote1_rate"] = combined_df["vote1_rate"].apply(self._format_rates) + combined_df["good_vote_rate"] = combined_df["good_vote_rate"].apply(self._format_rates) + combined_df["avg_rate"] = combined_df["avg_rate"].apply(self._format_rates) + combined_df.sort_values(["file"], ascending=True, inplace=True) + return combined_df + + def _calc_vote1_rates(self, df): + df_vote1 = df[df["vote_round"] == "vote_1"] + vote1_rates = df_vote1.groupby("folder")["good_vote_rate"].mean().reset_index() + return vote1_rates + + def _load_df_from_file(self, file): + return self.get_result_df(file) + + def _calculate_mean_rates(self, df): + return df.groupby("file")["good_vote_rate"].mean() + + def _format_rates(self, s): + return Utils().float_to_percent(s) + + def get_eval_csv(self, IN_PATH, EVAL_RESULT): + """ + IN_PATH : parent folder of ["01-10", "11-20", "21-30"] + EVAL_RESULT : output csv file path + """ + combined_df = self.calc_avg_rate(IN_PATH) + combined_df.to_csv(EVAL_RESULT, index=False) + + +if __name__ == "__main__": + IN_PATH = METAGPT_ROOT / "examples/werewolf_game/evals" + EVAL_RESULT = DEFAULT_WORKSPACE_ROOT / "outputs" / "goodteam_vote_rate.csv" + Vote().get_eval_csv(IN_PATH, EVAL_RESULT) diff --git a/examples/werewolf_game/evals/utils.py b/examples/werewolf_game/evals/utils.py new file mode 100644 index 000000000..490e7126f --- /dev/null +++ b/examples/werewolf_game/evals/utils.py @@ -0,0 +1,134 @@ +""" +Filename: MetaGPT/examples/werewolf_game/evals/utils.py +Created Date: Oct 11, 2023 +Revised Date: Oct 20, 2023 +Author: [Aria](https://github.com/ariafyy) +""" +import glob +import os +import re +from pathlib import Path + +from metagpt.const import METAGPT_ROOT + + +class Utils: + """Utils: utils of logs""" + + @staticmethod + def polish_log(in_logfile, out_txtfile): + """polish logs for evaluation""" + pattern_text = r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}) \| (\w+) +\| ([\w\.]+:\w+:\d+) - (.*\S)" + pattern_player = r"(Player(\d{1}): \w+)" + pattern_start = False + json_start = False + + with open(in_logfile, "r") as f, open(out_txtfile, "w") as out: + for line in f.readlines(): + matches = re.match(pattern_text, line) + if matches: + message = matches.group(4).strip() + pattern_start = True + json_start = False + + if ( + "Moderator(Moderator) ready to InstructSpeak" not in message + and "Moderator(Moderator) ready to ParseSpeak" not in message + and "Total running cost:" not in message + ): + out.write("- " + message + "\n") + else: + out.write("\n") + + elif pattern_start and not matches: + if "gpt-4 may update over time" in line: + line = "" + out.write(line) + + elif line.strip().startswith("{"): + out.write(line.strip()) + json_start = True + + elif json_start and not line.strip().endswith("}"): + out.write(line.strip()) + + elif json_start and line.strip().endswith("}"): + out.write(line.strip()) + json_start = False + + elif ( + line.startswith("(User):") or line.startswith("********** STEP:") or re.search(pattern_player, line) + ): + out.write(line) + + else: + out.write("\n") + + @staticmethod + def pick_vote_log(in_logfile, out_txtfile): + """ + pick the vote log from the log file. + ready to AnnounceGameResult serves as the 'HINT_TEXT ' which indicates the end of the game. + based on bservation and reflection, then discuss is not in vote session. + """ + pattern_vote = r"(Player\d+)\(([A-Za-z]+)\): (\d+) \| (I vote to eliminate Player\d+)" + ignore_text = """reflection""" + HINT_TEXT = r"ready to AnnounceGameResult" + pattern_moderator = r"\[([^\]]+)\]\. Say ONLY: I vote to eliminate ..." + in_valid_block = False + + with open(in_logfile, "r") as f: + lines = f.read() + split_lines = lines.split(HINT_TEXT) + + if len(split_lines) < 2: + print(f"Key text :{HINT_TEXT} not found in {in_logfile}") + return + + relevant_lines = split_lines[1].split("\n") + with open(out_txtfile, "w") as out: + for line in relevant_lines: + if re.search(pattern_moderator, line): + in_valid_block = True + out.write(line.lstrip() + "\n") + + elif in_valid_block and re.search(pattern_vote, line): + out.write(line + "\n") + elif ignore_text in line: + in_valid_block = False + + @staticmethod + def get_file_list(path: str) -> list: + file_pattern = os.path.join(path, "*.txt") + files_list = glob.glob(file_pattern) + return files_list + + @staticmethod + def filename_to_foldername(out_txtfile: str): + """ + convert filename into its parent folder name + input:"....../# 01-10_10132100.txt" + output:# 01-10 + """ + s = Path(out_txtfile).stem + pattern_folder = r"([^_]*)_" + match = re.match(pattern_folder, s) + if match: + folder = match.group(1) + return folder + + @staticmethod + def float_to_percent(decimal: float) -> str: + """ + input: 1.00 + output: 100.00% + """ + percent = decimal * 100 + return f"{percent:.2f}%" + + +if __name__ == "__main__": + in_logfile = METAGPT_ROOT / "logs/log.txt" + out_txtfile = "input your wish path" + # Utils().polish_log(in_logfile, out_txtfile) + Utils().pick_vote_log(in_logfile, out_txtfile) diff --git a/examples/werewolf_game/start_game.py b/examples/werewolf_game/start_game.py new file mode 100644 index 000000000..fdd17256a --- /dev/null +++ b/examples/werewolf_game/start_game.py @@ -0,0 +1,100 @@ +import asyncio +import random + +import fire + +from metagpt.ext.werewolf.roles import Guard, Moderator, Seer, Villager, Werewolf, Witch +from metagpt.ext.werewolf.roles.human_player import prepare_human_player +from metagpt.ext.werewolf.werewolf_game import WerewolfGame +from metagpt.logs import logger + + +def init_game_setup( + shuffle=True, + add_human=False, + use_reflection=True, + use_experience=False, + use_memory_selection=False, + new_experience_version="", +): + roles = [Villager, Villager, Werewolf, Werewolf, Guard, Seer, Witch] + if shuffle: + # random.seed(2023) + random.shuffle(roles) + if add_human: + assigned_role_idx = random.randint(0, len(roles) - 1) + assigned_role = roles[assigned_role_idx] + roles[assigned_role_idx] = prepare_human_player(assigned_role) + + players = [ + role( + name=f"Player{i+1}", + use_reflection=use_reflection, + use_experience=use_experience, + use_memory_selection=use_memory_selection, + new_experience_version=new_experience_version, + ) + for i, role in enumerate(roles) + ] + + if add_human: + logger.info(f"You are assigned {players[assigned_role_idx].name}({players[assigned_role_idx].profile})") + + game_setup = ["Game setup:"] + [f"{player.name}: {player.profile}," for player in players] + game_setup = "\n".join(game_setup) + + return game_setup, players + + +async def start_game( + investment: float = 3.0, + n_round: int = 5, + shuffle: bool = True, + add_human: bool = False, + use_reflection: bool = True, + use_experience: bool = False, + use_memory_selection: bool = False, + new_experience_version: str = "", +): + game = WerewolfGame() + game_setup, players = init_game_setup( + shuffle=shuffle, + add_human=add_human, + use_reflection=use_reflection, + use_experience=use_experience, + use_memory_selection=use_memory_selection, + new_experience_version=new_experience_version, + ) + players = [Moderator()] + players + game.hire(players) + game.invest(investment) + game.start_project(game_setup) + await game.run(n_round=n_round) + + +def main( + investment: float = 20.0, + n_round: int = 100, + shuffle: bool = True, + add_human: bool = False, + use_reflection: bool = True, + use_experience: bool = False, + use_memory_selection: bool = False, + new_experience_version: str = "", +): + asyncio.run( + start_game( + investment, + n_round, + shuffle, + add_human, + use_reflection, + use_experience, + use_memory_selection, + new_experience_version, + ) + ) + + +if __name__ == "__main__": + fire.Fire(main) diff --git a/metagpt/ext/werewolf/__init__.py b/metagpt/ext/werewolf/__init__.py new file mode 100644 index 000000000..2bcf8efd0 --- /dev/null +++ b/metagpt/ext/werewolf/__init__.py @@ -0,0 +1,3 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Desc : diff --git a/metagpt/ext/werewolf/actions/__init__.py b/metagpt/ext/werewolf/actions/__init__.py new file mode 100644 index 000000000..c994c43c8 --- /dev/null +++ b/metagpt/ext/werewolf/actions/__init__.py @@ -0,0 +1,23 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Desc : + +from metagpt.ext.werewolf.actions.werewolf_actions import Hunt, Impersonate +from metagpt.ext.werewolf.actions.guard_actions import Protect +from metagpt.ext.werewolf.actions.seer_actions import Verify +from metagpt.ext.werewolf.actions.witch_actions import Save, Poison +from metagpt.ext.werewolf.actions.common_actions import Speak, NighttimeWhispers, Reflect +from metagpt.ext.werewolf.actions.experience_operation import AddNewExperiences, RetrieveExperiences +from metagpt.ext.werewolf.actions.moderator_actions import InstructSpeak + +ACTIONS = { + "Speak": Speak, + "Hunt": Hunt, + "Protect": Protect, + "Verify": Verify, + "Save": Save, + "Poison": Poison, + "Impersonate": Impersonate, +} + +__all__ = ["NighttimeWhispers", "Reflect", "AddNewExperiences", "RetrieveExperiences", "InstructSpeak"] diff --git a/metagpt/ext/werewolf/actions/common_actions.py b/metagpt/ext/werewolf/actions/common_actions.py new file mode 100644 index 000000000..c5059b5e0 --- /dev/null +++ b/metagpt/ext/werewolf/actions/common_actions.py @@ -0,0 +1,233 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Desc : + +import json + +from tenacity import retry, stop_after_attempt, wait_fixed + +from metagpt.actions import Action + + +class Speak(Action): + """Action: Any speak action in a game""" + + PROMPT_TEMPLATE = """ + { + "BACKGROUND": "It's a Werewolf game, in this game, we have 2 werewolves, 2 villagers, 1 guard, 1 witch, 1 seer. You are __profile__. Note that villager, seer, guard and witch are all in villager side, they have the same objective. Werewolves can collectively hunt ONE player at night." + ,"HISTORY": "You have knowledge to the following conversation: __context__" + ,"ATTENTION": "You can NOT VOTE a player who is NOT ALIVE now!" + ,"REFLECTION": "__reflection__" + ,"STRATEGY": __strategy__ + ,"PAST_EXPERIENCES": "__experiences__" + ,"MODERATOR_INSTRUCTION": __latest_instruction__, + ,"RULE": "Please follow the moderator's latest instruction, figure out if you need to speak your opinion or directly to vote: + 1. If the instruction is to SPEAK, speak in 200 words. Remember the goal of your role and try to achieve it using your speech; + 2. If the instruction is to VOTE, you MUST vote and ONLY say 'I vote to eliminate PlayerX', replace PlayerX with the actual player name, DO NOT include any other words." + ,"OUTPUT_FORMAT": + { + "ROLE": "Your role, in this case, __profile__" + ,"PLAYER_NAME": "Your name, in this case, __name__" + ,"LIVING_PLAYERS": "List living players based on MODERATOR_INSTRUCTION. Return a json LIST datatype." + ,"THOUGHTS": "Based on `MODERATOR_INSTRUCTION` and `RULE`, carefully think about what to say or vote so that your chance of win as __profile__ maximizes. + If you find similar situation in `PAST_EXPERIENCES`, you may draw lessons from them to refine your strategy, take better vote action, or improve your speech. + Give your step-by-step thought process, you should think no more than 3 steps. For example: My step-by-step thought process:..." + ,"RESPONSE": "Based on `MODERATOR_INSTRUCTION`, `RULE`, and the 'THOUGHTS' you had, express your opinion or cast a vote." + } + } + """ + STRATEGY = """ + Decide whether to reveal your identity based on benefits vs. risks, provide useful information, and vote to eliminate the most suspicious. + If you have special abilities, pay attention to those who falsely claims your role, for they are probably werewolves. + """ + + name: str = "Speak" + + @retry(stop=stop_after_attempt(2), wait=wait_fixed(1)) + async def run( + self, + profile: str, + name: str, + context: str, + latest_instruction: str, + reflection: str = "", + experiences: str = "", + ): + prompt = ( + self.PROMPT_TEMPLATE.replace("__context__", context) + .replace("__profile__", profile) + .replace("__name__", name) + .replace("__latest_instruction__", latest_instruction) + .replace("__strategy__", self.STRATEGY) + .replace("__reflection__", reflection) + .replace("__experiences__", experiences) + ) + + rsp = await self._aask(prompt) + rsp = rsp.replace("\n", " ") + rsp_json = json.loads(rsp) + + return rsp_json["RESPONSE"] + + +class NighttimeWhispers(Action): + """ + + Action: nighttime whispers with thinking processes + + Usage Example: + + class Hunt(NighttimeWhispers): + def __init__(self, name="Hunt", context=None, llm=None): + super().__init__(name, context, llm) + + class Protect(NighttimeWhispers): + def __init__(self, name="Protect", context=None, llm=None): + super().__init__(name, context, llm) + + class Verify(NighttimeWhispers): + def __init__(self, name="Verify", context=None, llm=None): + super().__init__(name, context, llm) + + class Save(NighttimeWhispers): + def __init__(self, name="Save", context=None, llm=None): + super().__init__(name, context, llm) + + def _update_prompt_json(self, prompt_json: dict, profile: str, name: str, context: str, **kwargs): + del prompt_json['ACTION'] + del prompt_json['ATTENTION'] + prompt_json["OUTPUT_FORMAT"]["THOUGHTS"] = "It is night time. Return the thinking steps of your decision of whether to save the player JUST be killed at this night." + prompt_json["OUTPUT_FORMAT"]["RESPONSE"] = "Follow the Moderator's instruction, decide whether you want to save that person or not. Return SAVE or PASS." + return prompt_json + + class Poison(NighttimeWhispers): + def __init__(self, name="Poison", context=None, llm=None): + super().__init__(name, context, llm) + + def _update_prompt_json(self, prompt_json: dict, profile: str, name: str, context: str, **kwargs): + prompt_json["OUTPUT_FORMAT"]["RESPONSE"] += "Or if you want to PASS, return PASS." + return prompt_json + """ + + PROMPT_TEMPLATE = """ + { + "BACKGROUND": "It's a Werewolf game, in this game, we have 2 werewolves, 2 villagers, 1 guard, 1 witch, 1 seer. You are __profile__. Note that villager, seer, guard and witch are all in villager side, they have the same objective. Werewolves can collectively hunt ONE player at night." + ,"HISTORY": "You have knowledge to the following conversation: __context__" + ,"ACTION": "Choose one living player to __action__." + ,"ATTENTION": "1. You can only __action__ a player who is alive this night! And you can not __action__ a player who is dead this night! 2. `HISTORY` is all the information you observed, DONT hallucinate other player actions!" + ,"REFLECTION": "__reflection__" + ,"STRATEGY": "__strategy__" + ,"PAST_EXPERIENCES": "__experiences__" + ,"OUTPUT_FORMAT": + { + "ROLE": "Your role, in this case, __profile__" + ,"PLAYER_NAME": "Your name, in this case, __name__" + ,"LIVING_PLAYERS": "List the players who is alive based on moderator's latest instruction. Return a json LIST datatype." + ,"THOUGHTS": "Choose one living player from `LIVING_PLAYERS` to __action__ this night. Return the reason why you choose to __action__ this player. If you observe nothing at first night, DONT imagine unexisting player actions! If you find similar situation in `PAST_EXPERIENCES`, you may draw lessons from them to refine your strategy and take better actions. Give your step-by-step thought process, you should think no more than 3 steps. For example: My step-by-step thought process:..." + ,"RESPONSE": "As a __profile__, you should choose one living player from `LIVING_PLAYERS` to __action__ this night according to the THOUGHTS you have just now. Return the player name ONLY." + } + } + """ + STRATEGY = """ + Decide which player is most threatening to you or most needs your support, take your action correspondingly. + """ + + name: str = "NightTimeWhispers" + + def _construct_prompt_json( + self, role_profile: str, role_name: str, context: str, reflection: str, experiences: str, **kwargs + ): + prompt_template = self.PROMPT_TEMPLATE + + def replace_string(prompt_json: dict): + k: str + for k in prompt_json.keys(): + if isinstance(prompt_json[k], dict): + prompt_json[k] = replace_string(prompt_json[k]) + continue + prompt_json[k] = prompt_json[k].replace("__profile__", role_profile) + prompt_json[k] = prompt_json[k].replace("__name__", role_name) + prompt_json[k] = prompt_json[k].replace("__context__", context) + prompt_json[k] = prompt_json[k].replace("__action__", self.name) + prompt_json[k] = prompt_json[k].replace("__strategy__", self.STRATEGY) + prompt_json[k] = prompt_json[k].replace("__reflection__", reflection) + prompt_json[k] = prompt_json[k].replace("__experiences__", experiences) + + return prompt_json + + prompt_json: dict = json.loads(prompt_template) + + prompt_json = replace_string(prompt_json) + + prompt_json: dict = self._update_prompt_json( + prompt_json, role_profile, role_name, context, reflection, experiences, **kwargs + ) + assert isinstance(prompt_json, dict) + + prompt: str = json.dumps(prompt_json, indent=4, ensure_ascii=False) + + return prompt + + def _update_prompt_json( + self, prompt_json: dict, role_profile: str, role_name: str, context: str, reflection: str, experiences: str + ) -> dict: + # one can modify the prompt_json dictionary here + return prompt_json + + @retry(stop=stop_after_attempt(2), wait=wait_fixed(1)) + async def run(self, context: str, profile: str, name: str, reflection: str = "", experiences: str = ""): + prompt = self._construct_prompt_json( + role_profile=profile, role_name=name, context=context, reflection=reflection, experiences=experiences + ) + + rsp = await self._aask(prompt) + rsp = rsp.replace("\n", " ") + rsp_json = json.loads(rsp) + + return f"{self.name} " + rsp_json["RESPONSE"] + + +class Reflect(Action): + PROMPT_TEMPLATE = """ + { + "BACKGROUND": "It's a Werewolf game, in this game, we have 2 werewolves, 2 villagers, 1 guard, 1 witch, 1 seer. You are __profile__. Note that villager, seer, guard and witch are all in villager side, they have the same objective. Werewolves can collectively hunt ONE player at night." + ,"HISTORY": "You have knowledge to the following conversation: __context__" + ,"MODERATOR_INSTRUCTION": __latest_instruction__, + ,"OUTPUT_FORMAT" (a json): + { + "ROLE": "Your role, in this case, __profile__" + ,"PLAYER_NAME": "Your name, in this case, __name__" + "GAME_STATES": "You are about to follow `MODERATOR_INSTRUCTION`, but before taking any action, analyze each player, including the living and the dead, and summarize the game states. + For each player, your reflection should be a ONE-LINE json covering the following dimension, return a LIST of jsons (return an empty LIST for the first night): + [ + {"TARGET": "the player you will analyze, if the player is yourself or your werewolf partner, indicate it" ,"STATUS": "living or dead, if dead, how was he/she possibly killed?", "CLAIMED_ROLE": "claims a role or not, if so, what role, any contradiction to others? If there is no claim, return 'None'", "SIDE_WITH": "sides with which players? If none, return 'None'", "ACCUSE": "accuses which players? If none, return 'None'"} + ,{...} + ,... + ]" + ,"REFLECTION": "Based on the whole `GAME_STATES`, return a json (return an empty string for the first night): + { + "Player1": "the true role (werewolf / special role / villager, living or dead) you infer about him/her, and why is this role? If the player is yourself or your werewolf partner, indicate it." + ,... + ,"Player7": "the true role (werewolf / special role / villager, living or dead) you infer about him/her, and why is this role? If the player is yourself or your werewolf partner, indicate it." + ,"GAME_STATE_SUMMARIZATION": "summarize the current situation from your standpoint in one sentence, your summarization should catch the most important information from your reflection, such as conflicts, number of living werewolves, special roles, and villagers." + }" + } + } + """ + + name: str = "Reflect" + + @retry(stop=stop_after_attempt(2), wait=wait_fixed(1)) + async def run(self, profile: str, name: str, context: str, latest_instruction: str): + prompt = ( + self.PROMPT_TEMPLATE.replace("__context__", context) + .replace("__profile__", profile) + .replace("__name__", name) + .replace("__latest_instruction__", latest_instruction) + ) + + rsp = await self._aask(prompt) + rsp = rsp.replace("\n", " ") + rsp_json = json.loads(rsp) + + return json.dumps(rsp_json["REFLECTION"]) diff --git a/metagpt/ext/werewolf/actions/experience_operation.py b/metagpt/ext/werewolf/actions/experience_operation.py new file mode 100644 index 000000000..a870ee6b8 --- /dev/null +++ b/metagpt/ext/werewolf/actions/experience_operation.py @@ -0,0 +1,200 @@ +import glob +import json +import os +from typing import Optional + +import chromadb +from chromadb import Collection +from chromadb.utils import embedding_functions +from pydantic import model_validator + +from metagpt.actions import Action +from metagpt.config2 import config +from metagpt.const import DEFAULT_WORKSPACE_ROOT +from metagpt.ext.werewolf.schema import RoleExperience +from metagpt.logs import logger + +DEFAULT_COLLECTION_NAME = "role_reflection" # FIXME: some hard code for now +EMB_FN = embedding_functions.OpenAIEmbeddingFunction( + api_key=config.llm.api_key, + api_base=config.llm.base_url, + api_type=config.llm.api_type, + model_name="text-embedding-ada-002", +) + + +class AddNewExperiences(Action): + name: str = "AddNewExperience" + collection_name: str = DEFAULT_COLLECTION_NAME + delete_existing: bool = False + collection: Optional[Collection] = None + + @model_validator(mode="after") + def validate_collection(self): + if self.collection: + return + + chroma_client = chromadb.PersistentClient(path=f"{DEFAULT_WORKSPACE_ROOT}/werewolf_game/chroma") + if self.delete_existing: + try: + chroma_client.get_collection(name=self.collection_name) + chroma_client.delete_collection(name=self.collection_name) + logger.info(f"existing collection {self.collection_name} deleted") + except: + pass + + self.collection = chroma_client.get_or_create_collection( + name=self.collection_name, + metadata={"hnsw:space": "cosine"}, + embedding_function=EMB_FN, + ) + + def run(self, experiences: list[RoleExperience]): + if not experiences: + return + for i, exp in enumerate(experiences): + exp.id = f"{exp.profile}-{exp.name}-step{i}-round_{exp.round_id}" + ids = [exp.id for exp in experiences] + documents = [exp.reflection for exp in experiences] + metadatas = [exp.model_dump() for exp in experiences] + + AddNewExperiences._record_experiences_local(experiences) + + self.collection.add(documents=documents, metadatas=metadatas, ids=ids) + + def add_from_file(self, file_path): + with open(file_path, "r") as fl: + lines = fl.readlines() + experiences = [RoleExperience.model_validate_json(line) for line in lines] + experiences = [exp for exp in experiences if len(exp.reflection) > 2] # not "" or not '""' + + ids = [exp.id for exp in experiences] + documents = [exp.reflection for exp in experiences] + metadatas = [exp.model_dump() for exp in experiences] + + self.collection.add(documents=documents, metadatas=metadatas, ids=ids) + + @staticmethod + def _record_experiences_local(experiences: list[RoleExperience]): + round_id = experiences[0].round_id + version = experiences[0].version + version = "test" if not version else version + experiences = [exp.model_dump_json() for exp in experiences] + experience_folder = DEFAULT_WORKSPACE_ROOT / f"werewolf_game/experiences/{version}" + if not os.path.exists(experience_folder): + os.makedirs(experience_folder) + save_path = f"{experience_folder}/{round_id}.json" + with open(save_path, "a") as fl: + fl.write("\n".join(experiences)) + fl.write("\n") + logger.info(f"experiences saved to {save_path}") + + +class RetrieveExperiences(Action): + name: str = "RetrieveExperiences" + collection_name: str = DEFAULT_COLLECTION_NAME + has_experiences: bool = True + collection: Optional[Collection] = None + + @model_validator(mode="after") + def validate_collection(self): + if self.collection: + return + chroma_client = chromadb.PersistentClient(path=f"{DEFAULT_WORKSPACE_ROOT}/werewolf_game/chroma") + try: + self.collection = chroma_client.get_collection( + name=self.collection_name, + embedding_function=EMB_FN, + ) + self.has_experiences = True + except: + logger.warning(f"No experience pool {self.collection_name}") + self.has_experiences = False + + def run(self, query: str, profile: str, topk: int = 5, excluded_version: str = "", verbose: bool = False) -> str: + """_summary_ + + Args: + query (str): 用当前的reflection作为query去检索过去相似的reflection + profile (str): _description_ + topk (int, optional): _description_. Defaults to 5. + + Returns: + _type_: _description_ + """ + if not self.has_experiences or len(query) <= 2: # not "" or not '""' + return "" + + filters = {"profile": profile} + ### 消融实验逻辑 ### + if profile == "Werewolf": # 狼人作为基线,不用经验 + logger.warning("Disable werewolves' experiences") + return "" + if excluded_version: + filters = {"$and": [{"profile": profile}, {"version": {"$ne": excluded_version}}]} # 不用同一版本的经验,只用之前的 + ################# + + results = self.collection.query( + query_texts=[query], + n_results=topk, + where=filters, + ) + + logger.info(f"retrieve {profile}'s experiences") + past_experiences = [RoleExperience(**res) for res in results["metadatas"][0]] + if verbose: + logger.info("past_experiences: {}".format("\n\n".join(past_experiences))) + distances = results["distances"][0] + logger.info(f"distances: {distances}") + + template = """ + { + "Situation __i__": "__situation__" + ,"Moderator's instruction": "__instruction__" + ,"Your action or speech during that time": "__response__" + ,"Reality": "In fact, it turned out the true roles are __game_step__", + ,"Outcome": "You __outcome__ in the end" + } + """ + past_experiences = [ + ( + template.replace("__i__", str(i)) + .replace("__situation__", exp.reflection) + .replace("__instruction__", exp.instruction) + .replace("__response__", exp.response) + .replace("__game_step__", exp.game_setup.replace("0 | Game setup:\n", "").replace("\n", " ")) + .replace("__outcome__", exp.outcome) + ) + for i, exp in enumerate(past_experiences) + ] + logger.info("past_experiences: {}".format("\n".join(past_experiences))) + logger.info("retrieval done") + + return json.dumps(past_experiences) + + +# FIXME: below are some utility functions, should be moved to appropriate places +def delete_collection(name): + chroma_client = chromadb.PersistentClient(path=f"{DEFAULT_WORKSPACE_ROOT}/werewolf_game/chroma") + chroma_client.delete_collection(name=name) + + +def add_file_batch(folder, **kwargs): + action = AddNewExperiences(**kwargs) + file_paths = glob.glob(str(folder) + "/*") + for fp in file_paths: + logger.info(f"file_path: {fp}") + action.add_from_file(fp) + + +def modify_collection(): + chroma_client = chromadb.PersistentClient(path=f"{DEFAULT_WORKSPACE_ROOT}/werewolf_game/chroma") + collection = chroma_client.get_collection(name=DEFAULT_COLLECTION_NAME) + updated_name = DEFAULT_COLLECTION_NAME + "_backup" + collection.modify(name=updated_name) + try: + chroma_client.get_collection(name=DEFAULT_COLLECTION_NAME) + except: + logger.info(f"collection {DEFAULT_COLLECTION_NAME} not found") + updated_collection = chroma_client.get_collection(name=updated_name) + logger.info(f"updated_collection top5 documents {updated_collection.get()['documents'][-5:]}") diff --git a/metagpt/ext/werewolf/actions/guard_actions.py b/metagpt/ext/werewolf/actions/guard_actions.py new file mode 100644 index 000000000..fbacd9aeb --- /dev/null +++ b/metagpt/ext/werewolf/actions/guard_actions.py @@ -0,0 +1,9 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Desc : + +from metagpt.ext.werewolf.actions.common_actions import NighttimeWhispers + + +class Protect(NighttimeWhispers): + name: str = "Protect" diff --git a/metagpt/ext/werewolf/actions/moderator_actions.py b/metagpt/ext/werewolf/actions/moderator_actions.py new file mode 100644 index 000000000..6153c66c7 --- /dev/null +++ b/metagpt/ext/werewolf/actions/moderator_actions.py @@ -0,0 +1,113 @@ +from metagpt.actions import Action + +STEP_INSTRUCTIONS = { + # 上帝需要介入的全部步骤和对应指令 + # The 1-st night + 0: { + "content": "It’s dark, everyone close your eyes. I will talk with you/your team secretly at night.", + "send_to": "Moderator", # for moderator to continuen speaking + "restricted_to": "", + }, + 1: { + "content": "Guard, please open your eyes!", + "send_to": "Moderator", # for moderator to continuen speaking + "restricted_to": "", + }, + 2: { + "content": """Guard, now tell me who you protect tonight? + You only choose one from the following living options please: {living_players}. + Or you can pass. For example: Protect ...""", + "send_to": "Guard", + "restricted_to": "Moderator,Guard", + }, + 3: {"content": "Guard, close your eyes", "send_to": "Moderator", "restricted_to": ""}, + 4: {"content": "Werewolves, please open your eyes!", "send_to": "Moderator", "restricted_to": ""}, + 5: { + "content": """Werewolves, I secretly tell you that {werewolf_players} are + all of the 2 werewolves! Keep in mind you are teammates. The rest players are not werewolves. + choose one from the following living options please: + {living_players}. For example: Kill ...""", + "send_to": "Werewolf", + "restricted_to": "Moderator,Werewolf", + }, + 6: {"content": "Werewolves, close your eyes", "send_to": "Moderator", "restricted_to": ""}, + 7: {"content": "Witch, please open your eyes!", "send_to": "Moderator", "restricted_to": ""}, + 8: { + "content": """Witch, tonight {player_hunted} has been killed by the werewolves. + You have a bottle of antidote, would you like to save him/her? If so, say "Save", else, say "Pass".""", + "send_to": "Witch", + "restricted_to": "Moderator,Witch", + }, # 要先判断女巫是否有解药,再去询问女巫是否使用解药救人 + 9: { + "content": """Witch, you also have a bottle of poison, would you like to use it to kill one of the living players? + Choose one from the following living options: {living_players}. + If so, say ONLY "Poison PlayerX", replace PlayerX with the actual player name, else, say "Pass".""", + "send_to": "Witch", + "restricted_to": "Moderator,Witch", + }, # + 10: {"content": "Witch, close your eyes", "send_to": "Moderator", "restricted_to": ""}, + 11: {"content": "Seer, please open your eyes!", "send_to": "Moderator", "restricted_to": ""}, + 12: { + "content": """Seer, you can check one player's identity. Who are you going to verify its identity tonight? + Choose only one from the following living options:{living_players}.""", + "send_to": "Seer", + "restricted_to": "Moderator,Seer", + }, + 13: {"content": "Seer, close your eyes", "send_to": "Moderator", "restricted_to": ""}, + # The 1-st daytime + 14: { + "content": """It's daytime. Everyone woke up except those who had been killed.""", + "send_to": "Moderator", + "restricted_to": "", + }, + 15: {"content": "{player_current_dead} was killed last night!", "send_to": "Moderator", "restricted_to": ""}, + 16: { + "content": """Living players: {living_players}, now freely talk about the current situation based on your observation and + reflection with a few sentences. Decide whether to reveal your identity based on your reflection.""", + "send_to": "", # send to all to speak in daytime + "restricted_to": "", + }, + 17: { + "content": """Now vote and tell me who you think is the werewolf. Don’t mention your role. + You only choose one from the following living options please: + {living_players}. Say ONLY: I vote to eliminate ...""", + "send_to": "", + "restricted_to": "", + }, + 18: {"content": """{player_current_dead} was eliminated.""", "send_to": "Moderator", "restricted_to": ""}, +} + + +class InstructSpeak(Action): + name: str = "InstructSpeak" + + async def run(self, step_idx, living_players, werewolf_players, player_hunted, player_current_dead): + instruction_info = STEP_INSTRUCTIONS.get( + step_idx, {"content": "Unknown instruction.", "send_to": "", "restricted_to": ""} + ) + content = instruction_info["content"] + if "{living_players}" in content and "{werewolf_players}" in content: + content = content.format(living_players=living_players, werewolf_players=werewolf_players) + if "{living_players}" in content: + content = content.format(living_players=living_players) + if "{werewolf_players}" in content: + content = content.format(werewolf_players=werewolf_players) + if "{player_hunted}" in content: + content = content.format(player_hunted=player_hunted) + if "{player_current_dead}" in content: + player_current_dead = "No one" if not player_current_dead else player_current_dead + content = content.format(player_current_dead=player_current_dead) + + return content, instruction_info["send_to"], instruction_info["restricted_to"] + + +class ParseSpeak(Action): + name: str = "ParseSpeak" + + async def run(self): + pass + + +class AnnounceGameResult(Action): + async def run(self, winner: str, win_reason: str): + return f"Game over! {win_reason}. The winner is the {winner}" diff --git a/metagpt/ext/werewolf/actions/seer_actions.py b/metagpt/ext/werewolf/actions/seer_actions.py new file mode 100644 index 000000000..a2c4977c5 --- /dev/null +++ b/metagpt/ext/werewolf/actions/seer_actions.py @@ -0,0 +1,5 @@ +from metagpt.ext.werewolf.actions.common_actions import NighttimeWhispers + + +class Verify(NighttimeWhispers): + name: str = "Verify" diff --git a/metagpt/ext/werewolf/actions/werewolf_actions.py b/metagpt/ext/werewolf/actions/werewolf_actions.py new file mode 100644 index 000000000..057baf0b6 --- /dev/null +++ b/metagpt/ext/werewolf/actions/werewolf_actions.py @@ -0,0 +1,17 @@ +from metagpt.ext.werewolf.actions.common_actions import NighttimeWhispers, Speak + + +class Hunt(NighttimeWhispers): + name: str = "Hunt" + + +class Impersonate(Speak): + """Action: werewolf impersonating a good guy in daytime speak""" + + STRATEGY = """ + Try continuously impersonating a role, such as Seer, Guard, Villager, etc., in order to mislead + other players, make them trust you, and thus hiding your werewolf identity. However, pay attention to what your werewolf partner said, + DONT claim the same role as your werewolf partner. Remmber NOT to reveal your real identity as a werewolf! + """ + + name: str = "Impersonate" diff --git a/metagpt/ext/werewolf/actions/witch_actions.py b/metagpt/ext/werewolf/actions/witch_actions.py new file mode 100644 index 000000000..2dfee320c --- /dev/null +++ b/metagpt/ext/werewolf/actions/witch_actions.py @@ -0,0 +1,46 @@ +from metagpt.ext.werewolf.actions.common_actions import NighttimeWhispers + + +class Save(NighttimeWhispers): + name: str = "Save" + + def _update_prompt_json( + self, prompt_json: dict, role_profile: str, role_name: str, context: str, reflection: str, experiences: str + ) -> dict: + del prompt_json["ACTION"] + del prompt_json["ATTENTION"] + + prompt_json["OUTPUT_FORMAT"][ + "THOUGHTS" + ] = "It is night time. Return the thinking steps of your decision of whether to save the player JUST killed this night." + prompt_json["OUTPUT_FORMAT"][ + "RESPONSE" + ] = "Follow the Moderator's instruction, decide whether you want to save that person or not. Return SAVE or PASS." + + return prompt_json + + async def run(self, *args, **kwargs): + rsp = await super().run(*args, **kwargs) + action_name, rsp = rsp.split() + return rsp # 只需回复SAVE或PASS,不需要带上action名 + + +class Poison(NighttimeWhispers): + STRATEGY = """ + Only poison a player if you are confident he/she is a werewolf. Don't poison a player randomly or at first night. + If someone claims to be the witch, poison him/her, because you are the only witch, he/she can only be a werewolf. + """ + + name: str = "Poison" + + def _update_prompt_json( + self, prompt_json: dict, role_profile: str, role_name: str, context: str, reflection: str, experiences: str + ) -> dict: + prompt_json["OUTPUT_FORMAT"]["RESPONSE"] += "Or if you want to PASS, return PASS." + return prompt_json + + async def run(self, *args, **kwargs): + rsp = await super().run(*args, **kwargs) + if "pass" in rsp.lower(): + action_name, rsp = rsp.split() # 带PASS,只需回复PASS,不需要带上action名,否则是Poison PlayerX,无需改动 + return rsp diff --git a/metagpt/ext/werewolf/roles/__init__.py b/metagpt/ext/werewolf/roles/__init__.py new file mode 100644 index 000000000..fa9e3a642 --- /dev/null +++ b/metagpt/ext/werewolf/roles/__init__.py @@ -0,0 +1,13 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Desc : + +from metagpt.ext.werewolf.roles.base_player import BasePlayer +from metagpt.ext.werewolf.roles.guard import Guard +from metagpt.ext.werewolf.roles.seer import Seer +from metagpt.ext.werewolf.roles.villager import Villager +from metagpt.ext.werewolf.roles.werewolf import Werewolf +from metagpt.ext.werewolf.roles.witch import Witch +from metagpt.ext.werewolf.roles.moderator import Moderator + +__all__ = ["BasePlayer", "Guard", "Moderator", "Seer", "Villager", "Witch", "Werewolf"] diff --git a/metagpt/ext/werewolf/roles/base_player.py b/metagpt/ext/werewolf/roles/base_player.py new file mode 100644 index 000000000..548a48177 --- /dev/null +++ b/metagpt/ext/werewolf/roles/base_player.py @@ -0,0 +1,162 @@ +import re + +from metagpt.ext.werewolf.actions import ( + ACTIONS, + AddNewExperiences, + InstructSpeak, + NighttimeWhispers, + Reflect, + RetrieveExperiences, + Speak, +) +from metagpt.ext.werewolf.schema import RoleExperience +from metagpt.logs import logger +from metagpt.roles import Role +from metagpt.schema import Message + + +class BasePlayer(Role): + def __init__( + self, + name: str = "PlayerXYZ", + profile: str = "BasePlayer", + special_action_names: list[str] = [], + use_reflection: bool = True, + use_experience: bool = False, + use_memory_selection: bool = False, + new_experience_version: str = "", + **kwargs, + ): + super().__init__(name, profile, **kwargs) + # 通过 set_status() 更新状态。 + self.status = 0 # 0代表活着,1代表死亡 + + # 技能和监听配置 + self._watch([InstructSpeak]) # 监听Moderator的指令以做行动 + special_actions = [ACTIONS[action_name] for action_name in special_action_names] + capable_actions = [Speak] + special_actions + self.set_actions(capable_actions) # 给角色赋予行动技能 + self.special_actions = special_actions + + self.use_reflection = use_reflection + if not self.use_reflection and use_experience: + logger.warning("You must enable use_reflection before using experience") + self.use_experience = False + else: + self.use_experience = use_experience + self.new_experience_version = new_experience_version + self.use_memory_selection = use_memory_selection + + self.experiences = [] + + async def _observe(self) -> int: + if self.status == 1: + # 死者不再参与游戏 + return 0 + + await super()._observe() + # 只有发给全体的("")或发给自己的(self.profile)消息需要走下面的_react流程, + # 其他的收听到即可,不用做动作 + self._rc.news = [msg for msg in self._rc.news if msg.send_to in ["", self.profile]] + return len(self._rc.news) + + async def _think(self): + news = self._rc.news[0] + assert news.cause_by == InstructSpeak # 消息为来自Moderator的指令时,才去做动作 + if not news.restricted_to: + # 消息接收范围为全体角色的,做公开发言(发表投票观点也算发言) + self._rc.todo = Speak() + elif self.profile in news.restricted_to.split(","): + # FIXME: hard code to split, restricted为"Moderator"或"Moderator,角色profile" + # Moderator加密发给自己的,意味着要执行角色的特殊动作 + self._rc.todo = self.special_actions[0]() + + async def _act(self): + # todo为_think时确定的,有两种情况,Speak或Protect + todo = self._rc.todo + logger.info(f"{self._setting}: ready to {str(todo)}") + + # 可以用这个函数获取该角色的全部记忆和最新的instruction + memories = self.get_all_memories() + latest_instruction = self.get_latest_instruction() + # print("*" * 10, f"{self._setting}'s current memories: {memories}", "*" * 10) + + reflection = ( + await Reflect().run( + profile=self.profile, name=self.name, context=memories, latest_instruction=latest_instruction + ) + if self.use_reflection + else "" + ) + + experiences = ( + RetrieveExperiences().run( + query=reflection, profile=self.profile, excluded_version=self.new_experience_version + ) + if self.use_experience + else "" + ) + + # 根据自己定义的角色Action,对应地去run,run的入参可能不同 + if isinstance(todo, Speak): + rsp = await todo.run( + profile=self.profile, + name=self.name, + context=memories, + latest_instruction=latest_instruction, + reflection=reflection, + experiences=experiences, + ) + restricted_to = "" + + elif isinstance(todo, NighttimeWhispers): + rsp = await todo.run( + profile=self.profile, name=self.name, context=memories, reflection=reflection, experiences=experiences + ) + restricted_to = f"Moderator,{self.profile}" # 给Moderator发送使用特殊技能的加密消息 + + msg = Message( + content=rsp, + role=self.profile, + sent_from=self.name, + cause_by=type(todo), + send_to="", + restricted_to=restricted_to, + ) + + self.experiences.append( + RoleExperience( + name=self.name, + profile=self.profile, + reflection=reflection, + instruction=latest_instruction, + response=rsp, + version=self.new_experience_version, + ) + ) + + logger.info(f"{self._setting}: {rsp}") + + return msg + + def get_all_memories(self) -> str: + memories = self._rc.memory.get() + time_stamp_pattern = r"[0-9]+ \| " + # NOTE: 除Moderator外,其他角色使用memory,只能用m.sent_from(玩家名)不能用m.role(玩家角色),因为他们不知道说话者的身份 + memories = [f"{m.sent_from}: {re.sub(time_stamp_pattern, '', m.content)}" for m in memories] # regex去掉时间戳 + memories = "\n".join(memories) + return memories + + def get_latest_instruction(self) -> str: + return self._rc.important_memory[-1].content # 角色监听着Moderator的InstructSpeak,是其重要记忆,直接获取即可 + + def set_status(self, new_status): + self.status = new_status + + def record_experiences(self, round_id: str, outcome: str, game_setup: str): + experiences = [exp for exp in self.experiences if len(exp.reflection) > 2] # not "" or not '""' + for exp in experiences: + exp.round_id = round_id + exp.outcome = outcome + exp.game_setup = game_setup + AddNewExperiences().run(experiences) diff --git a/metagpt/ext/werewolf/roles/guard.py b/metagpt/ext/werewolf/roles/guard.py new file mode 100644 index 000000000..1a4471fbd --- /dev/null +++ b/metagpt/ext/werewolf/roles/guard.py @@ -0,0 +1,7 @@ +from metagpt.ext.werewolf.roles.base_player import BasePlayer + + +class Guard(BasePlayer): + name: str = "Guard" + profile: str = "Guard" + special_action_names: list[str] = ["Protect"] diff --git a/metagpt/ext/werewolf/roles/human_player.py b/metagpt/ext/werewolf/roles/human_player.py new file mode 100644 index 000000000..e47588b34 --- /dev/null +++ b/metagpt/ext/werewolf/roles/human_player.py @@ -0,0 +1,44 @@ +from metagpt.ext.werewolf.actions import Speak +from metagpt.ext.werewolf.roles import BasePlayer +from metagpt.logs import logger +from metagpt.schema import Message + + +async def _act(self): + todo = self._rc.todo + + memories = self.get_all_memories() + + input_instruction = f""" + ## As a reminder, you have access to the following game history: + {memories} + ## You are {self.name}({self.profile}) + ## Guidance: + 1. If you are performing a special action or exercising a vote, + end your response with "PlayerX", replace PlayerX with the actual player name, e.g., "..., kill/protect/poison/.../vote Player1". + 2. If it is a daytime free speech, you can speak in whatever format. + Now, please speak: + """ + rsp = input(input_instruction) # wait for human input + + msg_cause_by = type(todo) + msg_restricted_to = "" if isinstance(todo, Speak) else f"Moderator,{self.profile}" + + msg = Message( + content=rsp, + role=self.profile, + sent_from=self.name, + cause_by=msg_cause_by, + send_to="", + restricted_to=msg_restricted_to, # 给Moderator及自身阵营发送加密消息 + ) + + logger.info(f"{self._setting}: {rsp}") + + return msg + + +def prepare_human_player(player_class: BasePlayer): + # Dynamically define a human player class that inherits from a certain role class + HumanPlayer = type("HumanPlayer", (player_class,), {"_act": _act}) + return HumanPlayer diff --git a/metagpt/ext/werewolf/roles/moderator.py b/metagpt/ext/werewolf/roles/moderator.py new file mode 100644 index 000000000..0b841fd22 --- /dev/null +++ b/metagpt/ext/werewolf/roles/moderator.py @@ -0,0 +1,270 @@ +import re +from collections import Counter +from datetime import datetime + +from metagpt.actions.add_requirement import UserRequirement +from metagpt.const import DEFAULT_WORKSPACE_ROOT +from metagpt.ext.werewolf.actions import Hunt, Poison, Protect, Save, Verify +from metagpt.ext.werewolf.actions.moderator_actions import ( + STEP_INSTRUCTIONS, + AnnounceGameResult, + InstructSpeak, + ParseSpeak, +) +from metagpt.logs import logger +from metagpt.roles import Role +from metagpt.schema import Message + + +class Moderator(Role): + name: str = "Moderator" + profile: str = "Moderator" + + def __init__( + self, + name: str = "Moderator", + profile: str = "Moderator", + **kwargs, + ): + super().__init__(name, profile, **kwargs) + self._watch([UserRequirement, InstructSpeak, ParseSpeak]) + self.set_actions([InstructSpeak, ParseSpeak, AnnounceGameResult]) + self.step_idx = 0 + self.eval_step_idx = [] + + # game states + self.game_setup = "" + self.living_players = [] + self.werewolf_players = [] + self.villager_players = [] + self.special_role_players = [] + self.winner = None + self.win_reason = None + self.witch_poison_left = 1 + self.witch_antidote_left = 1 + + # player states of current night + self.player_hunted = None + self.player_protected = None + self.is_hunted_player_saved = False + self.player_poisoned = None + self.player_current_dead = [] + + def _parse_game_setup(self, game_setup: str): + self.game_setup = game_setup + self.living_players = re.findall(r"Player[0-9]+", game_setup) + self.werewolf_players = re.findall(r"Player[0-9]+: Werewolf", game_setup) + self.werewolf_players = [p.replace(": Werewolf", "") for p in self.werewolf_players] + self.villager_players = re.findall(r"Player[0-9]+: Villager", game_setup) + self.villager_players = [p.replace(": Villager", "") for p in self.villager_players] + self.special_role_players = [ + p for p in self.living_players if p not in self.werewolf_players + self.villager_players + ] + + def update_player_status(self, player_names: list[str]): + if not player_names: + return + roles_in_env = self._rc.env.get_roles() + for role_setting, role in roles_in_env.items(): + for player_name in player_names: + if player_name in role_setting: + role.set_status(new_status=1) # 更新为死亡 + + def _record_all_experiences(self): + roles_in_env = self._rc.env.get_roles() + timestamp = datetime.now().strftime("%Y_%m_%d_%H_%M_%S") + for _, role in roles_in_env.items(): + if role == self: + continue + if self.winner == "werewolf": + outcome = "won" if role.name in self.werewolf_players else "lost" + else: + outcome = "won" if role.name not in self.werewolf_players else "lost" + role.record_experiences(round_id=timestamp, outcome=outcome, game_setup=self.game_setup) + + async def _instruct_speak(self): + step_idx = self.step_idx % len(STEP_INSTRUCTIONS) + self.step_idx += 1 + return await InstructSpeak().run( + step_idx, + living_players=self.living_players, + werewolf_players=self.werewolf_players, + player_hunted=self.player_hunted, + player_current_dead=self.player_current_dead, + ) + + async def _parse_speak(self, memories): + logger.info(self.step_idx) + + latest_msg = memories[-1] + latest_msg_content = latest_msg.content + + match = re.search(r"Player[0-9]+", latest_msg_content[-10:]) # FIXME: hard code truncation + target = match.group(0) if match else "" + + # default return + msg_content = "Understood" + restricted_to = "" + + msg_cause_by = latest_msg.cause_by + if msg_cause_by == Hunt: + self.player_hunted = target + elif msg_cause_by == Protect: + self.player_protected = target + elif msg_cause_by == Verify: + if target in self.werewolf_players: + msg_content = f"{target} is a werewolf" + else: + msg_content = f"{target} is a good guy" + restricted_to = "Moderator,Seer" + elif msg_cause_by == Save: + if "pass" in latest_msg_content.lower(): + pass + elif not self.witch_antidote_left: + msg_content = "You have no antidote left and thus can not save the player" + restricted_to = "Moderator,Witch" + else: + self.witch_antidote_left -= 1 + self.is_hunted_player_saved = True + elif msg_cause_by == Poison: + if "pass" in latest_msg_content.lower(): + pass + elif not self.witch_poison_left: + msg_content = "You have no poison left and thus can not poison the player" + restricted_to = "Moderator,Witch" + else: + self.witch_poison_left -= 1 + self.player_poisoned = target # "" if not poisoned and "PlayerX" if poisoned + + return msg_content, restricted_to + + def _update_game_states(self, memories): + step_idx = self.step_idx % len(STEP_INSTRUCTIONS) + if step_idx not in [15, 18] or self.step_idx in self.eval_step_idx: # FIXME: hard code + return + else: + self.eval_step_idx.append(self.step_idx) # record evaluation, avoid repetitive evaluation at the same step + + if step_idx == 15: # FIXME: hard code + # night ends: after all special roles acted, process the whole night + self.player_current_dead = [] # reset + + if self.player_hunted != self.player_protected and not self.is_hunted_player_saved: + self.player_current_dead.append(self.player_hunted) + if self.player_poisoned: + self.player_current_dead.append(self.player_poisoned) + + self.living_players = [p for p in self.living_players if p not in self.player_current_dead] + self.update_player_status(self.player_current_dead) + # reset + self.player_hunted = None + self.player_protected = None + self.is_hunted_player_saved = False + self.player_poisoned = None + + elif step_idx == 18: # FIXME: hard code + # day ends: after all roles voted, process all votings + voting_msgs = memories[-len(self.living_players) :] + voted_all = [] + for msg in voting_msgs: + voted = re.search(r"Player[0-9]+", msg.content[-10:]) + if not voted: + continue + voted_all.append(voted.group(0)) + self.player_current_dead = [Counter(voted_all).most_common()[0][0]] # 平票时,杀最先被投的 + # print("*" * 10, "dead", self.player_current_dead) + self.living_players = [p for p in self.living_players if p not in self.player_current_dead] + self.update_player_status(self.player_current_dead) + + # game's termination condition + living_werewolf = [p for p in self.werewolf_players if p in self.living_players] + living_villagers = [p for p in self.villager_players if p in self.living_players] + living_special_roles = [p for p in self.special_role_players if p in self.living_players] + if not living_werewolf: + self.winner = "good guys" + self.win_reason = "werewolves all dead" + elif not living_villagers or not living_special_roles: + self.winner = "werewolf" + self.win_reason = "villagers all dead" if not living_villagers else "special roles all dead" + if self.winner is not None: + self._record_all_experiences() + + def _record_game_history(self): + if self.step_idx % len(STEP_INSTRUCTIONS) == 0 or self.winner is not None: + logger.info("a night and day cycle completed, examine all history") + print(self.get_all_memories()) + with open(DEFAULT_WORKSPACE_ROOT / "werewolf_transcript.txt", "w") as f: + f.write(self.get_all_memories()) + + async def _think(self): + if self.winner is not None: + self._rc.todo = AnnounceGameResult() + return + + latest_msg = self._rc.memory.get()[-1] + if latest_msg.role in ["User"]: + # 上一轮消息是用户指令,解析用户指令,开始游戏 + game_setup = latest_msg.content + self._parse_game_setup(game_setup) + self._rc.todo = InstructSpeak() + + elif latest_msg.role in [self.profile]: + # 1. 上一轮消息是Moderator自己的指令,继续发出指令,一个事情可以分几条消息来说 + # 2. 上一轮消息是Moderator自己的解析消息,一个阶段结束,发出新一个阶段的指令 + self._rc.todo = InstructSpeak() + + else: + # 上一轮消息是游戏角色的发言,解析角色的发言 + self._rc.todo = ParseSpeak() + + async def _act(self): + todo = self._rc.todo + logger.info(f"{self._setting} ready to {todo}") + + memories = self.get_all_memories(mode="msg") + + # 若进行完一夜一日的循环,打印和记录一次完整发言历史 + self._record_game_history() + + # 若一晚或一日周期结束,对当晚或当日的死者进行总结,并更新游戏状态 + self._update_game_states(memories) + + # 根据_think的结果,执行InstructSpeak还是ParseSpeak, 并将结果返回 + if isinstance(todo, InstructSpeak): + msg_content, msg_to_send_to, msg_restriced_to = await self._instruct_speak() + # msg_content = f"Step {self.step_idx}: {msg_content}" # HACK: 加一个unique的step_idx避免记忆的自动去重 + msg = Message( + content=msg_content, + role=self.profile, + sent_from=self.name, + cause_by=InstructSpeak, + send_to=msg_to_send_to, + restricted_to=msg_restriced_to, + ) + + elif isinstance(todo, ParseSpeak): + msg_content, msg_restriced_to = await self._parse_speak(memories) + # msg_content = f"Step {self.step_idx}: {msg_content}" # HACK: 加一个unique的step_idx避免记忆的自动去重 + msg = Message( + content=msg_content, + role=self.profile, + sent_from=self.name, + cause_by=ParseSpeak, + send_to="", + restricted_to=msg_restriced_to, + ) + + elif isinstance(todo, AnnounceGameResult): + msg_content = await AnnounceGameResult().run(winner=self.winner, win_reason=self.win_reason) + msg = Message(content=msg_content, role=self.profile, sent_from=self.name, cause_by=AnnounceGameResult) + + logger.info(f"{self._setting}: {msg_content}") + + return msg + + def get_all_memories(self, mode="str") -> str: + memories = self._rc.memory.get() + if mode == "str": + memories = [f"{m.sent_from}({m.role}): {m.content}" for m in memories] + memories = "\n".join(memories) + return memories diff --git a/metagpt/ext/werewolf/roles/seer.py b/metagpt/ext/werewolf/roles/seer.py new file mode 100644 index 000000000..a7c0517fe --- /dev/null +++ b/metagpt/ext/werewolf/roles/seer.py @@ -0,0 +1,7 @@ +from metagpt.ext.werewolf.roles.base_player import BasePlayer + + +class Seer(BasePlayer): + name: str = "Seer" + profile: str = "Seer" + special_action_names: list[str] = ["Verify"] diff --git a/metagpt/ext/werewolf/roles/villager.py b/metagpt/ext/werewolf/roles/villager.py new file mode 100644 index 000000000..769782dc2 --- /dev/null +++ b/metagpt/ext/werewolf/roles/villager.py @@ -0,0 +1,7 @@ +from metagpt.ext.werewolf.roles.base_player import BasePlayer + + +class Villager(BasePlayer): + name: str = "Villager" + profile: str = "Villager" + special_action_names: list[str] = [] diff --git a/metagpt/ext/werewolf/roles/werewolf.py b/metagpt/ext/werewolf/roles/werewolf.py new file mode 100644 index 000000000..0a0c8a002 --- /dev/null +++ b/metagpt/ext/werewolf/roles/werewolf.py @@ -0,0 +1,14 @@ +from metagpt.ext.werewolf.actions import Impersonate, Speak +from metagpt.ext.werewolf.roles.base_player import BasePlayer + + +class Werewolf(BasePlayer): + name: str = "Werewolf" + profile: str = "Werewolf" + special_action_names: list[str] = ["Hunt"] + + async def _think(self): + """狼人白天发言时需要伪装,与其他角色不同,因此需要重写_think""" + await super()._think() + if isinstance(self._rc.todo, Speak): + self._rc.todo = Impersonate() diff --git a/metagpt/ext/werewolf/roles/witch.py b/metagpt/ext/werewolf/roles/witch.py new file mode 100644 index 000000000..70375052f --- /dev/null +++ b/metagpt/ext/werewolf/roles/witch.py @@ -0,0 +1,26 @@ +from metagpt.ext.werewolf.actions import InstructSpeak, Poison, Save, Speak +from metagpt.ext.werewolf.roles.base_player import BasePlayer + + +class Witch(BasePlayer): + name: str = "Witch" + profile: str = "Witch" + special_action_names: list[str] = ["Save", "Poison"] + + async def _think(self): + """女巫涉及两个特殊技能,因此在此需要改写_think进行路由""" + news = self._rc.news[0] + assert news.cause_by == InstructSpeak # 消息为来自Moderator的指令时,才去做动作 + if not news.restricted_to: + # 消息接收范围为全体角色的,做公开发言(发表投票观点也算发言) + self._rc.todo = Speak() + elif self.profile in news.restricted_to.split(","): + # FIXME: hard code to split, restricted为"Moderator"或"Moderator,角色profile" + # Moderator加密发给自己的,意味着要执行角色的特殊动作 + # 这里用关键词进行动作的选择,需要Moderator侧的指令进行配合 + if "save" in news.content.lower(): + self._rc.todo = Save() + elif "poison" in news.content.lower(): + self._rc.todo = Poison() + else: + raise ValueError("Moderator's instructions must include save or poison keyword") diff --git a/metagpt/ext/werewolf/schema.py b/metagpt/ext/werewolf/schema.py new file mode 100644 index 000000000..5d1cccac8 --- /dev/null +++ b/metagpt/ext/werewolf/schema.py @@ -0,0 +1,14 @@ +from pydantic import BaseModel + + +class RoleExperience(BaseModel): + id: str = "" + name: str = "" + profile: str + reflection: str + instruction: str = "" + response: str + outcome: str = "" + round_id: str = "" + game_setup: str = "" + version: str = "" diff --git a/metagpt/ext/werewolf/werewolf_game.py b/metagpt/ext/werewolf/werewolf_game.py new file mode 100644 index 000000000..6609df949 --- /dev/null +++ b/metagpt/ext/werewolf/werewolf_game.py @@ -0,0 +1,42 @@ +from metagpt.actions.add_requirement import UserRequirement +from metagpt.environment import Environment +from metagpt.schema import Message +from metagpt.team import Team + + +class WerewolfEnvironment(Environment): + timestamp: int = 0 + + def publish_message(self, message: Message, add_timestamp: bool = True): + """向当前环境发布信息 + Post information to the current environment + """ + # self.message_queue.put(message) + if add_timestamp: + # 因消息内容可能重复,例如,连续两晚杀同一个人, + # 因此需要加一个unique的time_stamp以使得相同的message在加入记忆时不被自动去重 + message.content = f"{self.timestamp} | " + message.content + self.memory.add(message) + self.history += f"\n{message}" + + async def run(self, k=1): + """处理一次所有信息的运行,各角色顺序执行 + Process all Role runs at once + """ + for _ in range(k): + for role in self.roles.values(): + await role.run() + self.timestamp += 1 + + +class WerewolfGame(Team): + """Use the "software company paradigm" to hold a werewolf game""" + + environment = WerewolfEnvironment() + + def start_project(self, idea): + """Start a project from user instruction.""" + self.idea = idea + self.environment.publish_message( + Message(role="User", content=idea, cause_by=UserRequirement, restricted_to="Moderator") + ) diff --git a/metagpt/prompts/arbiter.py b/metagpt/prompts/arbiter.py new file mode 100644 index 000000000..bbe03f9b6 --- /dev/null +++ b/metagpt/prompts/arbiter.py @@ -0,0 +1,59 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +@Time : 2023/9/26 1:12 +@Author : kevin-meng +@File : arbiter.py +""" + + +ARBITER = """ +As an experienced Arbiter, you possess the necessary competence, sound judgment, and absolute objectivity. you promise that you will officiate in games with complete impartiality, respecting and adhering to the rules that govern them, in the true spirit of sportsmanship. + +Please always remember the general duties of the Arbiters in a competition: +a. Ensure fair play and adhere to the Anti-cheating regulations. +b. Supervise the progress of the competition. +c. Observe the game and enforce decisions made, imposing penalties on players where appropriate. +d. Ensure that the Laws of the game are observed. + +The rules governing this competition are as follows: +=== +{rules} +=== + +The scoring dimensions for judging in this game are as follows: +=== +{dimensions} +=== + +After the end of the competition, the Arbiter should submit a report, which includes: +a. A summary report for the game. +b. The final standings. +c. Each player and their final score for each assessment category, along with the reasons for the ratings. +d. Any other important information +for example: + +## Summary + ...... + +## Results and Standings (Top3) + Top 1: player 1 + Top 2: player 2 + Top 2: player 3 + +## Scoring and Assessment Dimensions + - player 1 : socre + - dimension 1 + score: xx + reason: xx + - dimension 2 + score: xx + reason: xx + ...... + - player 2 + ...... + +## Conclusion + ...... + +""" diff --git a/metagpt/roles/role.py b/metagpt/roles/role.py index e0f8a7ea6..57dd51139 100644 --- a/metagpt/roles/role.py +++ b/metagpt/roles/role.py @@ -335,6 +335,11 @@ class Role(SerializationMixin, ContextMixin, BaseModel): self.llm.cost_manager = self.context.cost_manager self.set_actions(self.actions) # reset actions to update llm and prefix + @property + def name(self): + """Get the role name""" + return self._setting.name + def _get_prefix(self): """Get the role prefix""" if self.desc: diff --git a/tests/metagpt/ext/werewolf/__init__.py b/tests/metagpt/ext/werewolf/__init__.py new file mode 100644 index 000000000..2bcf8efd0 --- /dev/null +++ b/tests/metagpt/ext/werewolf/__init__.py @@ -0,0 +1,3 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Desc : diff --git a/tests/metagpt/ext/werewolf/actions/__init__.py b/tests/metagpt/ext/werewolf/actions/__init__.py new file mode 100644 index 000000000..2bcf8efd0 --- /dev/null +++ b/tests/metagpt/ext/werewolf/actions/__init__.py @@ -0,0 +1,3 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Desc : diff --git a/tests/metagpt/ext/werewolf/actions/test_experience_operation.py b/tests/metagpt/ext/werewolf/actions/test_experience_operation.py new file mode 100644 index 000000000..a91cb5d09 --- /dev/null +++ b/tests/metagpt/ext/werewolf/actions/test_experience_operation.py @@ -0,0 +1,160 @@ +import json +import os + +import pytest + +from metagpt.const import DEFAULT_WORKSPACE_ROOT +from metagpt.ext.werewolf.actions import AddNewExperiences, RetrieveExperiences +from metagpt.ext.werewolf.schema import RoleExperience +from metagpt.logs import logger + + +class TestExperiencesOperation: + collection_name = "test" + test_round_id = "test_01" + version = "test" + samples_to_add = [ + RoleExperience( + profile="Witch", + reflection="The game is intense with two players claiming to be the Witch and one claiming to be the Seer. Player4's behavior is suspicious.", + response="", + outcome="", + round_id=test_round_id, + version=version, + ), + RoleExperience( + profile="Witch", + reflection="The game is in a critical state with only three players left, and I need to make a wise decision to save Player7 or not.", + response="", + outcome="", + round_id=test_round_id, + version=version, + ), + RoleExperience( + profile="Seer", + reflection="Player1, who is a werewolf, falsely claimed to be a Seer, and Player6, who might be a Witch, sided with him. I, as the real Seer, am under suspicion.", + response="", + outcome="", + round_id=test_round_id, + version=version, + ), + RoleExperience( + profile="TestRole", + reflection="Some test reflection1", + response="", + outcome="", + round_id=test_round_id, + version=version + "_01-10", + ), + RoleExperience( + profile="TestRole", + reflection="Some test reflection2", + response="", + outcome="", + round_id=test_round_id, + version=version + "_11-20", + ), + RoleExperience( + profile="TestRole", + reflection="Some test reflection3", + response="", + outcome="", + round_id=test_round_id, + version=version + "_21-30", + ), + ] + + @pytest.mark.asyncio + async def test_add(self): + saved_file = f"{DEFAULT_WORKSPACE_ROOT}/werewolf_game/experiences/{self.version}/{self.test_round_id}.json" + if os.path.exists(saved_file): + os.remove(saved_file) + + action = AddNewExperiences(collection_name=self.collection_name, delete_existing=True) + action.run(self.samples_to_add) + + # test insertion + inserted = action.collection.get() + assert len(inserted["documents"]) == len(self.samples_to_add) + + # test if we record the samples correctly to local file + # & test if we could recover a embedding db from the file + action = AddNewExperiences(collection_name=self.collection_name, delete_existing=True) + action.add_from_file(saved_file) + inserted = action.collection.get() + assert len(inserted["documents"]) == len(self.samples_to_add) + + @pytest.mark.asyncio + async def test_retrieve(self): + action = RetrieveExperiences(collection_name=self.collection_name) + + query = "one player claimed to be Seer and the other Witch" + results = action.run(query, profile="Witch") + results = json.loads(results) + + assert len(results) == 2, "Witch should have 2 experiences" + assert "The game is intense with two players" in results[0] + + @pytest.mark.asyncio + async def test_retrieve_filtering(self): + action = RetrieveExperiences(collection_name=self.collection_name) + + query = "some test query" + profile = "TestRole" + + excluded_version = "" + results = action.run(query, profile=profile, excluded_version=excluded_version) + results = json.loads(results) + assert len(results) == 3 + + excluded_version = self.version + "_21-30" + results = action.run(query, profile=profile, excluded_version=excluded_version) + results = json.loads(results) + assert len(results) == 2 + + +class TestActualRetrieve: + collection_name = "role_reflection" + + @pytest.mark.asyncio + async def test_check_experience_pool(self): + logger.info("check experience pool") + action = RetrieveExperiences(collection_name=self.collection_name) + all_experiences = action.collection.get() + logger.info(f"{len(all_experiences['metadatas'])=}") + print(*["metadatas"][-5:], sep="\n") + + @pytest.mark.asyncio + async def test_retrieve_werewolf_experience(self): + action = RetrieveExperiences(collection_name=self.collection_name) + + query = "there are conflicts" + + logger.info(f"test retrieval with {query=}") + action.run(query, "Werewolf") + + @pytest.mark.asyncio + async def test_retrieve_villager_experience(self): + action = RetrieveExperiences(collection_name=self.collection_name) + + query = "there are conflicts" + + logger.info(f"test retrieval with {query=}") + results = action.run(query, "Seer") + assert "conflict" in results # 相似局面应该需要包含conflict关键词 + + @pytest.mark.asyncio + async def test_retrieve_villager_experience_filtering(self): + action = RetrieveExperiences(collection_name=self.collection_name) + + query = "there are conflicts" + + excluded_version = "01-10" + logger.info(f"test retrieval with {excluded_version=}") + results_01_10 = action.run(query, profile="Seer", excluded_version=excluded_version, verbose=True) + + excluded_version = "11-20" + logger.info(f"test retrieval with {excluded_version=}") + results_11_20 = action.run(query, profile="Seer", excluded_version=excluded_version, verbose=True) + + assert results_01_10 != results_11_20 diff --git a/tests/metagpt/test_environment.py b/tests/metagpt/test_environment.py index 7559655d3..388e495b6 100644 --- a/tests/metagpt/test_environment.py +++ b/tests/metagpt/test_environment.py @@ -29,7 +29,7 @@ def test_add_role(env: Environment): name="Alice", profile="product manager", goal="create a new product", constraints="limited resources" ) env.add_role(role) - assert env.get_role(role.profile) == role + assert env.get_role(str(role._setting)) == role def test_get_roles(env: Environment):