diff --git a/examples/stanford_town/.gitignore b/examples/stanford_town/.gitignore deleted file mode 100644 index fe77153f9..000000000 --- a/examples/stanford_town/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -storage/test* -storage/unittest* \ No newline at end of file diff --git a/examples/stanford_town/README.md b/examples/stanford_town/README.md deleted file mode 100644 index 915c61356..000000000 --- a/examples/stanford_town/README.md +++ /dev/null @@ -1,36 +0,0 @@ -## Stanford Town Game - -### Pre-Description -The path configured in `examples/stanford_town/utils/const.py` is the storage path of the current project. In order to facilitate GA( [generative_agents](https://github.com/joonspk-research/generative_agents) )'s frontend docking data (to avoid changing its code), you can set the value `temp_storage_path` to `temp_storage` of `generative_agents` when start `run_st_game.py`. like - -`python3 run_st_game.py --temp_storage_path path/to/ga/temp_storage xxx` - -Or change the path under `const.py` like beflow - -``` -STORAGE_PATH = ROOT_PATH.joinpath("storage") -TEMP_STORAGE_PATH = ROOT_PATH.joinpath("temp_storage") -# updated -STORAGE_PATH = Path("{path/to/ga/storage}") -TEMP_STORAGE_PATH = Path("{path/to/ga/temp_storage}") -``` - -This can be used to achieve docking of simulation data without changing the GA code. Otherwise, the GA code must be modified to adapt to the MG output path. - -If you don't want to start from 0, copy other simulation directories under `generative_agents/environment/frontend_server/storage/` to `examples/stanford_town/storage`, and select a directory named `fork_sim_code`. - -### Backend service startup -The execution entry is `python3 run_st_game.py "Host a open lunch party at 13:00 pm" "base_the_ville_isabella_maria_klaus" "test_sim" 10` -or -`python3 run_st_game.py "Host a open lunch party at 13:00 pm" "base_the_ville_isabella_maria_klaus" "test_sim" 10 --temp_storage_path path/to/ga/temp_storage` - -`idea` is the user's voice to the first Agent, and it is disseminated through this voice to see whether the final multi-agents achieve the goal of hosting or participating in the event. - -### Frontend service startup -Enter project folder `generative_agents` - -Enter `environment/frontend_server` and use `python3 manage.py runserver` to start the front-end service. -Visit `http://localhost:8000/simulator_home` to enter the current simulation interface. - -## Appreciation -The reproduction work has referred the `https://github.com/joonspk-research/generative_agents`, let's make a general statement here. diff --git a/examples/stanford_town/README_CN.md b/examples/stanford_town/README_CN.md deleted file mode 100644 index 445a5b1b3..000000000 --- a/examples/stanford_town/README_CN.md +++ /dev/null @@ -1,35 +0,0 @@ -## Stanford Town Game - -### 前置 -`examples/stanford_town/utils/const.py`配置的路径为当前项目的存储路径,为了方便GA( [generative_agents](https://github.com/joonspk-research/generative_agents) )的前端对接数据(避免改动它那块的代码),可在启动`run_st_game.py`加上`temp_storage_path`指向`generative_agents`对应的`temp_storage`路径。比如 - -`python3 run_st_game.py --temp_storage_path path/to/ga/temp_storage xxx` - -或将`const.py`下的 - -``` -STORAGE_PATH = ROOT_PATH.joinpath("storage") -TEMP_STORAGE_PATH = ROOT_PATH.joinpath("temp_storage") -# 更新为 -STORAGE_PATH = Path("{path/to/ga/storage}") -TEMP_STORAGE_PATH = Path("{path/to/ga/temp_storage}") -``` -这样可用实现不改变GA代码情况下,实现仿真数据的对接。不然得修改GA的代码来适配MG的输出路径。 - -如果你不想从0开始启动,拷贝`generative_agents/environment/frontend_server/storage/`下的其他仿真目录到`examples/stanford_town/storage`,并选择一个目录名作为`fork_sim_code`。 - -### 后端服务启动 -执行入口为:`python3 run_st_game.py "Host a open lunch party at 13:00 pm" "base_the_ville_isabella_maria_klaus" "test_sim" 10` -或者 -`python3 run_st_game.py "Host a open lunch party at 13:00 pm" "base_the_ville_isabella_maria_klaus" "test_sim" 10 --temp_storage_path path/to/ga/temp_storage` - -`idea`为用户给第一个Agent的用户心声,并通过这个心声进行传播,看最后多智能体是否达到举办、参加活动的目标。 - -### 前端服务启动 -进入`generative_agents`项目目录 - -进入`environment/frontend_server`,使用`python3 manage.py runserver`启动前端服务。 -访问`http://localhost:8000/simulator_home` 进入当前的仿真界面。 - -## Appreciation -The reproduction work has referred the `https://github.com/joonspk-research/generative_agents`, let's make a general statement here. diff --git a/examples/stanford_town/actions/__init__.py b/examples/stanford_town/actions/__init__.py deleted file mode 100644 index 2bcf8efd0..000000000 --- a/examples/stanford_town/actions/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# @Desc : diff --git a/examples/stanford_town/actions/agent_chat_sum_rel.py b/examples/stanford_town/actions/agent_chat_sum_rel.py deleted file mode 100644 index 3e564a60c..000000000 --- a/examples/stanford_town/actions/agent_chat_sum_rel.py +++ /dev/null @@ -1,39 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# @Desc : summarize relationship in a agent chat - -from examples.stanford_town.actions.st_action import STAction -from metagpt.logs import logger - - -class AgentChatSumRel(STAction): - name: str = "AgentChatSumRel" - - def _func_validate(self, llm_resp: str, prompt: str) -> bool: - resp = False - try: - _ = llm_resp.split('"')[0].strip() - resp = True - except Exception: - pass - return resp - - def _func_cleanup(self, llm_resp: str, prompt: str) -> str: - return llm_resp.split('"')[0].strip() - - def _func_fail_default_resp(self) -> str: - pass - - async def run(self, init_role: "STRole", target_role: "STRole", statements: str) -> str: - def create_prompt_input(init_role: "STRole", target_role: "STRole", statements: str) -> str: - prompt_input = [statements, init_role.name, target_role.name] - return prompt_input - - prompt_input = create_prompt_input(init_role, target_role, statements) - prompt = self.generate_prompt_with_tmpl_filename(prompt_input, "summarize_chat_relationship_v2.txt") - - example_output = "Jane Doe is working on a project" - special_instruction = "The output should be a string that responds to the question." - output = await self._run_gpt35(prompt, example_output, special_instruction) - logger.info(f"Role: {init_role.name} Action: {self.cls_name} output: {output}") - return output diff --git a/examples/stanford_town/actions/decide_to_talk.py b/examples/stanford_town/actions/decide_to_talk.py deleted file mode 100644 index 414ee7e36..000000000 --- a/examples/stanford_town/actions/decide_to_talk.py +++ /dev/null @@ -1,97 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# @Desc : device to talk to another role, return yes or no - -from examples.stanford_town.actions.st_action import STAction -from metagpt.logs import logger - - -class DecideToTalk(STAction): - name: str = "DecideToTalk" - - def _func_validate(self, llm_resp: str, prompt: str) -> bool: - resp = False - try: - if llm_resp.split("Answer in yes or no:")[-1].strip().lower() in ["yes", "no"]: - resp = True - except ValueError: - pass - return resp - - def _func_cleanup(self, llm_resp: str, prompt: str) -> str: - return llm_resp.split("Answer in yes or no:")[-1].strip().lower() - - def _func_fail_default_resp(self) -> str: - return "yes" - - async def run(self, init_role: "STRole", target_role: "STRole", retrieved: dict, *args, **kwargs) -> bool: - """Run action""" - - def create_prompt_input(init_role: "STRole", target_role: "STRole", retrieved: dict) -> str: - scratch = init_role.rc.scratch - target_scratch = target_role.rc.scratch - last_chat = init_role.rc.memory.get_last_chat(target_role.name) - last_chatted_time = "" - last_chat_about = "" - if last_chat: - last_chatted_time = last_chat.created.strftime("%B %d, %Y, %H:%M:%S") - last_chat_about = last_chat.description - - context = "" - for c_node in retrieved["events"]: - curr_desc = c_node.description.split(" ") - curr_desc[2:3] = ["was"] - curr_desc = " ".join(curr_desc) - context += f"{curr_desc}. " - context += "\n" - for c_node in retrieved["thoughts"]: - context += f"{c_node.description}. " - - curr_time = scratch.curr_time.strftime("%B %d, %Y, %H:%M:%S %p") - init_act_desc = scratch.act_description - if "(" in init_act_desc: - init_act_desc = init_act_desc.split("(")[-1][:-1] - - if len(scratch.planned_path) == 0 and "waiting" not in init_act_desc: - init_p_desc = f"{init_role.name} is already {init_act_desc}" - elif "waiting" in init_act_desc: - init_p_desc = f"{init_role.name} is {init_act_desc}" - else: - init_p_desc = f"{init_role.name} is on the way to {init_act_desc}" - - target_act_desc = scratch.act_description - if "(" in target_act_desc: - target_act_desc = target_act_desc.split("(")[-1][:-1] - - if len(target_scratch.planned_path) == 0 and "waiting" not in init_act_desc: - target_p_desc = f"{target_role.name} is already {target_act_desc}" - elif "waiting" in init_act_desc: - target_p_desc = f"{init_role.name} is {init_act_desc}" - else: - target_p_desc = f"{target_role.name} is on the way to {target_act_desc}" - - prompt_input = [] - prompt_input += [context] - - prompt_input += [curr_time] - - prompt_input += [init_role.name] - prompt_input += [target_role.name] - prompt_input += [last_chatted_time] - prompt_input += [last_chat_about] - - prompt_input += [init_p_desc] - prompt_input += [target_p_desc] - prompt_input += [init_role.name] - prompt_input += [target_role.name] - return prompt_input - - prompt_input = create_prompt_input(init_role, target_role, retrieved) - prompt = self.generate_prompt_with_tmpl_filename( - prompt_input=prompt_input, tmpl_filename="decide_to_talk_v2.txt" - ) - self.fail_default_resp = self._func_fail_default_resp() - output = await self._run_gpt35_max_tokens(prompt, max_tokens=20) # yes or no - result = True if output == "yes" else False - logger.info(f"Role: {init_role.name} Action: {self.cls_name} output: {result}") - return result diff --git a/examples/stanford_town/actions/dummy_action.py b/examples/stanford_town/actions/dummy_action.py deleted file mode 100644 index a5004d5ef..000000000 --- a/examples/stanford_town/actions/dummy_action.py +++ /dev/null @@ -1,20 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# @Desc : dummy action to make every STRole can deal DummyMessage which is caused by DummyAction - -from metagpt.actions import Action -from metagpt.schema import Message - - -class DummyAction(Action): - async def run(self, *args, **kwargs): - raise NotImplementedError - - -class DummyMessage(Message): - """ - dummy message to pass to role and make them to have a execution every round - """ - - content: str = "dummy" - cause_by: str = "DummyAction" diff --git a/examples/stanford_town/actions/gen_action_details.py b/examples/stanford_town/actions/gen_action_details.py deleted file mode 100644 index 0eb7cb701..000000000 --- a/examples/stanford_town/actions/gen_action_details.py +++ /dev/null @@ -1,403 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# @Desc : gen_action_details - -import random - -from metagpt.environment.stanford_town.env_space import EnvObsParams, EnvObsType -from metagpt.logs import logger - -from .st_action import STAction - - -class GenActionSector(STAction): - name: str = "GenActionSector" - - def _func_cleanup(self, llm_resp: str, prompt: str): - cleaned_response = llm_resp.split("}")[0] - return cleaned_response - - def _func_validate(self, llm_resp: str, prompt: str): - if len(llm_resp.strip()) < 1: - return False - if "}" not in llm_resp: - return False - if "," in llm_resp: - return False - return True - - def _func_fail_default_resp(self): - fs = "kitchen" - return fs - - async def run(self, role: "STRole", access_tile: dict[str, str], act_desp: str): - def create_prompt_input(role, access_tile: dict[str, str], act_desp): - act_world = f"{access_tile['world']}" - - prompt_input = [] - - prompt_input += [role.scratch.get_str_name()] - prompt_input += [role.scratch.living_area.split(":")[1]] - x = f"{act_world}:{role.scratch.living_area.split(':')[1]}" - prompt_input += [role.s_mem.get_str_accessible_sector_arenas(x)] - - prompt_input += [role.scratch.get_str_name()] - prompt_input += [f"{access_tile['sector']}"] - x = f"{act_world}:{access_tile['sector']}" - prompt_input += [role.s_mem.get_str_accessible_sector_arenas(x)] - - if role.scratch.get_str_daily_plan_req() != "": - prompt_input += [f"\n{role.scratch.get_str_daily_plan_req()}"] - else: - prompt_input += [""] - - # MAR 11 TEMP - prompt_input = [] - act_world = access_tile["world"] - accessible_sector_str = role.s_mem.get_str_accessible_sectors(act_world) - curr = accessible_sector_str.split(", ") - fin_accessible_sectors = [] - for i in curr: - if "'s house" in i: - if role.scratch.last_name in i: - fin_accessible_sectors += [i] - else: - fin_accessible_sectors += [i] - accessible_sector_str = ", ".join(fin_accessible_sectors) - # END MAR 11 TEMP - - prompt_input += [accessible_sector_str] - - act_desp_1 = act_desp - act_desp_2 = act_desp - if "(" in act_desp: - act_desp_1 = act_desp.split("(")[0].strip() - act_desp_2 = act_desp.split("(")[-1][:-1] - prompt_input += [role.scratch.get_str_name()] - prompt_input += [act_desp_1] - - prompt_input += [act_desp_2] - prompt_input += [role.scratch.get_str_name()] - return prompt_input - - prompt_template = "action_location_sector_v1.txt" - prompt_input = create_prompt_input(role, access_tile, act_desp) - prompt = self.generate_prompt_with_tmpl_filename(prompt_input, prompt_template) - - self.fail_default_resp = self._func_fail_default_resp() - output = await self._run_gpt35_max_tokens(prompt, max_tokens=15) - y = f"{access_tile['world']}" - x = [i.strip() for i in role.s_mem.get_str_accessible_sectors(y).split(",")] - if output not in x: - # output = random.choice(x) - output = role.scratch.living_area.split(":")[1] - logger.info(f"Role: {role.name} Action: {self.cls_name} output: {output}") - return output - - -class GenActionArena(STAction): - name: str = "GenActionArena" - - def _func_cleanup(self, llm_resp: str, prompt: str): - cleaned_response = llm_resp.split("}")[0] - return cleaned_response - - def _func_validate(self, llm_resp: str, prompt: str): - if len(llm_resp.strip()) < 1: - return False - if "}" not in llm_resp: - return False - if "," in llm_resp: - return False - return True - - def _func_fail_default_resp(self): - fs = "kitchen" - return fs - - async def run(self, role: "STRole", act_desp: str, act_world: str, act_sector: str): - def create_prompt_input(role, act_desp, act_world, act_sector): - prompt_input = [] - prompt_input += [role.scratch.get_str_name()] - x = f"{act_world}:{act_sector}" - prompt_input += [act_sector] - - # MAR 11 TEMP - accessible_arena_str = role.s_mem.get_str_accessible_sector_arenas(x) - curr = accessible_arena_str.split(", ") - fin_accessible_arenas = [] - for i in curr: - if "'s room" in i: - if role.scratch.last_name in i: - fin_accessible_arenas += [i] - else: - fin_accessible_arenas += [i] - accessible_arena_str = ", ".join(fin_accessible_arenas) - # END MAR 11 TEMP - prompt_input += [accessible_arena_str] - act_desp_1 = act_desp - act_desp_2 = act_desp - if "(" in act_desp: - act_desp_1 = act_desp.split("(")[0].strip() - act_desp_2 = act_desp.split("(")[-1][:-1] - prompt_input += [role.scratch.get_str_name()] - prompt_input += [act_desp_1] - - prompt_input += [act_desp_2] - prompt_input += [role.scratch.get_str_name()] - - prompt_input += [act_sector] - prompt_input += [accessible_arena_str] - return prompt_input - - prompt_template = "action_location_object_vMar11.txt" - prompt_input = create_prompt_input(role, act_desp, act_world, act_sector) - prompt = self.generate_prompt_with_tmpl_filename(prompt_input, prompt_template) - self.fail_default_resp = self._func_fail_default_resp() - print("prompt ", prompt) - output = await self._run_gpt35_max_tokens(prompt, max_tokens=15) - logger.info(f"Role: {role.name} Action: {self.cls_name} output: {output}") - return output - - -class GenActionObject(STAction): - name: str = "GenActionObject" - - def _func_validate(self, llm_resp: str, prompt: str): - if len(llm_resp.strip()) < 1: - return False - return True - - def _func_cleanup(self, llm_resp: str, prompt: str): - cleaned_response = llm_resp.strip() - return cleaned_response - - def _func_fail_default_resp(self): - fs = "bed" - return fs - - async def run(self, role: "STRole", act_desp: str, temp_address: str): - def create_prompt_input(role, act_desp, temp_address): - prompt_input = [] - if "(" in act_desp: - act_desp = act_desp.split("(")[-1][:-1] - - prompt_input += [act_desp] - prompt_input += [role.s_mem.get_str_accessible_arena_game_objects(temp_address)] - return prompt_input - - prompt_template = "action_object_v2.txt" - prompt_input = create_prompt_input(role, act_desp, temp_address) - prompt = self.generate_prompt_with_tmpl_filename(prompt_input, prompt_template) - self.fail_default_resp = self._func_fail_default_resp() - output = await self._run_gpt35_max_tokens(prompt, max_tokens=15) - x = [i.strip() for i in role.s_mem.get_str_accessible_arena_game_objects(temp_address).split(",")] - if output not in x: - output = random.choice(x) - logger.info(f"Role: {role.name} Action: {self.cls_name} output: {output}") - return output - - -class GenPronunciatio(STAction): - name: str = "GenPronunciatio" - - def _func_cleanup(self, llm_resp: str, prompt: str): - cr = llm_resp.strip() - if len(cr) > 3: - cr = cr[:3] - return cr - - def _func_validate(self, llm_resp: str, prompt: str): - try: - self._func_cleanup(llm_resp, prompt="") - if len(llm_resp) == 0: - return False - except Exception: - return False - return True - - def _func_fail_default_resp(self): - fs = "😋" - return fs - - async def run(self, role: "STRole", act_desp: str): - def create_prompt_input(act_desp): - if "(" in act_desp: - act_desp = act_desp.split("(")[-1].split(")")[0] - prompt_input = [act_desp] - return prompt_input - - prompt_template = "generate_pronunciatio_v1.txt" - prompt_input = create_prompt_input(act_desp) - prompt = self.generate_prompt_with_tmpl_filename(prompt_input, prompt_template) - example_output = "🛁🧖‍♀️" - special_instruction = "The value for the output must ONLY contain the emojis." - self.fail_default_resp = self._func_fail_default_resp() - output = await self._run_gpt35(prompt, example_output, special_instruction) - logger.info(f"Role: {role.name} Action: {self.cls_name} output: {output}") - return output - - -class GenEventTriple(STAction): - name: str = "GenEventTriple" - - def _func_cleanup(self, llm_resp: str, prompt: str): - cr = llm_resp.strip() - cr = [i.strip() for i in cr.split(")")[0].split(",")] - return cr - - def _func_validate(self, llm_resp: str, prompt: str): - try: - llm_resp = self._func_cleanup(llm_resp, prompt="") - if len(llm_resp) != 2: - return False - except Exception: - return False - return True - - def _func_fail_default_resp(self, role): - fs = (role.name, "is", "idle") - return fs - - async def run(self, role: "STRole", act_desp: str): - def create_prompt_input(role, act_desp): - if "(" in act_desp: - act_desp = act_desp.split("(")[-1].split(")")[0] - prompt_input = [role.name, act_desp, role.name] - return prompt_input - - prompt_template = "generate_event_triple_v1.txt" - prompt_input = create_prompt_input(role, act_desp) - prompt = self.generate_prompt_with_tmpl_filename(prompt_input, prompt_template) - self.fail_default_resp = self._func_fail_default_resp(role) - output = await self._run_gpt35_max_tokens(prompt, max_tokens=30) - output = (role.name, output[0], output[1]) - logger.info(f"Role: {role.name} Action: {self.cls_name} output: {output}") - return output - - -class GenActObjDescription(STAction): - name: str = "GenActObjDescription" - - def _func_cleanup(self, llm_resp: str, prompt: str): - cr = llm_resp.strip() - if cr[-1] == ".": - cr = cr[:-1] - return cr - - def _func_validate(self, llm_resp: str, prompt: str): - try: - llm_resp = self._func_cleanup(llm_resp, prompt="") - except Exception: - return False - return True - - def _func_fail_default_resp(self, act_game_object): - fs = f"{act_game_object} is idle" - return fs - - async def run(self, role: "STRole", act_game_object: str, act_desp: str): - def create_prompt_input(act_game_object, act_desp, role): - prompt_input = [act_game_object, role.name, act_desp, act_game_object, act_game_object] - return prompt_input - - prompt_template = "generate_obj_event_v1.txt" - prompt_input = create_prompt_input(act_game_object, act_desp, role) - prompt = self.generate_prompt_with_tmpl_filename(prompt_input, prompt_template) - example_output = "being fixed" - special_instruction = "The output should ONLY contain the phrase that should go in ." - self.fail_default_resp = self._func_fail_default_resp(act_game_object) - output = await self._run_gpt35(prompt, example_output, special_instruction) - logger.info(f"Role: {role.name} Action: {self.cls_name} output: {output}") - return output - - -class GenObjEventTriple(STAction): - name: str = "GenObjEventTriple" - - def _func_cleanup(self, llm_resp: str, prompt: str): - cr = llm_resp.strip() - cr = [i.strip() for i in cr.split(")")[0].split(",")] - return cr - - def _func_validate(self, llm_resp: str, prompt: str): - try: - llm_resp = self._func_cleanup(llm_resp, prompt="") - if len(llm_resp) != 2: - return False - except Exception: - return False - return True - - def _func_fail_default_resp(self, act_game_object: str): - fs = (act_game_object, "is", "idle") - return fs - - async def run(self, role: "STRole", act_game_object, act_obj_desp): - def create_prompt_input(act_game_object, act_obj_desp): - prompt_input = [act_game_object, act_obj_desp, act_game_object] - return prompt_input - - prompt_template = "generate_event_triple_v1.txt" - prompt_input = create_prompt_input(act_game_object, act_obj_desp) - prompt = self.generate_prompt_with_tmpl_filename(prompt_input, prompt_template) - self.fail_default_resp = self._func_fail_default_resp(act_game_object) - output = await self._run_gpt35_max_tokens(prompt, max_tokens=30) - output = (act_game_object, output[0], output[1]) - logger.info(f"Role: {role.name} Action: {self.cls_name} output: {output}") - return output - - -class GenActionDetails(STAction): - name: str = "GenActionDetails" - - def _func_cleanup(self, llm_resp: str, prompt: str) -> list: - pass - - def _func_validate(self, llm_resp: str, prompt: str) -> bool: - # TODO -- this sometimes generates error - try: - self._func_cleanup(llm_resp) - except Exception: - return False - return True - - def _func_fail_default_resp(self): - fs = {} - return fs - - async def run(self, role: "STRole", act_desp: str, act_dura): - access_tile = role.rc.env.observe( - obs_params=EnvObsParams(obs_type=EnvObsType.GET_TITLE, coord=role.scratch.curr_tile) - ) - act_world = access_tile["world"] - act_sector = await GenActionSector().run(role, access_tile, act_desp) - act_arena = await GenActionArena().run(role, act_desp, act_world, act_sector) - act_address = f"{act_world}:{act_sector}:{act_arena}" - if not role.s_mem.get_str_accessible_arena_game_objects(act_address): - act_game_object = "" - else: - act_game_object = await GenActionObject().run(role, act_desp, act_address) - new_address = f"{act_world}:{act_sector}:{act_arena}:{act_game_object}" - act_pron = await GenPronunciatio().run(role, act_desp) - act_event = await GenEventTriple().run(role, act_desp) - # Persona's actions also influence the object states. We set those up here. - act_obj_desp = await GenActObjDescription().run(role, act_game_object, act_desp) - act_obj_pron = await GenPronunciatio().run(role, act_obj_desp) - act_obj_event = await GenObjEventTriple().run(role, act_game_object, act_obj_desp) - result_dict = { - "action_address": new_address, - "action_duration": int(act_dura), - "action_description": act_desp, - "action_pronunciatio": act_pron, - "action_event": act_event, - "chatting_with": None, - "chat": None, - "chatting_with_buffer": None, - "chatting_end_time": None, - "act_obj_description": act_obj_desp, - "act_obj_pronunciatio": act_obj_pron, - "act_obj_event": act_obj_event, - } - logger.info(f"Role: {role.name} Action: {self.cls_name} output: {result_dict}") - return result_dict diff --git a/examples/stanford_town/actions/gen_daily_schedule.py b/examples/stanford_town/actions/gen_daily_schedule.py deleted file mode 100644 index 6deb6e6eb..000000000 --- a/examples/stanford_town/actions/gen_daily_schedule.py +++ /dev/null @@ -1,61 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# @Desc : gen_daily_schedule - - -from metagpt.logs import logger - -from .st_action import STAction - - -class GenDailySchedule(STAction): - name: str = "GenDailySchedule" - - def _func_validate(self, llm_resp: str, prompt: str) -> bool: - try: - self._func_cleanup(llm_resp, prompt="") - except Exception: - return False - return True - - def _func_cleanup(self, llm_resp: str, prompt: str) -> list: - cr = [] - _cr = llm_resp.split(")") - for i in _cr: - if i[-1].isdigit(): - i = i[:-1].strip() - if i[-1] == "." or i[-1] == ",": - cr += [i[:-1].strip()] - return cr - - def _func_fail_default_resp(self) -> int: - fs = [ - "wake up and complete the morning routine at 6:00 am", - "eat breakfast at 7:00 am", - "read a book from 8:00 am to 12:00 pm", - "have lunch at 12:00 pm", - "take a nap from 1:00 pm to 4:00 pm", - "relax and watch TV from 7:00 pm to 8:00 pm", - "go to bed at 11:00 pm", - ] - return fs - - async def run(self, role: "STRole", wake_up_hour: str): - def create_prompt_input(role, wake_up_hour): - prompt_input = [] - prompt_input += [role.scratch.get_str_iss()] - prompt_input += [role.scratch.get_str_lifestyle()] - prompt_input += [role.scratch.get_str_curr_date_str()] - prompt_input += [role.scratch.get_str_firstname()] - prompt_input += [f"{str(wake_up_hour)}:00 am"] - return prompt_input - - wake_up_hour = int(wake_up_hour) - prompt_template = "daily_planning_v6.txt" - prompt_input = create_prompt_input(role, wake_up_hour) - prompt = self.generate_prompt_with_tmpl_filename(prompt_input, prompt_template) - self.fail_default_resp = self._func_fail_default_resp() - output = await self._run_gpt35_max_tokens(prompt, max_tokens=500) - output = [f"wake up and complete the morning routine at {wake_up_hour}:00 am"] + output - logger.info(f"Role: {role.name} Action: {self.cls_name} output: {output}") - return output diff --git a/examples/stanford_town/actions/gen_hourly_schedule.py b/examples/stanford_town/actions/gen_hourly_schedule.py deleted file mode 100644 index 5d59f96dd..000000000 --- a/examples/stanford_town/actions/gen_hourly_schedule.py +++ /dev/null @@ -1,181 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# @Desc : gen_hourly_schedule - -import random -import string - -from metagpt.logs import logger - -from .st_action import STAction - - -def get_random_alphanumeric(i=6, j=6): - """ - Returns a random alpha numeric strength that has the length of somewhere - between i and j. - - INPUT: - i: min_range for the length - j: max_range for the length - OUTPUT: - an alpha numeric str with the length of somewhere between i and j. - """ - k = random.randint(i, j) - x = "".join(random.choices(string.ascii_letters + string.digits, k=k)) - return x - - -class GenHourlySchedule(STAction): - name: str = "GenHourlySchedule" - - def _func_validate(self, llm_resp: str, prompt: str) -> bool: - try: - self._func_cleanup(llm_resp, prompt="") - except Exception: - return False - return True - - def _func_cleanup(self, llm_resp: str, prompt: str) -> list: - cr = llm_resp.strip() - if cr[-1] == ".": - cr = cr[:-1] - # to only use the first line of output - cr = cr.split("\n")[0] - return cr - - def _func_fail_default_resp(self) -> int: - fs = "asleep" - return fs - - async def _generate_schedule_for_given_hour( - self, role: "STRole", curr_hour_str, p_f_ds_hourly_org, hour_str, intermission2=None - ): - def create_prompt_input(persona, curr_hour_str, p_f_ds_hourly_org, hour_str, intermission2=None): - schedule_format = "" - for i in hour_str: - schedule_format += f"[{persona.scratch.get_str_curr_date_str()} -- {i}]" - schedule_format += " Activity: [Fill in]\n" - schedule_format = schedule_format[:-1] - - intermission_str = "Here the originally intended hourly breakdown of" - intermission_str += f" {persona.scratch.get_str_firstname()}'s schedule today: " - for count, i in enumerate(persona.scratch.daily_req): - intermission_str += f"{str(count + 1)}) {i}, " - intermission_str = intermission_str[:-2] - - prior_schedule = "" - if p_f_ds_hourly_org: - prior_schedule = "\n" - for count, i in enumerate(p_f_ds_hourly_org): - prior_schedule += f"[(ID:{get_random_alphanumeric()})" - prior_schedule += f" {persona.scratch.get_str_curr_date_str()} --" - prior_schedule += f" {hour_str[count]}] Activity:" - prior_schedule += f" {persona.scratch.get_str_firstname()}" - prior_schedule += f" is {i}\n" - - prompt_ending = f"[(ID:{get_random_alphanumeric()})" - prompt_ending += f" {persona.scratch.get_str_curr_date_str()}" - prompt_ending += f" -- {curr_hour_str}] Activity:" - prompt_ending += f" {persona.scratch.get_str_firstname()} is" - - if intermission2: - intermission2 = f"\n{intermission2}" - - prompt_input = [] - prompt_input += [schedule_format] - prompt_input += [persona.scratch.get_str_iss()] - - prompt_input += [prior_schedule + "\n"] - prompt_input += [intermission_str] - if intermission2: - prompt_input += [intermission2] - else: - prompt_input += [""] - prompt_input += [prompt_ending] - - return prompt_input - - prompt_template = "generate_hourly_schedule_v2.txt" - prompt_input = create_prompt_input(role, curr_hour_str, p_f_ds_hourly_org, hour_str, intermission2) - prompt_input_str = "\n".join(prompt_input) - prompt = self.generate_prompt_with_tmpl_filename(prompt_input, prompt_template) - self.fail_default_resp = self._func_fail_default_resp() - output = await self._run_gpt35_max_tokens(prompt, max_tokens=50) - logger.info( - f"Role: {role.name} _generate_schedule_for_given_hour prompt_input: {prompt_input_str}, " - f"output: {output}" - ) - return output - - async def run(self, role: "STRole", wake_up_hour: int): - hour_str = [ - "00:00 AM", - "01:00 AM", - "02:00 AM", - "03:00 AM", - "04:00 AM", - "05:00 AM", - "06:00 AM", - "07:00 AM", - "08:00 AM", - "09:00 AM", - "10:00 AM", - "11:00 AM", - "12:00 PM", - "01:00 PM", - "02:00 PM", - "03:00 PM", - "04:00 PM", - "05:00 PM", - "06:00 PM", - "07:00 PM", - "08:00 PM", - "09:00 PM", - "10:00 PM", - "11:00 PM", - ] - n_m1_activity = [] - diversity_repeat_count = 1 # TODO mg 1->3 - for i in range(diversity_repeat_count): - logger.info(f"diversity_repeat_count idx: {i}") - n_m1_activity_set = set(n_m1_activity) - if len(n_m1_activity_set) < 5: - n_m1_activity = [] - for count, curr_hour_str in enumerate(hour_str): - if wake_up_hour > 0: - n_m1_activity += ["sleeping"] - wake_up_hour -= 1 - else: - logger.info(f"_generate_schedule_for_given_hour idx: {count}, n_m1_activity: {n_m1_activity}") - n_m1_activity += [ - await self._generate_schedule_for_given_hour(role, curr_hour_str, n_m1_activity, hour_str) - ] - - # Step 1. Compressing the hourly schedule to the following format: - # The integer indicates the number of hours. They should add up to 24. - # [['sleeping', 6], ['waking up and starting her morning routine', 1], - # ['eating breakfast', 1], ['getting ready for the day', 1], - # ['working on her painting', 2], ['taking a break', 1], - # ['having lunch', 1], ['working on her painting', 3], - # ['taking a break', 2], ['working on her painting', 2], - # ['relaxing and watching TV', 1], ['going to bed', 1], ['sleeping', 2]] - _n_m1_hourly_compressed = [] - prev = None - prev_count = 0 - for i in n_m1_activity: - if i != prev: - prev_count = 1 - _n_m1_hourly_compressed += [[i, prev_count]] - prev = i - elif _n_m1_hourly_compressed: - _n_m1_hourly_compressed[-1][1] += 1 - - # Step 2. Expand to min scale (from hour scale) - # [['sleeping', 360], ['waking up and starting her morning routine', 60], - # ['eating breakfast', 60],.. - n_m1_hourly_compressed = [] - for task, duration in _n_m1_hourly_compressed: - n_m1_hourly_compressed += [[task, duration * 60]] - logger.info(f"Role: {role.name} Action: {self.cls_name} output: {n_m1_hourly_compressed}") - return n_m1_hourly_compressed diff --git a/examples/stanford_town/actions/gen_iter_chat_utt.py b/examples/stanford_town/actions/gen_iter_chat_utt.py deleted file mode 100644 index 82104b6ca..000000000 --- a/examples/stanford_town/actions/gen_iter_chat_utt.py +++ /dev/null @@ -1,125 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# @Desc : generate_iterative_chat_utt - -from examples.stanford_town.actions.st_action import STAction -from examples.stanford_town.utils.utils import extract_first_json_dict -from metagpt.environment.stanford_town.env_space import EnvObsParams, EnvObsType -from metagpt.logs import logger - - -class GenIterChatUTT(STAction): - name: str = "GenIterChatUTT" - - def _func_validate(self, llm_resp: str, prompt: str) -> bool: - resp = False - try: - _ = extract_first_json_dict(llm_resp) - resp = True - except Exception: - pass - return resp - - def _func_cleanup(self, llm_resp: str, prompt: str) -> dict: - gpt_response = extract_first_json_dict(llm_resp) - - cleaned_dict = dict() - cleaned = [] - for key, val in gpt_response.items(): - cleaned += [val] - cleaned_dict["utterance"] = cleaned[0] - cleaned_dict["end"] = True - if "f" in str(cleaned[1]) or "F" in str(cleaned[1]): - cleaned_dict["end"] = False - - return cleaned_dict - - def _func_fail_default_resp(self) -> dict: - cleaned_dict = dict() - cleaned_dict["utterance"] = "..." - cleaned_dict["end"] = False - return cleaned_dict - - async def run( - self, - init_role: "STRole", - target_role: "STRole", - retrieved: dict, - curr_context: str, - curr_chat: list[str], - *args, - **kwargs, - ) -> dict: - def create_prompt_input( - access_tile: dict[str, str], - init_role: "STRole", - target_role: "STRole", - retrieved: dict, - curr_context: str, - curr_chat: list[str], - ): - role = init_role - scratch = role.rc.scratch - target_scratch = target_role.rc.scratch - prev_convo_insert = "\n" - if role.rc.memory.chat_list: - for i in role.rc.memory.chat_list: - if i.object == target_role.name: - v1 = int((scratch.curr_time - i.created).total_seconds() / 60) - prev_convo_insert += ( - f"{str(v1)} minutes ago, {scratch.name} and " - f"{target_scratch.name} were already {i.description} " - f"This context takes place after that conversation." - ) - break - if prev_convo_insert == "\n": - prev_convo_insert = "" - if role.rc.memory.chat_list: - if int((scratch.curr_time - role.rc.memory.chat_list[-1].created).total_seconds() / 60) > 480: - prev_convo_insert = "" - print(prev_convo_insert) - - curr_sector = f"{access_tile['sector']}" - curr_arena = f"{access_tile['arena']}" - curr_location = f"{curr_arena} in {curr_sector}" - - retrieved_str = "" - for key, vals in retrieved.items(): - for v in vals: - retrieved_str += f"- {v.description}\n" - - convo_str = "" - for i in curr_chat: - convo_str += ": ".join(i) + "\n" - if convo_str == "": - convo_str = "[The conversation has not started yet -- start it!]" - - init_iss = f"Here is Here is a brief description of {scratch.name}.\n{scratch.get_str_iss()}" - prompt_input = [ - init_iss, - scratch.name, - retrieved_str, - prev_convo_insert, - curr_location, - curr_context, - scratch.name, - target_scratch.name, - convo_str, - scratch.name, - target_scratch.name, - scratch.name, - scratch.name, - scratch.name, - ] - return prompt_input - - access_tile = init_role.rc.env.observe( - obs_params=EnvObsParams(obs_type=EnvObsType.GET_TITLE, coord=init_role.scratch.curr_tile) - ) - prompt_input = create_prompt_input(access_tile, init_role, target_role, retrieved, curr_context, curr_chat) - prompt = self.generate_prompt_with_tmpl_filename(prompt_input, "iterative_convo_v1.txt") - # original using `ChatGPT_safe_generate_response_OLD` - self.fail_default_resp = self._func_fail_default_resp() - output = await self._run_gpt35_wo_extra_prompt(prompt) - logger.info(f"Role: {init_role.name} Action: {self.cls_name} output: {output}") - return output diff --git a/examples/stanford_town/actions/inner_voice_action.py b/examples/stanford_town/actions/inner_voice_action.py deleted file mode 100644 index 121f1dcee..000000000 --- a/examples/stanford_town/actions/inner_voice_action.py +++ /dev/null @@ -1,31 +0,0 @@ -from examples.stanford_town.actions.st_action import STAction -from metagpt.logs import logger - - -class AgentWhisperThoughtAction(STAction): - name: str = "AgentWhisperThoughtAction" - - def _func_validate(self, llm_resp: str, prompt: str) -> bool: - try: - self._func_cleanup(llm_resp, prompt) - return True - except Exception: - return False - - def _func_cleanup(self, llm_resp: str, prompt: str = "") -> list: - return llm_resp.split('"')[0].strip() - - def _func_fail_default_resp(self) -> str: - pass - - async def run(self, role: "STRole", statements: str, test_input=None, verbose=False) -> str: - def create_prompt_input(role: "STRole", statements, test_input=None): - prompt_input = [role.scratch.name, statements] - return prompt_input - - prompt_input = create_prompt_input(role, statements) - prompt = self.generate_prompt_with_tmpl_filename(prompt_input, "whisper_inner_thought_v1.txt") - - output = await self._run_gpt35_max_tokens(prompt, max_tokens=50) - logger.info(f"Role: {role.name} Action: {self.cls_name} output: {output}") - return output diff --git a/examples/stanford_town/actions/new_decomp_schedule.py b/examples/stanford_town/actions/new_decomp_schedule.py deleted file mode 100644 index c7078be9c..000000000 --- a/examples/stanford_town/actions/new_decomp_schedule.py +++ /dev/null @@ -1,154 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# @Desc : new_decomp_schedule - -import datetime - -from examples.stanford_town.actions.st_action import STAction -from metagpt.logs import logger - - -class NewDecompSchedule(STAction): - name: str = "NewDecompSchedule" - - def _func_validate(self, llm_resp: str, prompt: str) -> bool: - resp = False - try: - llm_resp = self._func_cleanup(llm_resp, prompt) - dur_sum = 0 - for act, dur in llm_resp: - dur_sum += dur - if isinstance(act, str): - return False - if isinstance(dur, int): - return False - x = prompt.split("\n")[0].split("originally planned schedule from")[-1].strip()[:-1] - x = [datetime.datetime.strptime(i.strip(), "%H:%M %p") for i in x.split(" to ")] - delta_min = int((x[1] - x[0]).total_seconds() / 60) - - if int(dur_sum) != int(delta_min): - return False - except Exception: - pass - return resp - - def _func_cleanup(self, llm_resp: str, prompt: str) -> list: - new_schedule = prompt + " " + llm_resp.strip() - new_schedule = new_schedule.split("The revised schedule:")[-1].strip() - new_schedule = new_schedule.split("\n") - - ret_temp = [] - for i in new_schedule: - ret_temp += [i.split(" -- ")] - - ret = [] - for time_str, action in ret_temp: - start_time = time_str.split(" ~ ")[0].strip() - end_time = time_str.split(" ~ ")[1].strip() - delta = datetime.datetime.strptime(end_time, "%H:%M") - datetime.datetime.strptime(start_time, "%H:%M") - delta_min = int(delta.total_seconds() / 60) - if delta_min < 0: - delta_min = 0 - ret += [[action, delta_min]] - - return ret - - def _func_fail_default_resp(self, main_act_dur: int, truncated_act_dur: int) -> int: - dur_sum = 0 - for act, dur in main_act_dur: - dur_sum += dur - - ret = truncated_act_dur[:] - ret += main_act_dur[len(ret) - 1 :] - - # If there are access, we need to trim... - ret_dur_sum = 0 - count = 0 - over = None - for act, dur in ret: - ret_dur_sum += dur - if ret_dur_sum == dur_sum: - break - if ret_dur_sum > dur_sum: - over = ret_dur_sum - dur_sum - break - count += 1 - - if over: - ret = ret[: count + 1] - ret[-1][1] -= over - - return ret - - async def run( - self, - role: "STRole", - main_act_dur: int, - truncated_act_dur: int, - start_time_hour: datetime, - end_time_hour: datetime, - inserted_act: str, - inserted_act_dur: int, - *args, - **kwargs, - ): - def create_prompt_input( - role: "STRole", - main_act_dur: int, - truncated_act_dur: int, - start_time_hour: datetime, - end_time_hour: datetime, - inserted_act: str, - inserted_act_dur: int, - ): - persona_name = role.name - start_hour_str = start_time_hour.strftime("%H:%M %p") - end_hour_str = end_time_hour.strftime("%H:%M %p") - - original_plan = "" - for_time = start_time_hour - for i in main_act_dur: - original_plan += ( - f'{for_time.strftime("%H:%M")} ~ ' - f'{(for_time + datetime.timedelta(minutes=int(i[1]))).strftime("%H:%M")} -- ' + i[0] - ) - original_plan += "\n" - for_time += datetime.timedelta(minutes=int(i[1])) - - new_plan_init = "" - for_time = start_time_hour - for count, i in enumerate(truncated_act_dur): - new_plan_init += ( - f'{for_time.strftime("%H:%M")} ~ ' - f'{(for_time + datetime.timedelta(minutes=int(i[1]))).strftime("%H:%M")} -- ' + i[0] - ) - new_plan_init += "\n" - if count < len(truncated_act_dur) - 1: - for_time += datetime.timedelta(minutes=int(i[1])) - - new_plan_init += (for_time + datetime.timedelta(minutes=int(i[1]))).strftime("%H:%M") + " ~" - - prompt_input = [ - persona_name, - start_hour_str, - end_hour_str, - original_plan, - persona_name, - inserted_act, - inserted_act_dur, - persona_name, - start_hour_str, - end_hour_str, - end_hour_str, - new_plan_init, - ] - return prompt_input - - prompt_input = create_prompt_input( - role, main_act_dur, truncated_act_dur, start_time_hour, end_time_hour, inserted_act, inserted_act_dur - ) - prompt = self.generate_prompt_with_tmpl_filename(prompt_input, "new_decomp_schedule_v1.txt") - self.fail_default_resp = self._func_fail_default_resp(main_act_dur, truncated_act_dur) - output = await self._run_gpt35_max_tokens(prompt, max_tokens=1000) - logger.info(f"Role: {role.name} Action: {self.cls_name} output: {output}") - return output diff --git a/examples/stanford_town/actions/run_reflect_action.py b/examples/stanford_town/actions/run_reflect_action.py deleted file mode 100644 index 055c03db4..000000000 --- a/examples/stanford_town/actions/run_reflect_action.py +++ /dev/null @@ -1,277 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# @Desc : Integration Reflect Action - -import re - -from examples.stanford_town.actions.st_action import STAction -from metagpt.logs import logger - - -# Run GPT Prompt Focal Point method -class AgentFocusPt(STAction): - name: str = "AgentFocusPt" - - def _func_validate(self, llm_resp: str, prompt: str) -> bool: - try: - self._func_cleanup(llm_resp, prompt) - return True - except Exception: - return False - - def _func_cleanup(self, llm_resp: str, prompt: str = "") -> str: - try: - """ - Cleanup handling has been completed for run_v2 - """ - return llm_resp - except Exception as exp: - logger.error(f"{self.cls_name} with error {exp}") - - def _func_fail_default_resp(self) -> str: - pass - - async def run(self, role: "STRole", statements: str, n: int, test_input=None) -> str: - def create_prompt_input(role: "STRole", statements, n, test_input=None): - prompt_input = [statements, str(n)] - return prompt_input - - prompt_input = create_prompt_input(role, statements, n) - prompt = self.generate_prompt_with_tmpl_filename(prompt_input, "generate_focal_pt_v1.txt") - - example_output = '["What should Jane do for lunch", "Does Jane like strawberry", "Who is Jane"]' - special_instruction = "Output must be a list of str." - output = await self._run_gpt35(prompt, example_output, special_instruction) - logger.info(f"Role: {role.name} Action: {self.cls_name} output: {output}") - return output - - -# Run GPT Prompt Insight and Guidance -class AgentInsightAndGuidance(STAction): - name: str = "AgentInsightAndGuidance" - - def _func_validate(self, llm_resp: str, prompt: str) -> bool: - try: - self._func_cleanup(llm_resp, prompt) - return True - except Exception: - return False - - def _func_cleanup(self, llm_resp: str, prompt: str = "") -> dict: - try: - llm_resp = "1. " + llm_resp.strip() - ret = dict() - for i in llm_resp.split("\n"): - row = " ".join(i.split(". ")[1:]) - if "(because of " not in row: - continue - thought = row.split("(because of ")[0].strip() - if ")" not in row.split("(because of ")[1]: - continue - evi_raw = row.split("(because of ")[1].split(")")[0].strip() - evi_raw = re.findall(r"\d+", evi_raw) - evi_raw = [int(i.strip()) for i in evi_raw] - ret[thought] = evi_raw - return ret - except Exception as exp: - logger.error(f"{self.cls_name} with error {exp}") - - def _func_fail_default_resp(self, n: int) -> str: - return ["I am hungry"] * n - - async def run(self, role: "STRole", statements: str, n: int, test_input=None) -> dict: - def create_prompt_input(role, statements, n, test_input=None): - prompt_input = [statements, str(n)] - return prompt_input - - prompt_input = create_prompt_input(role, statements, n) - prompt = self.generate_prompt_with_tmpl_filename(prompt_input, "insight_and_evidence_v1.txt") - - self.fail_default_resp = self._func_fail_default_resp(n) - output = await self._run_gpt35_max_tokens(prompt, max_tokens=150) - logger.info(f"Role: {role.name} Action: {self.cls_name} output: {output}") - return output - - -# Run GPT Prompt Event Triple -class AgentEventTriple(STAction): - name: str = "AgentEventTriple" - - def _func_validate(self, llm_resp: str, prompt: str) -> bool: - try: - llm_resp = self._func_cleanup(llm_resp, prompt="") - if len(llm_resp) != 2: - return False - except Exception: - return False - return True - - def _func_cleanup(self, llm_resp: str, prompt: str = "") -> list: - try: - cr = llm_resp.strip() - cr = [i.strip() for i in cr.split(")")[0].split(",")] - if len(cr) != 2: - return cr[-2:] - return cr - except Exception as exp: - logger.error(f"{self.cls_name} with error {exp}") - - def _func_fail_default_resp(self) -> str: - pass - - async def run(self, statements: str, role: "STRole", verbose=False) -> tuple: - def create_prompt_input(statements, role): - if "(" in statements: - statements = statements.split("(")[-1].split(")")[0] - prompt_input = [role.scratch.name, statements, role.scratch.name] - return prompt_input - - prompt_input = create_prompt_input(statements, role) - prompt = self.generate_prompt_with_tmpl_filename(prompt_input, "generate_event_triple_v1.txt") - - output = await self._run_gpt35_max_tokens(prompt, max_tokens=30) - output = (role.scratch.name, output[0], output[1]) - logger.info(f"Role: {role.name} Action: {self.cls_name} output: {output}") - return output - - -# Run GPT Prompt Event Poignancy -class AgentEventPoignancy(STAction): - name: str = "AgentEventPoignancy" - - def _func_validate(self, llm_resp: str, prompt: str) -> bool: - try: - self._func_cleanup(llm_resp, prompt) - return True - except Exception: - return False - - def _func_cleanup(self, llm_resp: str, prompt: str = "") -> int: - try: - llm_resp = int(llm_resp.strip()) - return llm_resp - except Exception as exp: - logger.error(f"{self.cls_name} with error {exp}") - - def _func_fail_default_resp(self) -> str: - pass - - async def run(self, role: "STRole", statements: str, test_input=None, verbose=False) -> str: - def create_prompt_input(role: "STRole", statements: str, test_input=None): - prompt_input = [role.scratch.name, role.scratch.get_str_iss(), role.scratch.name, statements] - return prompt_input - - prompt_input = create_prompt_input(role, statements) - prompt = self.generate_prompt_with_tmpl_filename(prompt_input, "poignancy_event_v1.txt") - - example_output = "5" # ######## - special_instruction = "The output should ONLY contain ONE integer value on the scale of 1 to 10." - output = await self._run_gpt35(prompt, example_output, special_instruction) - logger.info(f"Role: {role.name} Action: {self.cls_name} output: {output}") - return output - - -# Run GPT Prompt Chat Poignancy -class AgentChatPoignancy(STAction): - name: str = "AgentChatPoignancy" - - def _func_validate(self, llm_resp: str, prompt: str) -> bool: - try: - self._func_cleanup(llm_resp, prompt) - return True - except Exception: - return False - - def _func_cleanup(self, llm_resp: str, prompt: str = "") -> int: - try: - llm_resp = int(llm_resp.strip()) - return llm_resp - except Exception as exp: - logger.error(f"{self.cls_name} with error {exp}") - - def _func_fail_default_resp(self) -> str: - pass - - async def run(self, role: "STRole", statements: str, test_input=None, verbose=False) -> str: - def create_prompt_input(role: "STRole", statements, test_input=None): - prompt_input = [role.scratch.name, role.scratch.get_str_iss(), role.scratch.name, statements] - return prompt_input - - prompt_input = create_prompt_input(role, statements) - prompt = self.generate_prompt_with_tmpl_filename(prompt_input, "poignancy_chat_v1.txt") - - example_output = "5" # ######## - special_instruction = "The output should ONLY contain ONE integer value on the scale of 1 to 10." - output = await self._run_gpt35(prompt, example_output, special_instruction) - logger.info(f"Role: {role.name} Action: {self.cls_name} output: {output}") - return output - - -# Run GPT Prompt Planning Thought on Convo -class AgentPlanThoughtOnConvo(STAction): - name: str = "AgentPlanThoughtOnConvo" - - def _func_validate(self, llm_resp: str, prompt: str) -> bool: - try: - self._func_cleanup(llm_resp, prompt) - return True - except Exception: - return False - - def _func_cleanup(self, llm_resp: str, prompt: str = "") -> str: - try: - return llm_resp.split('"')[0].strip() - except Exception as exp: - logger.error(f"{self.cls_name} with error {exp}") - - def _func_fail_default_resp(self) -> str: - pass - - async def run(self, role: "STRole", statements: str, test_input=None, verbose=False) -> str: - def create_prompt_input(role, statements, test_input=None): - prompt_input = [statements, role.scratch.name, role.scratch.name, role.scratch.name] - return prompt_input - - prompt_input = create_prompt_input(role, statements) - prompt = self.generate_prompt_with_tmpl_filename(prompt_input, "planning_thought_on_convo_v1.txt") - - output = await self._run_gpt35_max_tokens(prompt, max_tokens=50) - logger.info(f"Role: {role.name} Action: {self.cls_name} output: {output}") - return output - - -# Run GPT Prompt Memory on Convo -class AgentMemoryOnConvo(STAction): - name: str = "AgentMemoryOnConvo" - - def _func_validate(self, llm_resp: str, prompt: str) -> bool: - try: - self._func_cleanup(llm_resp, prompt) - return True - except Exception: - return False - - def _func_cleanup(self, llm_resp: str, prompt: str = "") -> str: - try: - return llm_resp.split('"')[0].strip() - except Exception as exp: - logger.error(f"{self.cls_name} with error {exp}") - - def _func_fail_default_resp(self) -> str: - pass - - async def run(self, role: "STRole", statements: str, test_input=None, verbose=False) -> str: - def create_prompt_input(role, statements, test_input=None): - prompt_input = [statements, role.scratch.name, role.scratch.name, role.scratch.name] - return prompt_input - - prompt_input = create_prompt_input(role, statements) - prompt = self.generate_prompt_with_tmpl_filename(prompt_input, "memo_on_convo_v1.txt") - example_output = "Jane Doe was interesting to talk to." - special_instruction = ( - "The output should ONLY contain a string that summarizes anything interesting " - "that the agent may have noticed" - ) - output = await self._run_gpt35(prompt, example_output, special_instruction) - logger.info(f"Role: {role.name} Action: {self.cls_name} output: {output}") - return output diff --git a/examples/stanford_town/actions/st_action.py b/examples/stanford_town/actions/st_action.py deleted file mode 100644 index 72ef851e0..000000000 --- a/examples/stanford_town/actions/st_action.py +++ /dev/null @@ -1,119 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# @Desc : StanfordTown Action -import json -import time -from abc import abstractmethod -from pathlib import Path -from typing import Any, Optional, Union - -from examples.stanford_town.utils.const import PROMPTS_DIR -from metagpt.actions.action import Action -from metagpt.config2 import config -from metagpt.logs import logger - - -class STAction(Action): - name: str = "STAction" - prompt_dir: Path = PROMPTS_DIR - fail_default_resp: Optional[str] = None - - @property - def cls_name(self): - return self.__class__.__name__ - - @abstractmethod - def _func_validate(self, llm_resp: str, prompt: str): - raise NotImplementedError - - @abstractmethod - def _func_cleanup(self, llm_resp: str, prompt: str): - raise NotImplementedError - - @abstractmethod - def _func_fail_default_resp(self): - raise NotImplementedError - - def generate_prompt_with_tmpl_filename(self, prompt_input: Union[str, list], tmpl_filename) -> str: - """ - same with `generate_prompt` - Args: - prompt_input: the input we want to feed in (IF THERE ARE MORE THAN ONE INPUT, THIS CAN BE A LIST.) - tmpl_filename: prompt template filename - Returns: - a str prompt that will be sent to LLM server. - """ - if isinstance(prompt_input, str): - prompt_input = [prompt_input] - prompt_input = [str(i) for i in prompt_input] - - f = open(str(self.prompt_dir.joinpath(tmpl_filename)), "r") - prompt = f.read() - f.close() - for count, i in enumerate(prompt_input): - prompt = prompt.replace(f"!!", i) - if "###" in prompt: - prompt = prompt.split("###")[1] - return prompt.strip() - - async def _aask(self, prompt: str) -> str: - return await self.llm.aask(prompt) - - async def _run_gpt35_max_tokens(self, prompt: str, max_tokens: int = 50, retry: int = 3): - for idx in range(retry): - try: - tmp_max_tokens_rsp = getattr(config.llm, "max_token", 1500) - setattr(config.llm, "max_token", max_tokens) - self.llm.use_system_prompt = False # to make it behave like a non-chat completions - - llm_resp = await self._aask(prompt) - - setattr(config.llm, "max_token", tmp_max_tokens_rsp) - logger.info(f"Action: {self.cls_name} llm _run_gpt35_max_tokens raw resp: {llm_resp}") - if self._func_validate(llm_resp, prompt): - return self._func_cleanup(llm_resp, prompt) - except Exception as exp: - logger.warning(f"Action: {self.cls_name} _run_gpt35_max_tokens exp: {exp}") - time.sleep(5) - return self.fail_default_resp - - async def _run_gpt35( - self, prompt: str, example_output: str, special_instruction: str, retry: int = 3 - ) -> Union[bool, Any]: - """same with `gpt_structure.ChatGPT_safe_generate_response`""" - prompt = '"""\n' + prompt + '\n"""\n' - prompt += f"Output the response to the prompt above in json. {special_instruction}\n" - prompt += "Example output json:\n" - prompt += '{"output": "' + str(example_output) + '"}' - - for idx in range(retry): - try: - llm_resp = await self._aask(prompt) - logger.info(f"Action: {self.cls_name} llm _run_gpt35 raw resp: {llm_resp}") - end_idx = llm_resp.strip().rfind("}") + 1 - llm_resp = llm_resp[:end_idx] - llm_resp = json.loads(llm_resp)["output"] - - if self._func_validate(llm_resp, prompt): - return self._func_cleanup(llm_resp, prompt) - except Exception as exp: - logger.warning(f"Action: {self.cls_name} _run_gpt35 exp: {exp}") - time.sleep(5) # usually avoid `Rate limit` - return False - - async def _run_gpt35_wo_extra_prompt(self, prompt: str, retry: int = 3) -> str: - for idx in range(retry): - try: - llm_resp = await self._aask(prompt) - llm_resp = llm_resp.strip() - logger.info(f"Action: {self.cls_name} llm _run_gpt35_wo_extra_prompt raw resp: {llm_resp}") - if self._func_validate(llm_resp, prompt): - return self._func_cleanup(llm_resp, prompt) - except Exception as exp: - logger.warning(f"Action: {self.cls_name} _run_gpt35_wo_extra_prompt exp: {exp}") - time.sleep(5) # usually avoid `Rate limit` - return self.fail_default_resp - - async def run(self, *args, **kwargs): - """Run action""" - raise NotImplementedError("The run method should be implemented in a subclass.") diff --git a/examples/stanford_town/actions/summarize_conv.py b/examples/stanford_town/actions/summarize_conv.py deleted file mode 100644 index f01fb36ad..000000000 --- a/examples/stanford_town/actions/summarize_conv.py +++ /dev/null @@ -1,47 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# @Desc : summarize the content of agents' conversation - -from examples.stanford_town.actions.st_action import STAction -from metagpt.logs import logger - - -class SummarizeConv(STAction): - name: str = "SummarizeConv" - - def _func_validate(self, llm_resp: str, prompt: str) -> bool: - resp = False - try: - _ = self._func_cleanup(llm_resp, prompt) - resp = True - except Exception: - pass - return resp - - def _func_cleanup(self, llm_resp: str, prompt: str) -> str: - ret = "conversing about " + llm_resp.strip() - return ret - - def _func_fail_default_resp(self) -> str: - return "conversing with a housemate about morning greetings" - - async def run(self, conv: list): - def create_prompt_input(conversation: list): - convo_str = "" - for row in conversation: - convo_str += f'{row[0]}: "{row[1]}"\n' - prompt_input = [convo_str] - return prompt_input - - prompt_input = create_prompt_input(conv) - prompt = self.generate_prompt_with_tmpl_filename(prompt_input, "summarize_conversation_v1.txt") - - example_output = "conversing about what to eat for lunch" - special_instruction = ( - "The output must continue the sentence above by filling in the tag. " - "Don't start with 'this is a conversation about...' Just finish the sentence " - "but do not miss any important details (including who are chatting)." - ) - output = await self._run_gpt35(prompt, example_output, special_instruction) - logger.info(f"Action: {self.cls_name} output: {output}") - return output diff --git a/examples/stanford_town/actions/task_decomp.py b/examples/stanford_town/actions/task_decomp.py deleted file mode 100644 index d9d5ec9fa..000000000 --- a/examples/stanford_town/actions/task_decomp.py +++ /dev/null @@ -1,175 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# @Desc : task_decomp - -import datetime - -from metagpt.logs import logger - -from ..actions.st_action import STAction - - -class TaskDecomp(STAction): - name: str = "TaskDecomp" - - def _func_cleanup(self, llm_resp: str, prompt: str) -> list: - # TODO SOMETHING HERE sometimes fails... See screenshot - temp = [i.strip() for i in llm_resp.split("\n")] - _cr = [] - cr = [] - for count, i in enumerate(temp): - if count != 0: - _cr += [" ".join([j.strip() for j in i.split(" ")][3:])] - else: - _cr += [i] - for count, i in enumerate(_cr): - k = [j.strip() for j in i.split("(duration in minutes:")] - task = k[0] - if task[-1] == ".": - task = task[:-1] - duration = int(k[1].split(",")[0].strip()) - cr += [[task, duration]] - - total_expected_min = int(prompt.split("(total duration in minutes")[-1].split("):")[0].strip()) - - # TODO -- now, you need to make sure that this is the same as the sum of - # the current action sequence. - curr_min_slot = [ - ["dummy", -1], - ] # (task_name, task_index) - for count, i in enumerate(cr): - i_task = i[0] - i_duration = i[1] - - i_duration -= i_duration % 5 - if i_duration > 0: - for j in range(i_duration): - curr_min_slot += [(i_task, count)] - curr_min_slot = curr_min_slot[1:] - - if len(curr_min_slot) > total_expected_min: - last_task = curr_min_slot[60] - for i in range(1, 6): - curr_min_slot[-1 * i] = last_task - elif len(curr_min_slot) < total_expected_min: - last_task = curr_min_slot[-1] - for i in range(total_expected_min - len(curr_min_slot)): - curr_min_slot += [last_task] - - cr_ret = [ - ["dummy", -1], - ] - for task, task_index in curr_min_slot: - if task != cr_ret[-1][0]: - cr_ret += [[task, 1]] - else: - cr_ret[-1][1] += 1 - cr = cr_ret[1:] - - return cr - - def _func_validate(self, llm_resp: str, prompt: str) -> bool: - # TODO -- this sometimes generates error - try: - self._func_cleanup(llm_resp, prompt) - except Exception: - return False - return True - - def _func_fail_default_resp(self) -> int: - fs = [["asleep", 0]] - return fs - - async def run(self, role: "STRole", task_desc: int, truncated_act_dur: int, *args, **kwargs): - def create_prompt_input(role, task, duration): - """ - Today is Saturday June 25. From 00:00 ~ 06:00am, Maeve is - planning on sleeping, 06:00 ~ 07:00am, Maeve is - planning on waking up and doing her morning routine, - and from 07:00am ~08:00am, Maeve is planning on having breakfast. - """ - - curr_f_org_index = role.scratch.get_f_daily_schedule_hourly_org_index() - all_indices = [] - # if curr_f_org_index > 0: - # all_indices += [curr_f_org_index-1] - all_indices += [curr_f_org_index] - if curr_f_org_index + 1 <= len(role.scratch.f_daily_schedule_hourly_org): - all_indices += [curr_f_org_index + 1] - if curr_f_org_index + 2 <= len(role.scratch.f_daily_schedule_hourly_org): - all_indices += [curr_f_org_index + 2] - - curr_time_range = "" - - print("DEBUG") - print(role.scratch.f_daily_schedule_hourly_org) - print(all_indices) - - summ_str = f'Today is {role.scratch.curr_time.strftime("%B %d, %Y")}. ' - summ_str += "From " - for index in all_indices: - print("index", index) - if index < len(role.scratch.f_daily_schedule_hourly_org): - start_min = 0 - for i in range(index): - start_min += role.scratch.f_daily_schedule_hourly_org[i][1] - end_min = start_min + role.scratch.f_daily_schedule_hourly_org[index][1] - start_time = datetime.datetime.strptime("00:00:00", "%H:%M:%S") + datetime.timedelta( - minutes=start_min - ) - end_time = datetime.datetime.strptime("00:00:00", "%H:%M:%S") + datetime.timedelta( - minutes=end_min - ) - start_time_str = start_time.strftime("%H:%M%p") - end_time_str = end_time.strftime("%H:%M%p") - summ_str += ( - f"{start_time_str} ~ {end_time_str}, {role.name} is planning " - f"on {role.scratch.f_daily_schedule_hourly_org[index][0]}, " - ) - if curr_f_org_index + 1 == index: - curr_time_range = f"{start_time_str} ~ {end_time_str}" - summ_str = summ_str[:-2] + "." - - prompt_input = [] - prompt_input += [role.scratch.get_str_iss()] - prompt_input += [summ_str] - # prompt_input += [role.scratch.get_str_curr_date_str()] - prompt_input += [role.scratch.get_str_firstname()] - prompt_input += [role.scratch.get_str_firstname()] - prompt_input += [task] - prompt_input += [curr_time_range] - prompt_input += [duration] - prompt_input += [role.scratch.get_str_firstname()] - return prompt_input - - prompt_input = create_prompt_input(role, task_desc, truncated_act_dur) - prompt = self.generate_prompt_with_tmpl_filename(prompt_input, "task_decomp_v3.txt") - self.fail_default_resp = self._func_fail_default_resp() - output = await self._run_gpt35_max_tokens(prompt, max_tokens=1000) - logger.info(f"Role: {role.name} {self.cls_name} output: {output}") - - fin_output = [] - time_sum = 0 - for i_task, i_duration in output: - time_sum += i_duration - # HM????????? - # if time_sum < duration: - if time_sum <= truncated_act_dur: - fin_output += [[i_task, i_duration]] - else: - break - ftime_sum = 0 - for fi_task, fi_duration in fin_output: - ftime_sum += fi_duration - - # print ("for debugging... line 365", fin_output) - fin_output[-1][1] += truncated_act_dur - ftime_sum - output = fin_output - - task_decomp = output - ret = [] - for decomp_task, duration in task_decomp: - ret += [[f"{task_desc} ({decomp_task})", duration]] - output = ret - logger.info(f"Role: {role.name} Action: {self.cls_name} output: {output}") - return output diff --git a/examples/stanford_town/actions/wake_up.py b/examples/stanford_town/actions/wake_up.py deleted file mode 100644 index d39115854..000000000 --- a/examples/stanford_town/actions/wake_up.py +++ /dev/null @@ -1,43 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# @Desc : wake_up - - -from metagpt.logs import logger - -from ..actions.st_action import STAction - - -class WakeUp(STAction): - name: str = "WakeUp" - - def _func_validate(self, llm_resp: str, prompt: str = None) -> bool: - try: - self._func_cleanup(llm_resp, prompt="") - except Exception: - return False - return True - - def _func_cleanup(self, llm_resp: str, prompt: str) -> int: - cr = int(llm_resp.strip().lower().split("am")[0]) - return cr - - def _func_fail_default_resp(self) -> int: - fs = 8 - return fs - - async def run(self, role: "STRole"): - def create_prompt_input(role): - prompt_input = [ - role.scratch.get_str_iss(), - role.scratch.get_str_lifestyle(), - role.scratch.get_str_firstname(), - ] - return prompt_input - - prompt_input = create_prompt_input(role) - prompt = self.generate_prompt_with_tmpl_filename(prompt_input, "wake_up_hour_v1.txt") - self.fail_default_resp = self._func_fail_default_resp() - output = await self._run_gpt35_max_tokens(prompt, max_tokens=5) - logger.info(f"Role: {role.name} Action: {self.cls_name} output: {output}") - return output diff --git a/examples/stanford_town/memory/__init__.py b/examples/stanford_town/memory/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/examples/stanford_town/memory/agent_memory.py b/examples/stanford_town/memory/agent_memory.py deleted file mode 100644 index d212232f4..000000000 --- a/examples/stanford_town/memory/agent_memory.py +++ /dev/null @@ -1,378 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# @Desc : BasicMemory,AgentMemory实现 - -from datetime import datetime -from pathlib import Path -from typing import Optional - -from pydantic import Field, field_serializer, model_validator - -from metagpt.logs import logger -from metagpt.memory.memory import Memory -from metagpt.schema import Message -from metagpt.utils.common import read_json_file, write_json_file - - -class BasicMemory(Message): - """ - BasicMemory继承于MG的Message类,其中content属性替代description属性 - Message类中对于Chat类型支持的非常好,对于Agent个体的Perceive,Reflection,Plan支持的并不多 - 在Type设计上,我们延续GA的三个种类,但是对于Chat种类的对话进行特别设计(具体怎么设计还没想好) - """ - - memory_id: Optional[str] = Field(default=None) # 记忆ID - memory_count: int = -1 # 第几个记忆,实际数值与Memory相等 - type_count: int = -1 # 第几种记忆,类型为整数 - memory_type: Optional[str] = Field(default=None) # 记忆类型,包含 event,thought,chat三种类型 - depth: int = -1 # 记忆深度,类型为整数 - created: Optional[datetime] = Field(default=None) # 创建时间 - expiration: Optional[datetime] = Field(default=None) # 记忆失效时间,默认为空() - last_accessed: Optional[datetime] = Field(default=None) # 上一次调用的时间,初始化时候与self.created一致 - subject: Optional[str] = Field(default=None) # 主语 - predicate: Optional[str] = Field(default=None) # 谓语 - object: Optional[str] = Field(default=None) # 宾语 - - description: Optional[str] = Field(default=None) - embedding_key: Optional[str] = Field(default=None) # 内容与self.content一致 - poignancy: int = -1 # importance值 - keywords: list[str] = Field(default=[]) # keywords - filling: list = Field(default=[]) # 装的与之相关联的memory_id的列表 - - __hash__ = object.__hash__ # support hash in AgentMemory - - @model_validator(mode="before") - @classmethod - def check_values(cls, values): - if "created" in values: - values["last_accessed"] = values["created"] - if "content" in values: - values["description"] = values["content"] - if "filling" in values: - values["filling"] = values["filling"] or [] - return values - - @field_serializer("created", "expiration") - def transform_time_field(self, time_field: Optional[datetime]) -> str: - if time_field: - time_field = time_field.strftime("%Y-%m-%d %H:%M:%S") - return time_field - - def summary(self): - return self.subject, self.predicate, self.object - - def save_to_dict(self) -> dict: - """ - 将MemoryBasic类转化为字典,用于存储json文件 - 这里需要注意,cause_by跟GA不兼容,所以需要做一个格式转换 - """ - memory_dict = dict() - node_id = self.memory_id - basic_mem_obj = self.model_dump( - include=[ - "node_count", - "type_count", - "type", - "depth", - "created", - "expiration", - "subject", - "predicate", - "object", - "description", - "embedding_key", - "poignancy", - "keywords", - "filling", - "cause_by", - ] - ) - - memory_dict[node_id] = basic_mem_obj - return memory_dict - - -class AgentMemory(Memory): - """ - GA中主要存储三种JSON - 1. embedding.json (Dict embedding_key:embedding) - 2. Node.json (Dict Node_id:Node) - 3. kw_strength.json - """ - - storage: list[BasicMemory] = [] # 重写Storage,存储BasicMemory所有节点 - event_list: list[BasicMemory] = [] # 存储event记忆 - thought_list: list[BasicMemory] = [] # 存储thought记忆 - chat_list: list[BasicMemory] = [] # chat-related memory - - event_keywords: dict[str, list[BasicMemory]] = dict() # 存储keywords - thought_keywords: dict[str, list[BasicMemory]] = dict() - chat_keywords: dict[str, list[BasicMemory]] = dict() - - kw_strength_event: dict[str, int] = dict() - kw_strength_thought: dict[str, int] = dict() - - memory_saved: Optional[Path] = Field(default=None) - embeddings: dict[str, list[float]] = dict() - - def set_mem_path(self, memory_saved: Path): - self.memory_saved = memory_saved - self.load(memory_saved) - - def save(self, memory_saved: Path): - """ - 将MemoryBasic类存储为Nodes.json形式。复现GA中的Kw Strength.json形式 - 这里添加一个路径即可 - TODO 这里在存储时候进行倒序存储,之后需要验证(test_memory通过) - """ - memory_json = dict() - for i in range(len(self.storage)): - memory_node = self.storage[len(self.storage) - i - 1] - memory_node = memory_node.save_to_dict() - memory_json.update(memory_node) - write_json_file(memory_saved.joinpath("nodes.json"), memory_json) - write_json_file(memory_saved.joinpath("embeddings.json"), self.embeddings) - - strength_json = dict() - strength_json["kw_strength_event"] = self.kw_strength_event - strength_json["kw_strength_thought"] = self.kw_strength_thought - write_json_file(memory_saved.joinpath("kw_strength.json"), strength_json) - - def load(self, memory_saved: Path): - """ - 将GA的JSON解析,填充到AgentMemory类之中 - """ - self.embeddings = read_json_file(memory_saved.joinpath("embeddings.json")) - memory_load = read_json_file(memory_saved.joinpath("nodes.json")) - for count in range(len(memory_load.keys())): - node_id = f"node_{str(count + 1)}" - node_details = memory_load[node_id] - node_type = node_details["type"] - created = datetime.strptime(node_details["created"], "%Y-%m-%d %H:%M:%S") - expiration = None - if node_details["expiration"]: - expiration = datetime.strptime(node_details["expiration"], "%Y-%m-%d %H:%M:%S") - - s = node_details["subject"] - p = node_details["predicate"] - o = node_details["object"] - - description = node_details["description"] - embedding_pair = (node_details["embedding_key"], self.embeddings[node_details["embedding_key"]]) - poignancy = node_details["poignancy"] - keywords = set(node_details["keywords"]) - filling = node_details["filling"] - if node_type == "thought": - self.add_thought( - created, expiration, s, p, o, description, keywords, poignancy, embedding_pair, filling - ) - if node_type == "event": - self.add_event(created, expiration, s, p, o, description, keywords, poignancy, embedding_pair, filling) - if node_type == "chat": - self.add_chat(created, expiration, s, p, o, description, keywords, poignancy, embedding_pair, filling) - - strength_keywords_load = read_json_file(memory_saved.joinpath("kw_strength.json")) - if strength_keywords_load["kw_strength_event"]: - self.kw_strength_event = strength_keywords_load["kw_strength_event"] - if strength_keywords_load["kw_strength_thought"]: - self.kw_strength_thought = strength_keywords_load["kw_strength_thought"] - - def add(self, memory_basic: BasicMemory): - """ - Add a new message to storage, while updating the index - 重写add方法,修改原有的Message类为BasicMemory类,并添加不同的记忆类型添加方式 - """ - if memory_basic.memory_id in self.storage: - return - self.storage.append(memory_basic) - if memory_basic.memory_type == "chat": - self.chat_list[0:0] = [memory_basic] - return - if memory_basic.memory_type == "thought": - self.thought_list[0:0] = [memory_basic] - return - if memory_basic.memory_type == "event": - self.event_list[0:0] = [memory_basic] - return - - def add_chat( - self, created, expiration, s, p, o, content, keywords, poignancy, embedding_pair, filling, cause_by="" - ): - """ - 调用add方法,初始化chat,在创建的时候就需要调用embedding函数 - """ - memory_count = len(self.storage) + 1 - type_count = len(self.thought_list) + 1 - memory_type = "chat" - memory_id = f"node_{str(memory_count)}" - depth = 1 - - memory_node = BasicMemory( - memory_id=memory_id, - memory_count=memory_count, - type_count=type_count, - memory_type=memory_type, - depth=depth, - created=created, - expiration=expiration, - subject=s, - predicate=p, - object=o, - description=content, - embedding_key=embedding_pair[0], - poignancy=poignancy, - keywords=keywords, - filling=filling, - cause_by=cause_by, - ) - - keywords = [i.lower() for i in keywords] - for kw in keywords: - if kw in self.chat_keywords: - self.chat_keywords[kw][0:0] = [memory_node] - else: - self.chat_keywords[kw] = [memory_node] - - self.add(memory_node) - - self.embeddings[embedding_pair[0]] = embedding_pair[1] - return memory_node - - def add_thought(self, created, expiration, s, p, o, content, keywords, poignancy, embedding_pair, filling): - """ - 调用add方法,初始化thought - """ - memory_count = len(self.storage) + 1 - type_count = len(self.thought_list) + 1 - memory_type = "thought" - memory_id = f"node_{str(memory_count)}" - depth = 1 - - try: - if filling: - depth_list = [memory_node.depth for memory_node in self.storage if memory_node.memory_id in filling] - depth += max(depth_list) - except Exception as exp: - logger.warning(f"filling init occur {exp}") - pass - - memory_node = BasicMemory( - memory_id=memory_id, - memory_count=memory_count, - type_count=type_count, - memory_type=memory_type, - depth=depth, - created=created, - expiration=expiration, - subject=s, - predicate=p, - object=o, - description=content, - embedding_key=embedding_pair[0], - poignancy=poignancy, - keywords=keywords, - filling=filling, - ) - - keywords = [i.lower() for i in keywords] - for kw in keywords: - if kw in self.thought_keywords: - self.thought_keywords[kw][0:0] = [memory_node] - else: - self.thought_keywords[kw] = [memory_node] - - self.add(memory_node) - - if f"{p} {o}" != "is idle": - for kw in keywords: - if kw in self.kw_strength_thought: - self.kw_strength_thought[kw] += 1 - else: - self.kw_strength_thought[kw] = 1 - - self.embeddings[embedding_pair[0]] = embedding_pair[1] - return memory_node - - def add_event(self, created, expiration, s, p, o, content, keywords, poignancy, embedding_pair, filling): - """ - 调用add方法,初始化event - """ - memory_count = len(self.storage) + 1 - type_count = len(self.event_list) + 1 - memory_type = "event" - memory_id = f"node_{str(memory_count)}" - depth = 0 - - if "(" in content: - content = " ".join(content.split()[:3]) + " " + content.split("(")[-1][:-1] - - memory_node = BasicMemory( - memory_id=memory_id, - memory_count=memory_count, - type_count=type_count, - memory_type=memory_type, - depth=depth, - created=created, - expiration=expiration, - subject=s, - predicate=p, - object=o, - description=content, - embedding_key=embedding_pair[0], - poignancy=poignancy, - keywords=keywords, - filling=filling, - ) - - keywords = [i.lower() for i in keywords] - for kw in keywords: - if kw in self.event_keywords: - self.event_keywords[kw][0:0] = [memory_node] - else: - self.event_keywords[kw] = [memory_node] - - self.add(memory_node) - - if f"{p} {o}" != "is idle": - for kw in keywords: - if kw in self.kw_strength_event: - self.kw_strength_event[kw] += 1 - else: - self.kw_strength_event[kw] = 1 - - self.embeddings[embedding_pair[0]] = embedding_pair[1] - return memory_node - - def get_summarized_latest_events(self, retention): - ret_set = set() - for e_node in self.event_list[:retention]: - ret_set.add(e_node.summary()) - return ret_set - - def get_last_chat(self, target_role_name: str): - if target_role_name.lower() in self.chat_keywords: - return self.chat_keywords[target_role_name.lower()][0] - else: - return False - - def retrieve_relevant_thoughts(self, s_content: str, p_content: str, o_content: str) -> set: - contents = [s_content, p_content, o_content] - - ret = [] - for i in contents: - if i in self.thought_keywords: - ret += self.thought_keywords[i.lower()] - - ret = set(ret) - return ret - - def retrieve_relevant_events(self, s_content: str, p_content: str, o_content: str) -> set: - contents = [s_content, p_content, o_content] - - ret = [] - for i in contents: - if i in self.event_keywords: - ret += self.event_keywords[i] - - ret = set(ret) - return ret diff --git a/examples/stanford_town/memory/retrieve.py b/examples/stanford_town/memory/retrieve.py deleted file mode 100644 index db3a87bea..000000000 --- a/examples/stanford_town/memory/retrieve.py +++ /dev/null @@ -1,180 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# @Desc : Retrieve函数实现 - -import datetime - -from numpy import dot -from numpy.linalg import norm - -from examples.stanford_town.memory.agent_memory import BasicMemory -from examples.stanford_town.utils.utils import get_embedding - - -def agent_retrieve( - agent_memory, - curr_time: datetime.datetime, - memory_forget: float, - query: str, - nodes: list[BasicMemory], - topk: int = 4, -) -> list[BasicMemory]: - """ - Retrieve需要集合Role使用,原因在于Role才具有AgentMemory,scratch - 逻辑:Role调用该函数,self.rc.AgentMemory,self.rc.scratch.curr_time,self.rc.scratch.memory_forget - 输入希望查询的内容与希望回顾的条数,返回TopK条高分记忆,即List[BasicMemory] - - Score_lists示例 - { - "memory": memories[i], BasicMemory类 - "importance": memories[i].poignancy - "recency": 衰减因子计算结果 - "relevance": 搜索结果 - } - """ - memories = nodes - agent_memory_embedding = agent_memory.embeddings - memories = sorted(memories, key=lambda memory_node: memory_node.last_accessed, reverse=True) - - score_list = [] - score_list = extract_importance(memories, score_list) - score_list = extract_recency(curr_time, memory_forget, score_list) - score_list = extract_relevance(agent_memory_embedding, query, score_list) - score_list = normalize_score_floats(score_list, 0, 1) - - total_dict = {} - gw = [1, 1, 1] # 三个因素的权重,重要性,近因性,相关性, - for i in range(len(score_list)): - total_score = ( - score_list[i]["importance"] * gw[0] + score_list[i]["recency"] * gw[1] + score_list[i]["relevance"] * gw[2] - ) - total_dict[score_list[i]["memory"].memory_id] = total_score - - result = top_highest_x_values(total_dict, topk) - - return result # 返回的是一个BasicMemory列表 - - -def new_agent_retrieve(role, focus_points: list, n_count=30) -> dict: - """ - 输入为role,关注点列表,返回记忆数量 - 输出为字典,键为focus_point,值为对应的记忆列表 - """ - retrieved = dict() - for focal_pt in focus_points: - nodes = [ - [i.last_accessed, i] - for i in role.memory.event_list + role.memory.thought_list - if "idle" not in i.embedding_key - ] - nodes = sorted(nodes, key=lambda x: x[0]) - nodes = [i for created, i in nodes] - results = agent_retrieve( - role.memory, role.scratch.curr_time, role.scratch.recency_decay, focal_pt, nodes, n_count - ) - final_result = [] - for n in results: - for i in role.memory.storage: - if i.memory_id == n: - i.last_accessed = role.scratch.curr_time - final_result.append(i) - - retrieved[focal_pt] = final_result - - return retrieved - - -def top_highest_x_values(d, x): - """ - 输入字典,Topx - 返回以字典值排序,字典键组成的List[BasicMemory] - """ - top_v = [item[0] for item in sorted(d.items(), key=lambda item: item[1], reverse=True)[:x]] - return top_v - - -def extract_importance(memories, score_list): - """ - 抽取重要性 - """ - for i in range(len(memories)): - score = {"memory": memories[i], "importance": memories[i].poignancy} - score_list.append(score) - return score_list - - -def extract_relevance(agent_memory_embedding, query, score_list): - """ - 抽取相关性 - """ - query_embedding = get_embedding(query) - # 进行 - for i in range(len(score_list)): - node_embedding = agent_memory_embedding[score_list[i]["memory"].embedding_key] - result = cos_sim(node_embedding, query_embedding) - score_list[i]["relevance"] = result - - return score_list - - -def extract_recency(curr_time, memory_forget, score_list): - """ - 抽取近因性,目前使用的现实世界过一天走一个衰减因子 - """ - for i in range(len(score_list)): - day_count = (curr_time - score_list[i]["memory"].created).days - score_list[i]["recency"] = memory_forget**day_count - return score_list - - -def cos_sim(a, b): - """ - 计算余弦相似度 - """ - return dot(a, b) / (norm(a) * norm(b)) - - -def normalize_list_floats(single_list, target_min, target_max): - """ - 单个列表归一化 - """ - if len(single_list) == 0: - return [] - - min_val = min(single_list) - max_val = max(single_list) - range_val = max_val - min_val - - if range_val == 0: - for i in range(len(single_list)): - single_list[i] = (target_max - target_min) / 2 - else: - for i in range(len(single_list)): - single_list[i] = (single_list[i] - min_val) * (target_max - target_min) / range_val + target_min - return single_list - - -def normalize_score_floats(score_list, target_min, target_max): - """ - 整体归一化 - """ - importance_list = [] - relevance_list = [] - recency_list = [] - - for i in range(len(score_list)): - importance_list.append(score_list[i]["importance"]) - relevance_list.append(score_list[i]["relevance"]) - recency_list.append(score_list[i]["recency"]) - - # 进行归一化操作 - importance_list = normalize_list_floats(importance_list, target_min, target_max) - relevance_list = normalize_list_floats(relevance_list, target_min, target_max) - recency_list = normalize_list_floats(recency_list, target_min, target_max) - - for i in range(len(score_list)): - score_list[i]["importance"] = importance_list[i] - score_list[i]["relevance"] = relevance_list[i] - score_list[i]["recency"] = recency_list[i] - - return score_list diff --git a/examples/stanford_town/memory/scratch.py b/examples/stanford_town/memory/scratch.py deleted file mode 100644 index b4036f839..000000000 --- a/examples/stanford_town/memory/scratch.py +++ /dev/null @@ -1,383 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# @Desc : Scratch类实现(角色信息类) - -from datetime import datetime, timedelta -from pathlib import Path -from typing import Optional, Union - -from pydantic import BaseModel, Field, field_serializer, field_validator - -from metagpt.utils.common import read_json_file, write_json_file - - -class Scratch(BaseModel): - # 类别1:人物超参 - vision_r: int = 4 - att_bandwidth: int = 3 - retention: int = 5 - - # 类别2:世界信息 - curr_time: Optional[datetime] = Field(default=None) - curr_tile: Optional[list[int]] = Field(default=None) - daily_plan_req: Optional[str] = Field(default=None) - - # 类别3:人物角色的核心身份 - name: Optional[str] = Field(default=None) - first_name: Optional[str] = Field(default=None) - last_name: Optional[str] = Field(default=None) - age: Optional[int] = Field(default=None) - innate: Optional[str] = Field(default=None) # L0 permanent core traits. - learned: Optional[str] = Field(default=None) # L1 stable traits. - currently: Optional[str] = Field(default=None) # L2 external implementation. - lifestyle: Optional[str] = Field(default=None) - living_area: Optional[str] = Field(default=None) - - # 类别4:旧反思变量 - concept_forget: int = 100 - daily_reflection_time: int = 60 * 3 - daily_reflection_size: int = 5 - overlap_reflect_th: int = 2 - kw_strg_event_reflect_th: int = 4 - kw_strg_thought_reflect_th: int = 4 - - # 类别5:新反思变量 - recency_w: int = 1 - relevance_w: int = 1 - importance_w: int = 1 - recency_decay: float = 0.99 - importance_trigger_max: int = 150 - importance_trigger_curr: int = 150 - importance_ele_n: int = 0 - thought_count: int = 5 - - # 类别6:个人计划 - daily_req: list[str] = Field(default=[]) - f_daily_schedule: list[list[Union[int, str]]] = Field(default=[]) - f_daily_schedule_hourly_org: list[list[Union[int, str]]] = Field(default=[]) - - # 类别7:当前动作 - act_address: Optional[str] = Field(default=None) - act_start_time: Optional[datetime] = Field(default=None) - act_duration: Optional[int] = Field(default=None) - act_description: Optional[str] = Field(default=None) - act_pronunciatio: Optional[str] = Field(default=None) - act_event: list[Optional[str]] = [None, None, None] - - act_obj_description: Optional[str] = Field(default=None) - act_obj_pronunciatio: Optional[str] = Field(default=None) - act_obj_event: list[Optional[str]] = [None, None, None] - - chatting_with: Optional[str] = Field(default=None) - chat: Optional[str] = Field(default=None) - chatting_with_buffer: dict = dict() - chatting_end_time: Optional[datetime] = Field(default=None) - - act_path_set: bool = False - planned_path: list[list[int]] = Field(default=[]) - - @field_validator("curr_time", "act_start_time", "chatting_end_time", mode="before") - @classmethod - def check_time_filed(cls, time_filed): - val = datetime.strptime(time_filed, "%B %d, %Y, %H:%M:%S") if time_filed else None - return val - - @field_serializer("curr_time", "act_start_time", "chatting_end_time") - def transform_time_field(self, time_filed: Optional[datetime]) -> str: - if time_filed: - time_filed = time_filed.strftime("%B %d, %Y, %H:%M:%S") - return time_filed - - @classmethod - def init_scratch_from_path(cls, f_saved: Path): - scratch_load = read_json_file(f_saved) - scratch = Scratch(**scratch_load) - return scratch - - def save(self, out_json: Path): - """ - Save persona's scratch. - - INPUT: - out_json: The file where we wil be saving our persona's state. - OUTPUT: - None - """ - scratch = self.model_dump() - write_json_file(out_json, scratch, encoding="utf-8") - - def get_f_daily_schedule_index(self, advance=0): - """ - We get the current index of self.f_daily_schedule. - - Recall that self.f_daily_schedule stores the decomposed action sequences - up until now, and the hourly sequences of the future action for the rest - of today. Given that self.f_daily_schedule is a list of list where the - inner list is composed of [task, duration], we continue to add up the - duration until we reach "if elapsed > today_min_elapsed" condition. The - index where we stop is the index we will return. - - INPUT - advance: Integer value of the number minutes we want to look into the - future. This allows us to get the index of a future timeframe. - OUTPUT - an integer value for the current index of f_daily_schedule. - """ - # We first calculate teh number of minutes elapsed today. - today_min_elapsed = 0 - today_min_elapsed += self.curr_time.hour * 60 - today_min_elapsed += self.curr_time.minute - today_min_elapsed += advance - - x = 0 - for task, duration in self.f_daily_schedule: - x += duration - x = 0 - for task, duration in self.f_daily_schedule_hourly_org: - x += duration - - # We then calculate the current index based on that. - curr_index = 0 - elapsed = 0 - for task, duration in self.f_daily_schedule: - elapsed += duration - if elapsed > today_min_elapsed: - return curr_index - curr_index += 1 - - return curr_index - - def get_f_daily_schedule_hourly_org_index(self, advance=0): - """ - We get the current index of self.f_daily_schedule_hourly_org. - It is otherwise the same as get_f_daily_schedule_index. - - INPUT - advance: Integer value of the number minutes we want to look into the - future. This allows us to get the index of a future timeframe. - OUTPUT - an integer value for the current index of f_daily_schedule. - """ - # We first calculate teh number of minutes elapsed today. - today_min_elapsed = 0 - today_min_elapsed += self.curr_time.hour * 60 - today_min_elapsed += self.curr_time.minute - today_min_elapsed += advance - # We then calculate the current index based on that. - curr_index = 0 - elapsed = 0 - for task, duration in self.f_daily_schedule_hourly_org: - elapsed += duration - if elapsed > today_min_elapsed: - return curr_index - curr_index += 1 - return curr_index - - def get_str_iss(self): - """ - ISS stands for "identity stable set." This describes the commonset summary - of this persona -- basically, the bare minimum description of the persona - that gets used in almost all prompts that need to call on the persona. - - INPUT - None - OUTPUT - the identity stable set summary of the persona in a string form. - EXAMPLE STR OUTPUT - "Name: Dolores Heitmiller - Age: 28 - Innate traits: hard-edged, independent, loyal - Learned traits: Dolores is a painter who wants live quietly and paint - while enjoying her everyday life. - Currently: Dolores is preparing for her first solo show. She mostly - works from home. - Lifestyle: Dolores goes to bed around 11pm, sleeps for 7 hours, eats - dinner around 6pm. - Daily plan requirement: Dolores is planning to stay at home all day and - never go out." - """ - commonset = "" - commonset += f"Name: {self.name}\n" - commonset += f"Age: {self.age}\n" - commonset += f"Innate traits: {self.innate}\n" - commonset += f"Learned traits: {self.learned}\n" - commonset += f"Currently: {self.currently}\n" - commonset += f"Lifestyle: {self.lifestyle}\n" - commonset += f"Daily plan requirement: {self.daily_plan_req}\n" - commonset += f"Current Date: {self.curr_time.strftime('%A %B %d') if self.curr_time else ''}\n" - return commonset - - def get_str_name(self): - return self.name - - def get_str_firstname(self): - return self.first_name - - def get_str_lastname(self): - return self.last_name - - def get_str_age(self): - return str(self.age) - - def get_str_innate(self): - return self.innate - - def get_str_learned(self): - return self.learned - - def get_str_currently(self): - return self.currently - - def get_str_lifestyle(self): - return self.lifestyle - - def get_str_daily_plan_req(self): - return self.daily_plan_req - - def get_str_curr_date_str(self): - return self.curr_time.strftime("%A %B %d") - - def get_curr_event(self): - if not self.act_address: - return self.name, None, None - else: - return self.act_event - - def get_curr_event_and_desc(self): - if not self.act_address: - return self.name, None, None, None - else: - return self.act_event[0], self.act_event[1], self.act_event[2], self.act_description - - def get_curr_obj_event_and_desc(self): - if not self.act_address: - return "", None, None, None - else: - return self.act_address, self.act_obj_event[1], self.act_obj_event[2], self.act_obj_description - - def add_new_action( - self, - action_address, - action_duration, - action_description, - action_pronunciatio, - action_event, - chatting_with, - chat, - chatting_with_buffer, - chatting_end_time, - act_obj_description, - act_obj_pronunciatio, - act_obj_event, - act_start_time=None, - ): - self.act_address = action_address - self.act_duration = action_duration - self.act_description = action_description - self.act_pronunciatio = action_pronunciatio - self.act_event = action_event - - self.chatting_with = chatting_with - self.chat = chat - if chatting_with_buffer: - self.chatting_with_buffer.update(chatting_with_buffer) - self.chatting_end_time = chatting_end_time - - self.act_obj_description = act_obj_description - self.act_obj_pronunciatio = act_obj_pronunciatio - self.act_obj_event = act_obj_event - - self.act_start_time = self.curr_time - - self.act_path_set = False - - def act_time_str(self): - """ - Returns a string output of the current time. - - INPUT - None - OUTPUT - A string output of the current time. - EXAMPLE STR OUTPUT - "14:05 P.M." - """ - return self.act_start_time.strftime("%H:%M %p") - - def act_check_finished(self): - """ - Checks whether the self.Action instance has finished. - - INPUT - curr_datetime: Current time. If current time is later than the action's - start time + its duration, then the action has finished. - OUTPUT - Boolean [True]: Action has finished. - Boolean [False]: Action has not finished and is still ongoing. - """ - if not self.act_address: - return True - - if self.chatting_with: - end_time = self.chatting_end_time - else: - x = self.act_start_time - if x.second != 0: - x = x.replace(second=0) - x = x + timedelta(minutes=1) - end_time = x + timedelta(minutes=self.act_duration) - - if end_time.strftime("%H:%M:%S") == self.curr_time.strftime("%H:%M:%S"): - return True - return False - - def act_summarize(self): - """ - Summarize the current action as a dictionary. - - INPUT - None - OUTPUT - ret: A human readable summary of the action. - """ - exp = dict() - exp["persona"] = self.name - exp["address"] = self.act_address - exp["start_datetime"] = self.act_start_time - exp["duration"] = self.act_duration - exp["description"] = self.act_description - exp["pronunciatio"] = self.act_pronunciatio - return exp - - def act_summary_str(self): - """ - Returns a string summary of the current action. Meant to be - human-readable. - - INPUT - None - OUTPUT - ret: A human readable summary of the action. - """ - start_datetime_str = self.act_start_time.strftime("%A %B %d -- %H:%M %p") - ret = f"[{start_datetime_str}]\n" - ret += f"Activity: {self.name} is {self.act_description}\n" - ret += f"Address: {self.act_address}\n" - ret += f"Duration in minutes (e.g., x min): {str(self.act_duration)} min\n" - return ret - - def get_daily_schedule(self, daily_schedule: list[list[str]]): - ret = "" - curr_min_sum = 0 - for row in daily_schedule: - curr_min_sum += row[1] - hour = int(curr_min_sum / 60) - minute = curr_min_sum % 60 - ret += f"{hour:02}:{minute:02} || {row[0]}\n" - return ret - - def get_str_daily_schedule_summary(self): - return self.get_daily_schedule(self.f_daily_schedule) - - def get_str_daily_schedule_hourly_org_summary(self): - return self.get_daily_schedule(self.f_daily_schedule_hourly_org) diff --git a/examples/stanford_town/memory/spatial_memory.py b/examples/stanford_town/memory/spatial_memory.py deleted file mode 100644 index c1030cdbb..000000000 --- a/examples/stanford_town/memory/spatial_memory.py +++ /dev/null @@ -1,115 +0,0 @@ -""" -Author: Joon Sung Park (joonspk@stanford.edu) - -File: spatial_memory.py -Description: Defines the MemoryTree class that serves as the agents' spatial -memory that aids in grounding their behavior in the game world. -""" -from pathlib import Path - -from pydantic import BaseModel, Field - -from metagpt.utils.common import read_json_file, write_json_file - - -class MemoryTree(BaseModel): - tree: dict = Field(default=dict) - - def set_mem_path(self, f_saved: Path): - self.tree = read_json_file(f_saved) - - def print_tree(self) -> None: - def _print_tree(tree, depth): - dash = " >" * depth - if isinstance(tree, list): - if tree: - print(dash, tree) - return - - for key, val in tree.items(): - if key: - print(dash, key) - _print_tree(val, depth + 1) - - _print_tree(self.tree, 0) - - def save(self, out_json: Path) -> None: - write_json_file(out_json, self.tree) - - def get_str_accessible_sectors(self, curr_world: str) -> str: - """ - Returns a summary string of all the arenas that the persona can access - within the current sector. - - Note that there are places a given persona cannot enter. This information - is provided in the persona sheet. We account for this in this function. - - INPUT - None - OUTPUT - A summary string of all the arenas that the persona can access. - EXAMPLE STR OUTPUT - "bedroom, kitchen, dining room, office, bathroom" - """ - x = ", ".join(list(self.tree[curr_world].keys())) - return x - - def get_str_accessible_sector_arenas(self, sector: str) -> str: - """ - Returns a summary string of all the arenas that the persona can access - within the current sector. - - Note that there are places a given persona cannot enter. This information - is provided in the persona sheet. We account for this in this function. - - INPUT - None - OUTPUT - A summary string of all the arenas that the persona can access. - EXAMPLE STR OUTPUT - "bedroom, kitchen, dining room, office, bathroom" - """ - curr_world, curr_sector = sector.split(":") - if not curr_sector: - return "" - x = ", ".join(list(self.tree[curr_world][curr_sector].keys())) - return x - - def get_str_accessible_arena_game_objects(self, arena: str) -> str: - """ - Get a str list of all accessible game objects that are in the arena. If - temp_address is specified, we return the objects that are available in - that arena, and if not, we return the objects that are in the arena our - persona is currently in. - - INPUT - temp_address: optional arena address - OUTPUT - str list of all accessible game objects in the gmae arena. - EXAMPLE STR OUTPUT - "phone, charger, bed, nightstand" - """ - curr_world, curr_sector, curr_arena = arena.split(":") - - if not curr_arena: - return "" - - try: - x = ", ".join(list(self.tree[curr_world][curr_sector][curr_arena])) - except Exception: - x = ", ".join(list(self.tree[curr_world][curr_sector][curr_arena.lower()])) - return x - - def add_tile_info(self, tile_info: dict) -> None: - if tile_info["world"]: - if tile_info["world"] not in self.tree: - self.tree[tile_info["world"]] = {} - if tile_info["sector"]: - if tile_info["sector"] not in self.tree[tile_info["world"]]: - self.tree[tile_info["world"]][tile_info["sector"]] = {} - if tile_info["arena"]: - if tile_info["arena"] not in self.tree[tile_info["world"]][tile_info["sector"]]: - self.tree[tile_info["world"]][tile_info["sector"]][tile_info["arena"]] = [] - if tile_info["game_object"]: - if tile_info["game_object"] not in self.tree[tile_info["world"]][tile_info["sector"]][tile_info["arena"]]: - self.tree[tile_info["world"]][tile_info["sector"]][tile_info["arena"]] += [tile_info["game_object"]] diff --git a/examples/stanford_town/plan/__init__.py b/examples/stanford_town/plan/__init__.py deleted file mode 100644 index 2bcf8efd0..000000000 --- a/examples/stanford_town/plan/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# @Desc : diff --git a/examples/stanford_town/plan/converse.py b/examples/stanford_town/plan/converse.py deleted file mode 100644 index b0ff54b68..000000000 --- a/examples/stanford_town/plan/converse.py +++ /dev/null @@ -1,93 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# @Desc : conversation between two agents - -from typing import Tuple - -from examples.stanford_town.actions.agent_chat_sum_rel import AgentChatSumRel -from examples.stanford_town.actions.gen_iter_chat_utt import GenIterChatUTT -from examples.stanford_town.memory.retrieve import new_agent_retrieve -from metagpt.logs import logger - - -async def agent_conversation(init_role: "STRole", target_role: "STRole", conv_rounds: int = 8) -> list[list[str]]: - curr_chat = [] - logger.info(f"Role: {init_role.name} starts a conversation with Role: {target_role.name}") - - for idx in range(conv_rounds): - logger.info(f"Conv round: {idx} between {init_role.name} and {target_role.name}") - scratch = init_role.rc.scratch - target_scratch = target_role.rc.scratch - - focal_points = [f"{target_scratch.name}"] - retrieved = new_agent_retrieve(init_role, focal_points, 50) - relationship = await generate_summarize_agent_relationship(init_role, target_role, retrieved) - logger.info(f"The relationship between {init_role.name} and {target_role.name}: {relationship}") - last_chat = "" - for i in curr_chat[-4:]: - last_chat += ": ".join(i) + "\n" - if last_chat: - focal_points = [f"{relationship}", f"{target_scratch.name} is {target_scratch.act_description}", last_chat] - else: - focal_points = [f"{relationship}", f"{target_scratch.name} is {target_scratch.act_description}"] - retrieved = new_agent_retrieve(init_role, focal_points, 15) - utt, end = await generate_one_utterance(init_role, target_role, retrieved, curr_chat) - - curr_chat += [[scratch.name, utt]] - if end: - break - - focal_points = [f"{scratch.name}"] - retrieved = new_agent_retrieve(target_role, focal_points, 50) - relationship = await generate_summarize_agent_relationship(target_role, init_role, retrieved) - logger.info(f"The relationship between {target_role.name} and {init_role.name}: {relationship}") - last_chat = "" - for i in curr_chat[-4:]: - last_chat += ": ".join(i) + "\n" - if last_chat: - focal_points = [f"{relationship}", f"{scratch.name} is {scratch.act_description}", last_chat] - else: - focal_points = [f"{relationship}", f"{scratch.name} is {scratch.act_description}"] - retrieved = new_agent_retrieve(target_role, focal_points, 15) - utt, end = await generate_one_utterance(target_role, init_role, retrieved, curr_chat) - - curr_chat += [[target_scratch.name, utt]] - if end: - break - - logger.warning(f"Conversations between {target_role.name} and {init_role.name}:") - for row in curr_chat: - logger.info(row) - - return curr_chat - - -async def generate_summarize_agent_relationship(init_role: "STRole", target_role: "STRole", retrieved: dict) -> str: - all_embedding_keys = list() - for key, val in retrieved.items(): - for i in val: - all_embedding_keys += [i.embedding_key] - all_embedding_key_str = "" - for i in all_embedding_keys: - all_embedding_key_str += f"{i}\n" - - summarized_relationship = await AgentChatSumRel().run(init_role, target_role, all_embedding_key_str) - return summarized_relationship - - -async def generate_one_utterance(init_role, target_role, retrieved: dict, curr_chat: list) -> Tuple[str, str]: - # Chat version optimized for speed via batch generation - scratch = init_role.rc.scratch - target_scratch = target_role.rc.scratch - curr_context = ( - f"{scratch.name} " - + f"was {scratch.act_description} " - + f"when {scratch.name} " - + f"saw {target_scratch.name} " - + f"in the middle of {target_scratch.act_description}.\n" - ) - curr_context += f"{scratch.name} " + "is initiating a conversation with " + f"{target_scratch.name}." - - x = await GenIterChatUTT().run(init_role, target_role, retrieved, curr_context, curr_chat) - - return x["utterance"], x["end"] diff --git a/examples/stanford_town/plan/st_plan.py b/examples/stanford_town/plan/st_plan.py deleted file mode 100644 index 080d87de1..000000000 --- a/examples/stanford_town/plan/st_plan.py +++ /dev/null @@ -1,721 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# @Desc : st' planning execution - -import datetime -import math -import random -from typing import Tuple, Union - -from metagpt.llm import LLM -from metagpt.logs import logger - -from ..actions.decide_to_talk import DecideToTalk -from ..actions.gen_action_details import GenActionDetails -from ..actions.gen_daily_schedule import GenDailySchedule -from ..actions.gen_hourly_schedule import GenHourlySchedule -from ..actions.new_decomp_schedule import NewDecompSchedule -from ..actions.summarize_conv import SummarizeConv -from ..actions.task_decomp import TaskDecomp -from ..actions.wake_up import WakeUp -from ..memory.retrieve import new_agent_retrieve -from ..plan.converse import agent_conversation -from ..utils.utils import get_embedding - - -async def plan(role: "STRole", roles: dict["STRole"], new_day: bool, retrieved: dict) -> str: - # PART 1: Generate the hourly schedule. - if new_day: - await _long_term_planning(role, new_day) - - # PART 2: If the current action has expired, we want to create a new plan. - act_check_finished = role.scratch.act_check_finished() - logger.info(f"Role: {role.name} act_check_finished is {act_check_finished}") - if act_check_finished: - await _determine_action(role) - - # PART 3: If you perceived an event that needs to be responded to (saw - # another role), and retrieved relevant information. - # Step 1: Retrieved may have multiple events represented in it. The first - # job here is to determine which of the events we want to focus - # on for the role. - # takes the form of a dictionary like this: - # dictionary {["curr_event"] = , - # ["events"] = [, ...], - # ["thoughts"] = [, ...]} - focused_event = False - if retrieved.keys(): - focused_event = _choose_retrieved(role.name, retrieved) - - # Step 2: Once we choose an event, we need to determine whether the - # role will take any actions for the perceived event. There are - # three possible modes of reaction returned by _should_react. - # a) "chat with {target_role.name}" - # b) "react" - # c) False - logger.info(f"Role: {role.name} focused_event: {focused_event}") - if focused_event: - reaction_mode = await _should_react(role, focused_event, roles) - logger.info(f"Role: {role.name} reaction_mode: {reaction_mode}") - if reaction_mode: - # If we do want to chat, then we generate conversation - if reaction_mode[:9] == "chat with": - await _chat_react(role, reaction_mode, roles) - elif reaction_mode[:4] == "wait": - await _wait_react(role, reaction_mode) - - # Step 3: Chat-related state clean up. - # If the persona is not chatting with anyone, we clean up any of the - # chat-related states here. - if role.rc.scratch.act_event[1] != "chat with": - role.rc.scratch.chatting_with = None - role.rc.scratch.chat = None - role.rc.scratch.chatting_end_time = None - # We want to make sure that the persona does not keep conversing with each - # other in an infinite loop. So, chatting_with_buffer maintains a form of - # buffer that makes the persona wait from talking to the same target - # immediately after chatting once. We keep track of the buffer value here. - curr_persona_chat_buffer = role.rc.scratch.chatting_with_buffer - for persona_name, buffer_count in curr_persona_chat_buffer.items(): - if persona_name != role.rc.scratch.chatting_with: - role.rc.scratch.chatting_with_buffer[persona_name] -= 1 - - return role.rc.scratch.act_address - - -def _choose_retrieved(role_name: str, retrieved: dict) -> Union[None, dict]: - """ - Retrieved elements have multiple core "curr_events". We need to choose one - event to which we are going to react to. We pick that event here. - Args: - role_name: Current role instance's name whose action we are determining. - retrieved: A dictionary of that were retrieved from the - the role's associative memory. This dictionary takes the - following form: - dictionary[event.description] = - {["curr_event"] = , - ["events"] = [, ...], - ["thoughts"] = [, ...] } - """ - # Once we are done with the reflection, we might want to build a more - # complex structure here. - - # We do not want to take self events... for now - copy_retrieved = retrieved.copy() - for event_desc, rel_ctx in copy_retrieved.items(): - curr_event = rel_ctx["curr_event"] - if curr_event.subject == role_name: - del retrieved[event_desc] - - # Always choose role first. - priority = [] - for event_desc, rel_ctx in retrieved.items(): - curr_event = rel_ctx["curr_event"] - if ":" not in curr_event.subject and curr_event.subject != role_name: - priority += [rel_ctx] - if priority: - return random.choice(priority) - - # Skip idle. - for event_desc, rel_ctx in retrieved.items(): - if "is idle" not in event_desc: - priority += [rel_ctx] - if priority: - return random.choice(priority) - return None - - -async def _should_react(role: "STRole", retrieved: dict, roles: dict): - """ - Determines what form of reaction the role should exihibit given the - retrieved values. - INPUT - role: Current <"STRole"> instance whose action we are determining. - retrieved: A dictionary of that were retrieved from the - the role's associative memory. This dictionary takes the - following form: - dictionary[event.description] = - {["curr_event"] = , - ["events"] = [, ...], - ["thoughts"] = [, ...] } - roles: A dictionary that contains all role names as keys, and the - <"STRole"> instance as values. - """ - - async def lets_talk(init_role: "STRole", target_role: "STRole", retrieved: dict): - if init_role.name == target_role.name: - logger.info(f"Role: {role.name} _should_react lets_talk meet same role, return False") - return False - - scratch = init_role.rc.scratch - target_scratch = target_role.rc.scratch - if ( - not target_scratch.act_address - or not target_scratch.act_description - or not scratch.act_address - or not scratch.act_description - ): - return False - - if "sleeping" in target_scratch.act_description or "sleeping" in scratch.act_description: - return False - - if scratch.curr_time.hour == 23: - return False - - if "" in target_scratch.act_address: - return False - - if target_scratch.chatting_with or scratch.chatting_with: - return False - - if target_role.name in scratch.chatting_with_buffer: - if scratch.chatting_with_buffer[target_role.name] > 0: - return False - - if await DecideToTalk().run(init_role, target_role, retrieved): - return True - - return False - - async def lets_react(init_role: "STRole", target_role: "STRole", retrieved: dict): - if init_role.name == target_role.name: - logger.info(f"Role: {role.name} _should_react lets_react meet same role, return False") - return False - - scratch = init_role.rc.scratch - target_scratch = target_role.rc.scratch - if ( - not target_scratch.act_address - or not target_scratch.act_description - or not scratch.act_address - or not scratch.act_description - ): - return False - - if "sleeping" in target_scratch.act_description or "sleeping" in scratch.act_description: - return False - - # return False - if scratch.curr_time.hour == 23: - return False - - if "waiting" in target_scratch.act_description: - return False - if scratch.planned_path == []: - return False - - if scratch.act_address != target_scratch.act_address: - return False - - react_mode = await DecideToTalk().run(init_role, target_role, retrieved) - - if react_mode == "1": - wait_until = ( - target_scratch.act_start_time + datetime.timedelta(minutes=target_scratch.act_duration - 1) - ).strftime("%B %d, %Y, %H:%M:%S") - return f"wait: {wait_until}" - elif react_mode == "2": - return False - return "do other things" - else: - return False # "keep" - - # If the role is chatting right now, default to no reaction - scratch = role.rc.scratch - if scratch.chatting_with: - return False - if "" in scratch.act_address: - return False - - # Recall that retrieved takes the following form: - # dictionary {["curr_event"] = } - curr_event = retrieved["curr_event"] - logger.info(f"Role: {role.name} _should_react curr_event.subject: {curr_event.subject}") - - if ":" not in curr_event.subject: - # this is a role event. - if await lets_talk(role, roles[curr_event.subject], retrieved): - return f"chat with {curr_event.subject}" - react_mode = await lets_react(role, roles[curr_event.subject], retrieved) - return react_mode - return False - - -async def _chat_react(role: "STRole", reaction_mode: str, roles: dict["STRole"]): - # There are two roles -- the role who is initiating the conversation - # and the role who is the target. We get the role instances here. - init_role = role - target_role = roles[reaction_mode[9:].strip()] - - # Actually creating the conversation here. - convo, duration_min = await generate_convo(init_role, target_role) # 2222 - convo_summary = await generate_convo_summary(convo) - inserted_act = convo_summary - inserted_act_dur = duration_min - - act_start_time = target_role.rc.scratch.act_start_time - - curr_time = target_role.rc.scratch.curr_time - if curr_time.second != 0: - temp_curr_time = curr_time + datetime.timedelta(seconds=60 - curr_time.second) - chatting_end_time = temp_curr_time + datetime.timedelta(minutes=inserted_act_dur) - else: - chatting_end_time = curr_time + datetime.timedelta(minutes=inserted_act_dur) - - for role, p in [("init", init_role), ("target", target_role)]: - if role == "init": - act_address = f" {target_role.name}" - act_event = (p.name, "chat with", target_role.name) - chatting_with = target_role.name - chatting_with_buffer = {} - chatting_with_buffer[target_role.name] = 800 - elif role == "target": - act_address = f" {init_role.name}" - act_event = (p.name, "chat with", init_role.name) - chatting_with = init_role.name - chatting_with_buffer = {} - chatting_with_buffer[init_role.name] = 800 - - act_pronunciatio = "💬" - act_obj_description = None - act_obj_pronunciatio = None - act_obj_event = (None, None, None) - - await _create_react( - p, - inserted_act, - inserted_act_dur, - act_address, - act_event, - chatting_with, - convo, - chatting_with_buffer, - chatting_end_time, - act_pronunciatio, - act_obj_description, - act_obj_pronunciatio, - act_obj_event, - act_start_time, - ) - - -async def _create_react( - role: "STRole", - inserted_act: str, - inserted_act_dur: int, - act_address: str, - act_event: Tuple, - chatting_with: str, - chat: list, - chatting_with_buffer: dict, - chatting_end_time: datetime, - act_pronunciatio: str, - act_obj_description: str, - act_obj_pronunciatio: str, - act_obj_event: Tuple, - act_start_time=None, -): - p = role - scratch = role.rc.scratch - - min_sum = 0 - for i in range(scratch.get_f_daily_schedule_hourly_org_index()): - min_sum += scratch.f_daily_schedule_hourly_org[i][1] - start_hour = int(min_sum / 60) - - if scratch.f_daily_schedule_hourly_org[scratch.get_f_daily_schedule_hourly_org_index()][1] >= 120: - end_hour = ( - start_hour + scratch.f_daily_schedule_hourly_org[scratch.get_f_daily_schedule_hourly_org_index()][1] / 60 - ) - - elif ( - scratch.f_daily_schedule_hourly_org[scratch.get_f_daily_schedule_hourly_org_index()][1] - + scratch.f_daily_schedule_hourly_org[scratch.get_f_daily_schedule_hourly_org_index() + 1][1] - ): - end_hour = start_hour + ( - ( - scratch.f_daily_schedule_hourly_org[scratch.get_f_daily_schedule_hourly_org_index()][1] - + scratch.f_daily_schedule_hourly_org[scratch.get_f_daily_schedule_hourly_org_index() + 1][1] - ) - / 60 - ) - - else: - end_hour = start_hour + 2 - end_hour = int(end_hour) - - dur_sum = 0 - count = 0 - start_index = None - end_index = None - for act, dur in scratch.f_daily_schedule: - if dur_sum >= start_hour * 60 and start_index is None: - start_index = count - if dur_sum >= end_hour * 60 and end_index is None: - end_index = count - dur_sum += dur - count += 1 - - ret = await generate_new_decomp_schedule(p, inserted_act, inserted_act_dur, start_hour, end_hour) - scratch.f_daily_schedule[start_index:end_index] = ret - scratch.add_new_action( - act_address, - inserted_act_dur, - inserted_act, - act_pronunciatio, - act_event, - chatting_with, - chat, - chatting_with_buffer, - chatting_end_time, - act_obj_description, - act_obj_pronunciatio, - act_obj_event, - act_start_time, - ) - - -async def _wait_react(role: "STRole", reaction_mode: str): - scratch = role.rc.scratch - - inserted_act = f'waiting to start {scratch.act_description.split("(")[-1][:-1]}' - end_time = datetime.datetime.strptime(reaction_mode[6:].strip(), "%B %d, %Y, %H:%M:%S") - inserted_act_dur = ( - (end_time.minute + end_time.hour * 60) - (scratch.curr_time.minute + scratch.curr_time.hour * 60) + 1 - ) - - act_address = f" {scratch.curr_tile[0]} {scratch.curr_tile[1]}" - act_event = (role.name, "waiting to start", scratch.act_description.split("(")[-1][:-1]) - chatting_with = None - chat = None - chatting_with_buffer = None - chatting_end_time = None - - act_pronunciatio = "⌛" - act_obj_description = None - act_obj_pronunciatio = None - act_obj_event = (None, None, None) - - await _create_react( - role, - inserted_act, - inserted_act_dur, - act_address, - act_event, - chatting_with, - chat, - chatting_with_buffer, - chatting_end_time, - act_pronunciatio, - act_obj_description, - act_obj_pronunciatio, - act_obj_event, - ) - - -async def generate_convo(init_role: "STRole", target_role: "STRole") -> Union[list, int]: - convo = await agent_conversation(init_role, target_role) - all_utt = "" - - for row in convo: - speaker = row[0] - utt = row[1] - all_utt += f"{speaker}: {utt}\n" - - convo_length = math.ceil(int(len(all_utt) / 8) / 30) - - return convo, convo_length - - -async def generate_convo_summary(conv: list[list[str]]) -> str: - conv_summary = await SummarizeConv().run(conv) - return conv_summary - - -async def generate_new_decomp_schedule( - role: "STRole", inserted_act: str, inserted_act_dur: int, start_hour: int, end_hour: int -): - # Step 1: Setting up the core variables for the function. - #

is the role whose schedule we are editing right now. - scratch = role.rc.scratch - # indicates the number of minutes that have passed today. - today_min_pass = int(scratch.curr_time.hour) * 60 + int(scratch.curr_time.minute) + 1 - - # Step 2: We need to create and . - main_act_dur = [] - truncated_act_dur = [] - dur_sum = 0 # duration sum - count = 0 # enumerate count - truncated_fin = False - - print("DEBUG::: ", scratch.name) - for act, dur in scratch.f_daily_schedule: - if (dur_sum >= start_hour * 60) and (dur_sum < end_hour * 60): - main_act_dur += [[act, dur]] - if dur_sum <= today_min_pass: - truncated_act_dur += [[act, dur]] - elif dur_sum > today_min_pass and not truncated_fin: - # We need to insert that last act, duration list like this one: - # e.g., ['wakes up and completes her morning routine (wakes up...)', 2] - truncated_act_dur += [[scratch.f_daily_schedule[count][0], dur_sum - today_min_pass]] - truncated_act_dur[-1][-1] -= ( - dur_sum - today_min_pass - ) # DEC 7 DEBUG;.. is the +1 the right thing to do??? - # DEC 7 DEBUG;.. is the +1 the right thing to do??? - # truncated_act_dur[-1][-1] -= (dur_sum - today_min_pass + 1) - print("DEBUG::: ", truncated_act_dur) - - # DEC 7 DEBUG;.. is the +1 the right thing to do??? - # truncated_act_dur[-1][-1] -= (dur_sum - today_min_pass) - truncated_fin = True - dur_sum += dur - count += 1 - - main_act_dur = main_act_dur - - x = ( - truncated_act_dur[-1][0].split("(")[0].strip() - + " (on the way to " - + truncated_act_dur[-1][0].split("(")[-1][:-1] - + ")" - ) - truncated_act_dur[-1][0] = x - - if "(" in truncated_act_dur[-1][0]: - inserted_act = truncated_act_dur[-1][0].split("(")[0].strip() + " (" + inserted_act + ")" - - # To do inserted_act_dur+1 below is an important decision but I'm not sure - # if I understand the full extent of its implications. Might want to - # revisit. - truncated_act_dur += [[inserted_act, inserted_act_dur]] - start_time_hour = datetime.datetime(2022, 10, 31, 0, 0) + datetime.timedelta(hours=start_hour) - end_time_hour = datetime.datetime(2022, 10, 31, 0, 0) + datetime.timedelta(hours=end_hour) - - return await NewDecompSchedule().run( - role, main_act_dur, truncated_act_dur, start_time_hour, end_time_hour, inserted_act, inserted_act_dur - ) - - -async def _long_term_planning(role: "STRole", new_day: bool): - """ - Formulates the role's daily long-term plan if it is the start of a new - day. This basically has two components: first, we create the wake-up hour, - and second, we create the hourly schedule based on it. - INPUT - new_day: Indicates whether the current time signals a "First day", - "New day", or False (for neither). This is important because we - create the roles' long term planning on the new day. - """ - # We start by creating the wake up hour for the role. - wake_up_hour = await WakeUp().run(role) - wake_up_hour = int(wake_up_hour) - logger.info(f"Role: {role.name} long_term_planning, wake_up_hour: {wake_up_hour}") - - # When it is a new day, we start by creating the daily_req of the role. - # Note that the daily_req is a list of strings that describe the role's - # day in broad strokes. - if new_day == "First day": - # Bootstrapping the daily plan for the start of then generation: - # if this is the start of generation (so there is no previous day's - # daily requirement, or if we are on a new day, we want to create a new - # set of daily requirements. - role.scratch.daily_req = await GenDailySchedule().run(role, wake_up_hour) - logger.info(f"Role: {role.name} daily requirements: {role.scratch.daily_req}") - elif new_day == "New day": - revise_identity(role) - - # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - TODO - # We need to create a new daily_req here... - role.scratch.daily_req = role.scratch.daily_req - - # Based on the daily_req, we create an hourly schedule for the role, - # which is a list of todo items with a time duration (in minutes) that - # add up to 24 hours. - role.scratch.f_daily_schedule = await GenHourlySchedule().run(role, wake_up_hour) - logger.info(f"Role: {role.name} f_daily_schedule: {role.scratch.f_daily_schedule}") - role.scratch.f_daily_schedule_hourly_org = role.scratch.f_daily_schedule[:] - - # Added March 4 -- adding plan to the memory. - thought = f"This is {role.scratch.name}'s plan for {role.scratch.curr_time.strftime('%A %B %d')}:" - for i in role.scratch.daily_req: - thought += f" {i}," - thought = thought[:-1] + "." - created = role.scratch.curr_time - expiration = role.scratch.curr_time + datetime.timedelta(days=30) - s, p, o = (role.scratch.name, "plan", role.scratch.curr_time.strftime("%A %B %d")) - keywords = set(["plan"]) - thought_poignancy = 5 - thought_embedding_pair = (thought, get_embedding(thought)) - role.a_mem.add_thought( - created, expiration, s, p, o, thought, keywords, thought_poignancy, thought_embedding_pair, None - ) - - # print("Sleeping for 20 seconds...") - # time.sleep(10) - # print("Done sleeping!") - - -async def _determine_action(role: "STRole"): - """ - Creates the next action sequence for the role. - The main goal of this function is to run "add_new_action" on the role's - scratch space, which sets up all the action related variables for the next - action. - As a part of this, the role may need to decompose its hourly schedule as - needed. - INPUT - role: Current instance whose action we are determining. - """ - - def determine_decomp(act_desp, act_dura): - """ - Given an action description and its duration, we determine whether we need - to decompose it. If the action is about the agent sleeping, we generally - do not want to decompose it, so that's what we catch here. - - INPUT: - act_desp: the description of the action (e.g., "sleeping") - act_dura: the duration of the action in minutes. - OUTPUT: - a boolean. True if we need to decompose, False otherwise. - """ - if "sleep" not in act_desp and "bed" not in act_desp: - return True - elif "sleeping" in act_desp or "asleep" in act_desp or "in bed" in act_desp: - return False - elif "sleep" in act_desp or "bed" in act_desp: - if act_dura > 60: - return False - return True - - # The goal of this function is to get us the action associated with - # . As a part of this, we may need to decompose some large - # chunk actions. - # Importantly, we try to decompose at least two hours worth of schedule at - # any given point. - curr_index = role.scratch.get_f_daily_schedule_index() - curr_index_60 = role.scratch.get_f_daily_schedule_index(advance=60) - - logger.info(f"f_daily_schedule: {role.scratch.f_daily_schedule}") - # * Decompose * - # During the first hour of the day, we need to decompose two hours - # sequence. We do that here. - if curr_index == 0: - # This portion is invoked if it is the first hour of the day. - act_desp, act_dura = role.scratch.f_daily_schedule[curr_index] - if act_dura >= 60: - # We decompose if the next action is longer than an hour, and fits the - # criteria described in determine_decomp. - if determine_decomp(act_desp, act_dura): - role.scratch.f_daily_schedule[curr_index : curr_index + 1] = await TaskDecomp().run( - role, act_desp, act_dura - ) - if curr_index_60 + 1 < len(role.scratch.f_daily_schedule): - act_desp, act_dura = role.scratch.f_daily_schedule[curr_index_60 + 1] - if act_dura >= 60: - if determine_decomp(act_desp, act_dura): - role.scratch.f_daily_schedule[curr_index_60 + 1 : curr_index_60 + 2] = await TaskDecomp().run( - role, act_desp, act_dura - ) - - if curr_index_60 < len(role.scratch.f_daily_schedule): - # If it is not the first hour of the day, this is always invoked (it is - # also invoked during the first hour of the day -- to double up so we can - # decompose two hours in one go). Of course, we need to have something to - # decompose as well, so we check for that too. - if role.scratch.curr_time.hour < 23: - # And we don't want to decompose after 11 pm. - act_desp, act_dura = role.scratch.f_daily_schedule[curr_index_60] - if act_dura >= 60: - if determine_decomp(act_desp, act_dura): - role.scratch.f_daily_schedule[curr_index_60 : curr_index_60 + 1] = await TaskDecomp().run( - role, act_desp, act_dura - ) - # * End of Decompose * - - # Generate an instance from the action description and duration. By - # this point, we assume that all the relevant actions are decomposed and - # ready in f_daily_schedule. - print("DEBUG LJSDLFSKJF") - for i in role.scratch.f_daily_schedule: - print(i) - print(curr_index) - print(len(role.scratch.f_daily_schedule)) - print(role.scratch.name) - print("------") - - # 1440 - x_emergency = 0 - for i in role.scratch.f_daily_schedule: - x_emergency += i[1] - # print ("x_emergency", x_emergency) - - if 1440 - x_emergency > 0: - print("x_emergency__AAA", x_emergency) - role.scratch.f_daily_schedule += [["sleeping", 1440 - x_emergency]] - - act_desp, act_dura = role.scratch.f_daily_schedule[curr_index] - - new_action_details = await GenActionDetails().run(role, act_desp, act_dura) - # Adding the action to role's queue. - role.scratch.add_new_action(**new_action_details) - - -def revise_identity(role: "STRole"): - p_name = role.scratch.name - - focal_points = [ - f"{p_name}'s plan for {role.scratch.get_str_curr_date_str()}.", - f"Important recent events for {p_name}'s life.", - ] - retrieved = new_agent_retrieve(role, focal_points) - - statements = "[Statements]\n" - for key, val in retrieved.items(): - for i in val: - statements += f"{i.created.strftime('%A %B %d -- %H:%M %p')}: {i.embedding_key}\n" - - # print (";adjhfno;asdjao;idfjo;af", p_name) - plan_prompt = statements + "\n" - plan_prompt += f"Given the statements above, is there anything that {p_name} should remember as they plan for" - plan_prompt += f" *{role.scratch.curr_time.strftime('%A %B %d')}*? " - plan_prompt += "If there is any scheduling information, be as specific as possible (include date, time, and location if stated in the statement)\n\n" - plan_prompt += f"Write the response from {p_name}'s perspective." - plan_note = LLM().ask(plan_prompt) - # print (plan_note) - - thought_prompt = statements + "\n" - thought_prompt += ( - f"Given the statements above, how might we summarize {p_name}'s feelings about their days up to now?\n\n" - ) - thought_prompt += f"Write the response from {p_name}'s perspective." - thought_note = LLM().ask(thought_prompt) - # print (thought_note) - - currently_prompt = ( - f"{p_name}'s status from {(role.scratch.curr_time - datetime.timedelta(days=1)).strftime('%A %B %d')}:\n" - ) - currently_prompt += f"{role.scratch.currently}\n\n" - currently_prompt += f"{p_name}'s thoughts at the end of {(role.scratch.curr_time - datetime.timedelta(days=1)).strftime('%A %B %d')}:\n" - currently_prompt += (plan_note + thought_note).replace("\n", "") + "\n\n" - currently_prompt += f"It is now {role.scratch.curr_time.strftime('%A %B %d')}. Given the above, write {p_name}'s status for {role.scratch.curr_time.strftime('%A %B %d')} that reflects {p_name}'s thoughts at the end of {(role.scratch.curr_time - datetime.timedelta(days=1)).strftime('%A %B %d')}. Write this in third-person talking about {p_name}." - currently_prompt += "If there is any scheduling information, be as specific as possible (include date, time, and location if stated in the statement).\n\n" - currently_prompt += "Follow this format below:\nStatus: " - # print ("DEBUG ;adjhfno;asdjao;asdfsidfjo;af", p_name) - # print (currently_prompt) - new_currently = LLM().ask(currently_prompt) - # print (new_currently) - # print (new_currently[10:]) - - role.scratch.currently = new_currently - - daily_req_prompt = role.scratch.get_str_iss() + "\n" - daily_req_prompt += f"Today is {role.scratch.curr_time.strftime('%A %B %d')}. Here is {role.scratch.name}'s plan today in broad-strokes (with the time of the day. e.g., have a lunch at 12:00 pm, watch TV from 7 to 8 pm).\n\n" - daily_req_prompt += "Follow this format (the list should have 4~6 items but no more):\n" - daily_req_prompt += "1. wake up and complete the morning routine at