diff --git a/examples/st_game/actions/gen_action_details.py b/examples/st_game/actions/gen_action_details.py new file mode 100644 index 000000000..a734315db --- /dev/null +++ b/examples/st_game/actions/gen_action_details.py @@ -0,0 +1,418 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Desc : gen_action_details + +import datetime +import random + +from metagpt.logs import logger +from metagpt.schema import Message + +from ..roles.st_role import STRole +from ..maze import Maze +from .st_action import STAction + + +class GenActionSector(STAction): + + def __init__(self, name="GenActionSector", context: list[Message] = None, llm=None): + super().__init__(name, context, llm) + + def _func_cleanup(self, llm_resp: str, prompt: str): + cleaned_response = llm_resp.split("}")[0] + return cleaned_response + + def _func_validate(self, llm_resp: str, prompt: str): + if len(llm_resp.strip()) < 1: + return False + if "}" not in llm_resp: + return False + if "," in llm_resp: + return False + return True + + def _func_fail_default_resp(self): + fs = ("kitchen") + return fs + + def run(self, role: STRole, maze: Maze, act_desp: str): + def create_prompt_input(role, maze, act_desp): + act_world = f"{maze.access_tile(role.scratch.curr_tile)['world']}" + + prompt_input = [] + + prompt_input += [role.scratch.get_str_name()] + prompt_input += [role.scratch.living_area.split(":")[1]] + x = f"{act_world}:{role.scratch.living_area.split(':')[1]}" + prompt_input += [role.s_mem.get_str_accessible_sector_arenas(x)] + + + prompt_input += [role.scratch.get_str_name()] + prompt_input += [f"{maze.access_tile(role.scratch.curr_tile)['sector']}"] + x = f"{act_world}:{maze.access_tile(role.scratch.curr_tile)['sector']}" + prompt_input += [role.s_mem.get_str_accessible_sector_arenas(x)] + + if role.scratch.get_str_daily_plan_req() != "": + prompt_input += [f"\n{role.scratch.get_str_daily_plan_req()}"] + else: + prompt_input += [""] + + + # MAR 11 TEMP + prompt_input = [] + act_world = maze.access_tile(role.scratch.curr_tile)["world"] + accessible_sector_str = role.s_mem.get_str_accessible_sectors(act_world) + curr = accessible_sector_str.split(", ") + fin_accessible_sectors = [] + for i in curr: + if "'s house" in i: + if role.scratch.last_name in i: + fin_accessible_sectors += [i] + else: + fin_accessible_sectors += [i] + accessible_sector_str = ", ".join(fin_accessible_sectors) + # END MAR 11 TEMP + + prompt_input += [accessible_sector_str] + + act_desp_1 = act_desp + act_desp_2 = act_desp + if "(" in act_desp: + act_desp_1 = act_desp.split("(")[0].strip() + act_desp_2 = act_desp.split("(")[-1][:-1] + prompt_input += [role.scratch.get_str_name()] + prompt_input += [act_desp_1] + + prompt_input += [act_desp_2] + prompt_input += [role.scratch.get_str_name()] + return prompt_input + + prompt_template = "action_location_sector_v1.txt" + prompt_input = create_prompt_input(role, maze, act_desp) + prompt = self.generate_prompt_with_tmpl_filename(prompt_input, prompt_template) + + self.fail_default_resp = self._func_fail_default_resp() + output = self._run_v1(prompt) + y = f"{maze.access_tile(role.scratch.curr_tile)['world']}" + x = [i.strip() for i in role.s_mem.get_str_accessible_sectors(y).split(",")] + if output not in x: + # output = random.choice(x) + output = role.scratch.living_area.split(":")[1] + return output + + +class GenActionArena(STAction): + def __init__(self, name="GenActionArena", context: list[Message] = None, llm=None): + super().__init__(name, context, llm) + + def _func_cleanup(self, llm_resp: str, prompt: str): + cleaned_response = llm_resp.split("}")[0] + return cleaned_response + + def _func_validate(self, llm_resp: str, prompt: str): + if len(llm_resp.strip()) < 1: + return False + if "}" not in llm_resp: + return False + if "," in llm_resp: + return False + return True + + def _func_fail_default_resp(self): + fs = ("kitchen") + return fs + + def run(self, role: STRole, maze: Maze, act_desp: str, act_world: str, act_sector: str): + def create_prompt_input(role, maze, act_desp, act_world, act_sector): + prompt_input = [] + # prompt_input += [role.scratch.get_str_name()] + # prompt_input += [maze.access_tile(role.scratch.curr_tile)["arena"]] + # prompt_input += [maze.access_tile(role.scratch.curr_tile)["sector"]] + prompt_input += [role.scratch.get_str_name()] + x = f"{act_world}:{act_sector}" + prompt_input += [act_sector] + + # MAR 11 TEMP + accessible_arena_str = role.s_mem.get_str_accessible_sector_arenas(x) + curr = accessible_arena_str.split(", ") + fin_accessible_arenas = [] + for i in curr: + if "'s room" in i: + if role.scratch.last_name in i: + fin_accessible_arenas += [i] + else: + fin_accessible_arenas += [i] + accessible_arena_str = ", ".join(fin_accessible_arenas) + # END MAR 11 TEMP + prompt_input += [accessible_arena_str] + act_desp_1 = act_desp + act_desp_2 = act_desp + if "(" in act_desp: + act_desp_1 = act_desp.split("(")[0].strip() + act_desp_2 = act_desp.split("(")[-1][:-1] + prompt_input += [role.scratch.get_str_name()] + prompt_input += [act_desp_1] + + prompt_input += [act_desp_2] + prompt_input += [role.scratch.get_str_name()] + + prompt_input += [act_sector] + prompt_input += [accessible_arena_str] + return prompt_input + + prompt_template = "action_location_object_vMar11.txt" + prompt_input = create_prompt_input(role, maze, act_desp, act_world, act_sector) + prompt = self.generate_prompt_with_tmpl_filename(prompt_input, prompt_template) + self.fail_default_resp = self._func_fail_default_resp() + output = self._run_v1(prompt) + return output + + +class GenActionObject(STAction): + def __init__(self, name="GenActionObject", context: list[Message] = None, llm=None): + super().__init__(name, context, llm) + + def _func_validate(self, llm_resp: str, prompt: str): + if len(llm_resp.strip()) < 1: + return False + return True + + def _func_cleanup(self, llm_resp: str, prompt: str): + cleaned_response = llm_resp.strip() + return cleaned_response + + def _func_fail_default_resp(self): + fs = ("bed") + return fs + + def run(self, role: STRole, act_desp: str, temp_address: str): + + def create_prompt_input(role, act_desp, temp_address): + prompt_input = [] + if "(" in act_desp: + act_desp = act_desp.split("(")[-1][:-1] + + prompt_input += [act_desp] + prompt_input += [role + .s_mem.get_str_accessible_arena_game_objects(temp_address)] + return prompt_input + + prompt_template = "action_object_v2.txt" + prompt_input = create_prompt_input(role, act_desp, temp_address) + prompt = self.generate_prompt_with_tmpl_filename(prompt_input, prompt_template) + self.fail_default_resp = self._func_fail_default_resp() + output = self._run_v1(prompt) + x = [i.strip() for i in role.s_mem.get_str_accessible_arena_game_objects(temp_address).split(",")] + if output not in x: + output = random.choice(x) + return output + + +class GenPronunciatio(STAction): + def __init__(self, name="GenPronunciatio", context: list[Message] = None, llm=None): + super().__init__(name, context, llm) + + def _func_cleanup(self, llm_resp: str, prompt: str): + cr = llm_resp.strip() + if len(cr) > 3: + cr = cr[:3] + return cr + + def _func_validate(self, llm_resp: str, prompt: str): + try: + self._func_cleanup(llm_resp, prompt="") + if len(llm_resp) == 0: + return False + except: + return False + return True + + def _func_fail_default_resp(self): + fs = "😋" + return fs + + def run(self, role: STRole, act_desp: str): + def create_prompt_input(act_desp): + if "(" in act_desp: + act_desp = act_desp.split("(")[-1].split(")")[0] + prompt_input = [act_desp] + return prompt_input + + + prompt_template = "generate_pronunciatio_v1.txt" + prompt_input = create_prompt_input(act_desp) + prompt = self.generate_prompt_with_tmpl_filename(prompt_input, prompt_template) + example_output = "🛁🧖‍♀️" + special_instruction = "The value for the output must ONLY contain the emojis." + self.fail_default_resp = self._func_fail_default_resp() + output = self._run_v2(prompt, example_output, special_instruction) + return output + + +class GenEventTriple(STAction): + def __init__(self, name="GenEventTriple", context: list[Message] = None, llm=None): + super().__init__(name, context, llm) + + def _func_cleanup(self, llm_resp: str, prompt: str): + cr = llm_resp.strip() + cr = [i.strip() for i in cr.split(")")[0].split(",")] + return cr + + def _func_validate(self, llm_resp: str, prompt: str): + try: + llm_resp = self._func_cleanup(llm_resp, prompt="") + if len(llm_resp) != 2: + return False + except: + return False + return True + + def _func_fail_default_resp(self, role): + fs = (role.name, "is", "idle") + return fs + + def run(self, role: STRole, act_desp: str): + def create_prompt_input(role, act_desp): + if "(" in act_desp: + act_desp = act_desp.split("(")[-1].split(")")[0] + prompt_input = [role.name, + act_desp, + role.name] + return prompt_input + + prompt_template = "generate_event_triple_v1.txt" + prompt_input = create_prompt_input(role, act_desp) + prompt = self.generate_prompt_with_tmpl_filename(prompt_input, prompt_template) + self.fail_default_resp = self._func_fail_default_resp(role) + output = self._run_v1(prompt) + output = (role.name, output[0], output[1]) + return output + + +class GenActObjDescription(STAction): + def __init__(self, name="GenActObjDescription", context: list[Message] = None, llm=None): + super().__init__(name, context, llm) + + def _func_cleanup(self, llm_resp: str, prompt: str): + cr = llm_resp.strip() + if cr[-1] == ".": cr = cr[:-1] + return cr + + def _func_validate(self, llm_resp: str, prompt: str): + try: + llm_resp = self._func_cleanup(llm_resp, prompt="") + except: + return False + return True + + def _func_fail_default_resp(self,act_game_object): + fs = f"{act_game_object} is idle" + return fs + + def run(self, role: STRole, act_game_object: str, act_desp: str): + def create_prompt_input(act_game_object, act_desp, role): + prompt_input = [act_game_object, + role.name, + act_desp, + act_game_object, + act_game_object] + return prompt_input + + prompt_template = "generate_obj_event_v1.txt" ######## + prompt_input = create_prompt_input(act_game_object, act_desp, role) ######## + prompt = self.generate_prompt_with_tmpl_filename(prompt_input, prompt_template) + example_output = "being fixed" ######## + special_instruction = "The output should ONLY contain the phrase that should go in ." ######## + self.fail_default_resp = self._func_fail_default_resp(act_game_object) ######## + output = self._run_v2(prompt, example_output, special_instruction) + return output + + +class GenObjEventTriple(STAction): + def __init__(self, name="GenObjEventTriple", context: list[Message] = None, llm=None): + super().__init__(name, context, llm) + + def _func_cleanup(self, llm_resp: str, prompt: str): + cr = llm_resp.strip() + cr = [i.strip() for i in cr.split(")")[0].split(",")] + return cr + + def _func_validate(self, llm_resp: str, prompt: str): + try: + llm_resp = self._func_cleanup(llm_resp, prompt="") + if len(llm_resp) != 2: + return False + except: return False + return True + + def _func_fail_default_resp(self,act_game_object): + fs = (act_game_object, "is", "idle") + return fs + + def run(self, role: STRole, act_game_object, act_obj_desp): + def create_prompt_input(act_game_object, act_obj_desp): + prompt_input = [act_game_object, + act_obj_desp, + act_game_object] + return prompt_input + + prompt_template = "generate_event_triple_v1.txt" + prompt_input = create_prompt_input(act_game_object, act_obj_desp) + prompt = self.generate_prompt_with_tmpl_filename(prompt_input, prompt_template) + self.fail_default_resp = self._func_fail_default_resp(role) + output = self._run_v1(prompt) + output = (role.name, output[0], output[1]) + return output + + +class GenActionDetails(STAction): + def __init__(self, name="GenActionDetails", context: list[Message] = None, llm=None): + super().__init__(name, context, llm) + + def _func_cleanup(self, llm_resp: str, prompt: str) -> list: + pass + + def _func_validate(self, llm_resp: str, prompt: str) -> bool: + # TODO -- this sometimes generates error + try: + self._func_cleanup(llm_resp) + except: + return False + return True + + def _func_fail_default_resp(self): + fs = {} + return fs + + def run(self, + role: STRole, + act_desp: str, + act_dura): + maze = role._rc.env.maze + act_world = maze.access_tile(role.scratch.curr_tile)["world"] + act_sector = GenActionSector().run(role, maze, act_desp) + act_arena = GenActionArena().run(role, maze, act_desp, act_world, act_sector) + act_address = f"{act_world}:{act_sector}:{act_arena}" + act_game_object = GenActionObject().run(role, act_desp, act_address) + new_address = f"{act_world}:{act_sector}:{act_arena}:{act_game_object}" + act_pron = GenPronunciatio().run(role, act_desp) + act_event = GenEventTriple().run(role, act_desp) + # Persona's actions also influence the object states. We set those up here. + act_obj_desp = GenActObjDescription().run(role, act_game_object, act_desp) + act_obj_pron = GenPronunciatio().run(role, act_obj_desp) + act_obj_event = GenObjEventTriple().run(role, act_game_object, + act_obj_desp) + result_dict = { + "action_address": new_address, + "action_duration": int(act_dura), + "act_desp": act_desp, + "action_pronunciatio": act_pron, + "action_event": act_event, + "chatting_with": None, + "chat": None, + "chatting_with_buffer": None, + "chatting_end_time": None, + "act_obj_description": act_obj_desp, + "act_obj_pronunciatio": act_obj_pron, + "act_obj_event": act_obj_event} + return result_dict diff --git a/examples/st_game/actions/gen_daily_schedule.py b/examples/st_game/actions/gen_daily_schedule.py new file mode 100644 index 000000000..47dc54482 --- /dev/null +++ b/examples/st_game/actions/gen_daily_schedule.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Desc : gen_daily_schedule + +import datetime + +from metagpt.logs import logger +from metagpt.schema import Message + +from ..roles.st_role import STRole +from .st_action import STAction + +class GenDailySchedule(STAction): + def __init__(self, name="GenDailySchedule", context: list[Message] = None, llm=None): + super().__init__(name, context, llm) + + def _func_validate(self, llm_resp: str, prompt: str) -> bool: + try: + self._func_cleanup(llm_resp, prompt="") + except: + return False + return True + + def _func_cleanup(self, llm_resp: str, prompt: str) -> list: + cr = [] + _cr = llm_resp.split(")") + for i in _cr: + if i[-1].isdigit(): + i = i[:-1].strip() + if i[-1] == "." or i[-1] == ",": + cr += [i[:-1].strip()] + return cr + + def _func_fail_default_resp(self) -> int: + fs = ['wake up and complete the morning routine at 6:00 am', + 'eat breakfast at 7:00 am', + 'read a book from 8:00 am to 12:00 pm', + 'have lunch at 12:00 pm', + 'take a nap from 1:00 pm to 4:00 pm', + 'relax and watch TV from 7:00 pm to 8:00 pm', + 'go to bed at 11:00 pm'] + return fs + + def run(self, role: STRole, wake_up_hour: str): + def create_prompt_input(role, wake_up_hour): + prompt_input = [] + prompt_input += [role.scratch.get_str_iss()] + prompt_input += [role.scratch.get_str_lifestyle()] + prompt_input += [role.scratch.get_str_curr_date_str()] + prompt_input += [role.scratch.get_str_firstname()] + prompt_input += [f"{str(wake_up_hour)}:00 am"] + return prompt_input + wake_up_hour = int(wake_up_hour) + prompt_template = "daily_planning_v6.txt" + prompt_input = create_prompt_input(role, wake_up_hour) + prompt = self.generate_prompt_with_tmpl_filename(prompt_input, prompt_template) + self.fail_default_resp = self._func_fail_default_resp() + output = self._run_v1(prompt) + output = ([f"wake up and complete the morning routine at {wake_up_hour}:00 am"] + + output) + return output diff --git a/examples/st_game/actions/gen_hourly_schedule.py b/examples/st_game/actions/gen_hourly_schedule.py new file mode 100644 index 000000000..b823dae01 --- /dev/null +++ b/examples/st_game/actions/gen_hourly_schedule.py @@ -0,0 +1,164 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Desc : gen_hourly_schedule + +import datetime +import random +import string + +from metagpt.logs import logger +from metagpt.schema import Message + +from ..roles.st_role import STRole +from .st_action import STAction + +def get_random_alphanumeric(i=6, j=6): + """ + Returns a random alpha numeric strength that has the length of somewhere + between i and j. + + INPUT: + i: min_range for the length + j: max_range for the length + OUTPUT: + an alpha numeric str with the length of somewhere between i and j. + """ + k = random.randint(i, j) + x = ''.join(random.choices(string.ascii_letters + string.digits, k=k)) + return x + +class GenHourlySchedule(STAction): + def __init__(self, name="GenHourlySchedule", context: list[Message] = None, llm=None): + super().__init__(name, context, llm) + + def _func_validate(self, llm_resp: str, prompt: str) -> bool: + try: + self._func_cleanup(llm_resp, prompt="") + except: + return False + return True + + def _func_cleanup(self, llm_resp: str, prompt: str) -> list: + cr = llm_resp.strip() + if cr[-1] == ".": + cr = cr[:-1] + return cr + + def _func_fail_default_resp(self) -> int: + fs = "asleep" + return fs + + def _generate_schedule_for_given_hour(self, role: STRole, + curr_hour_str, + p_f_ds_hourly_org, + hour_str, + intermission2): + def create_prompt_input(persona, + curr_hour_str, + p_f_ds_hourly_org, + hour_str, + intermission2=None): + schedule_format = "" + for i in hour_str: + schedule_format += f"[{persona.scratch.get_str_curr_date_str()} -- {i}]" + schedule_format += f" Activity: [Fill in]\n" + schedule_format = schedule_format[:-1] + + intermission_str = f"Here the originally intended hourly breakdown of" + intermission_str += f" {persona.scratch.get_str_firstname()}'s schedule today: " + for count, i in enumerate(persona.scratch.daily_req): + intermission_str += f"{str(count+1)}) {i}, " + intermission_str = intermission_str[:-2] + + prior_schedule = "" + if p_f_ds_hourly_org: + prior_schedule = "\n" + for count, i in enumerate(p_f_ds_hourly_org): + prior_schedule += f"[(ID:{get_random_alphanumeric()})" + prior_schedule += f" {persona.scratch.get_str_curr_date_str()} --" + prior_schedule += f" {hour_str[count]}] Activity:" + prior_schedule += f" {persona.scratch.get_str_firstname()}" + prior_schedule += f" is {i}\n" + + prompt_ending = f"[(ID:{get_random_alphanumeric()})" + prompt_ending += f" {persona.scratch.get_str_curr_date_str()}" + prompt_ending += f" -- {curr_hour_str}] Activity:" + prompt_ending += f" {persona.scratch.get_str_firstname()} is" + + if intermission2: + intermission2 = f"\n{intermission2}" + + prompt_input = [] + prompt_input += [schedule_format] + prompt_input += [persona.scratch.get_str_iss()] + + prompt_input += [prior_schedule + "\n"] + prompt_input += [intermission_str] + if intermission2: + prompt_input += [intermission2] + else: + prompt_input += [""] + prompt_input += [prompt_ending] + + return prompt_input + + wake_up_hour = int(wake_up_hour) + prompt_template = "generate_hourly_schedule_v2.txt" + prompt_input = create_prompt_input(role, + curr_hour_str, + p_f_ds_hourly_org, + hour_str, + intermission2) + prompt = self.generate_prompt_with_tmpl_filename(prompt_input, prompt_template) + self.fail_default_resp = self._func_fail_default_resp() + output = self._run_v1(prompt) + return output + + def run(self, role: STRole, wake_up_hour: str): + hour_str = ["00:00 AM", "01:00 AM", "02:00 AM", "03:00 AM", "04:00 AM", + "05:00 AM", "06:00 AM", "07:00 AM", "08:00 AM", "09:00 AM", + "10:00 AM", "11:00 AM", "12:00 PM", "01:00 PM", "02:00 PM", + "03:00 PM", "04:00 PM", "05:00 PM", "06:00 PM", "07:00 PM", + "08:00 PM", "09:00 PM", "10:00 PM", "11:00 PM"] + n_m1_activity = [] + diversity_repeat_count = 3 + for i in range(diversity_repeat_count): + n_m1_activity_set = set(n_m1_activity) + if len(n_m1_activity_set) < 5: + n_m1_activity = [] + for count, curr_hour_str in enumerate(hour_str): + if wake_up_hour > 0: + n_m1_activity += ["sleeping"] + wake_up_hour -= 1 + else: + n_m1_activity += [self._generate_schedule_for_given_hour( + role, curr_hour_str, n_m1_activity, hour_str)[0]] + + # Step 1. Compressing the hourly schedule to the following format: + # The integer indicates the number of hours. They should add up to 24. + # [['sleeping', 6], ['waking up and starting her morning routine', 1], + # ['eating breakfast', 1], ['getting ready for the day', 1], + # ['working on her painting', 2], ['taking a break', 1], + # ['having lunch', 1], ['working on her painting', 3], + # ['taking a break', 2], ['working on her painting', 2], + # ['relaxing and watching TV', 1], ['going to bed', 1], ['sleeping', 2]] + _n_m1_hourly_compressed = [] + prev = None + prev_count = 0 + for i in n_m1_activity: + if i != prev: + prev_count = 1 + _n_m1_hourly_compressed += [[i, prev_count]] + prev = i + elif _n_m1_hourly_compressed: + _n_m1_hourly_compressed[-1][1] += 1 + + # Step 2. Expand to min scale (from hour scale) + # [['sleeping', 360], ['waking up and starting her morning routine', 60], + # ['eating breakfast', 60],.. + n_m1_hourly_compressed = [] + for task, duration in _n_m1_hourly_compressed: + n_m1_hourly_compressed += [[task, duration*60]] + + return n_m1_hourly_compressed + diff --git a/examples/st_game/actions/task_decomp.py b/examples/st_game/actions/task_decomp.py new file mode 100644 index 000000000..13bd01727 --- /dev/null +++ b/examples/st_game/actions/task_decomp.py @@ -0,0 +1,162 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Desc : task_decomp + +import datetime + +from metagpt.logs import logger +from metagpt.schema import Message + +from ..roles.st_role import STRole +from ..actions.st_action import STAction + + +class TaskDecomp(STAction): + + def __init__(self, name="TaskDecomp", context: list[Message] = None, llm=None): + super().__init__(name, context, llm) + + def _func_cleanup(self, llm_resp: str, prompt: str) -> list: + # TODO SOMETHING HERE sometimes fails... See screenshot + temp = [i.strip() for i in llm_resp.split("\n")] + _cr = [] + cr = [] + for count, i in enumerate(temp): + if count != 0: + _cr += [" ".join([j.strip () for j in i.split(" ")][3:])] + else: + _cr += [i] + for count, i in enumerate(_cr): + k = [j.strip() for j in i.split("(duration in minutes:")] + task = k[0] + if task[-1] == ".": + task = task[:-1] + duration = int(k[1].split(",")[0].strip()) + cr += [[task, duration]] + + total_expected_min = int(prompt.split("(total duration in minutes")[-1] + .split("):")[0].strip()) + + # TODO -- now, you need to make sure that this is the same as the sum of + # the current action sequence. + curr_min_slot = [["dummy", -1],] # (task_name, task_index) + for count, i in enumerate(cr): + i_task = i[0] + i_duration = i[1] + + i_duration -= (i_duration % 5) + if i_duration > 0: + for j in range(i_duration): + curr_min_slot += [(i_task, count)] + curr_min_slot = curr_min_slot[1:] + + if len(curr_min_slot) > total_expected_min: + last_task = curr_min_slot[60] + for i in range(1, 6): + curr_min_slot[-1 * i] = last_task + elif len(curr_min_slot) < total_expected_min: + last_task = curr_min_slot[-1] + for i in range(total_expected_min - len(curr_min_slot)): + curr_min_slot += [last_task] + + cr_ret = [["dummy", -1],] + for task, task_index in curr_min_slot: + if task != cr_ret[-1][0]: + cr_ret += [[task, 1]] + else: + cr_ret[-1][1] += 1 + cr = cr_ret[1:] + + return cr + + def _func_validate(self, llm_resp: str, prompt: str) -> bool: + # TODO -- this sometimes generates error + try: + self._func_cleanup(llm_resp) + except: + return False + return True + + def _func_fail_default_resp(self) -> int: + fs = ["asleep"] + return fs + + def run(self, + role: STRole, + main_act_dur: int, + truncated_act_dur: int, + start_time_hour: datetime, + end_time_hour: datetime, + inserted_act: str, + inserted_act_dur: int, + *args, **kwargs): + + def create_prompt_input(role, task, duration): + + """ + Today is Saturday June 25. From 00:00 ~ 06:00am, Maeve is + planning on sleeping, 06:00 ~ 07:00am, Maeve is + planning on waking up and doing her morning routine, + and from 07:00am ~08:00am, Maeve is planning on having breakfast. + """ + + curr_f_org_index = role.scratch.get_f_daily_schedule_hourly_org_index() + all_indices = [] + # if curr_f_org_index > 0: + # all_indices += [curr_f_org_index-1] + all_indices += [curr_f_org_index] + if curr_f_org_index+1 <= len(role.scratch.f_daily_schedule_hourly_org): + all_indices += [curr_f_org_index+1] + if curr_f_org_index+2 <= len(role.scratch.f_daily_schedule_hourly_org): + all_indices += [curr_f_org_index+2] + + curr_time_range = "" + + print ("DEBUG") + print (role.scratch.f_daily_schedule_hourly_org) + print (all_indices) + + summ_str = f'Today is {role.scratch.curr_time.strftime("%B %d, %Y")}. ' + summ_str += f'From ' + for index in all_indices: + print ("index", index) + if index < len(role.scratch.f_daily_schedule_hourly_org): + start_min = 0 + for i in range(index): + start_min += role.scratch.f_daily_schedule_hourly_org[i][1] + end_min = start_min + role.scratch.f_daily_schedule_hourly_org[index][1] + start_time = (datetime.datetime.strptime("00:00:00", "%H:%M:%S") + + datetime.timedelta(minutes=start_min)) + end_time = (datetime.datetime.strptime("00:00:00", "%H:%M:%S") + + datetime.timedelta(minutes=end_min)) + start_time_str = start_time.strftime("%H:%M%p") + end_time_str = end_time.strftime("%H:%M%p") + summ_str += f"{start_time_str} ~ {end_time_str}, {role.name} is planning on {role.scratch.f_daily_schedule_hourly_org[index][0]}, " + if curr_f_org_index+1 == index: + curr_time_range = f'{start_time_str} ~ {end_time_str}' + summ_str = summ_str[:-2] + "." + + prompt_input = [] + prompt_input += [role.scratch.get_str_iss()] + prompt_input += [summ_str] + # prompt_input += [role.scratch.get_str_curr_date_str()] + prompt_input += [role.scratch.get_str_firstname()] + prompt_input += [role.scratch.get_str_firstname()] + prompt_input += [task] + prompt_input += [curr_time_range] + prompt_input += [duration] + prompt_input += [role.scratch.get_str_firstname()] + return prompt_input + + prompt_input = create_prompt_input(role, + main_act_dur, + truncated_act_dur, + start_time_hour, + end_time_hour, + inserted_act, + inserted_act_dur) + prompt = self.generate_prompt_with_tmpl_filename(prompt_input, + "task_decomp_v3.txt") + self.fail_default_resp = self._func_fail_default_resp(main_act_dur, truncated_act_dur) + output = self._run_v1(prompt) + return output diff --git a/examples/st_game/actions/wake_up.py b/examples/st_game/actions/wake_up.py new file mode 100644 index 000000000..911c8ebcf --- /dev/null +++ b/examples/st_game/actions/wake_up.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Desc : wake_up + +import datetime + +from metagpt.logs import logger +from metagpt.schema import Message + +from ..roles.st_role import STRole +from ..actions.st_action import STAction + + +class WakeUp(STAction): + + def __init__(self, name="WakeUp", context: list[Message] = None, llm=None): + super().__init__(name, context, llm) + + def _func_validate(self, llm_resp: str, prompt: str) -> bool: + try: + self._func_cleanup(llm_resp, prompt="") + except: + return False + return True + + def _func_cleanup(self, llm_resp: str, prompt: str) -> list: + cr = int(llm_resp.strip().lower().split("am")[0]) + return cr + + def _func_fail_default_resp(self) -> int: + fs = 8 + return fs + + def run(self, role: STRole): + def create_prompt_input(role): + prompt_input = [role.scratch.get_str_iss(), + role.scratch.get_str_lifestyle(), + role.scratch.get_str_firstname()] + return prompt_input + + prompt_input = create_prompt_input(role) + prompt = self.generate_prompt_with_tmpl_filename(prompt_input, "wake_up_hour_v1.txt") + self.fail_default_resp = self._func_fail_default_resp() + output = self._run_v1(prompt) + return output \ No newline at end of file diff --git a/examples/st_game/memory/spatial_memory.py b/examples/st_game/memory/spatial_memory.py index f9a4847c8..87d16b635 100644 --- a/examples/st_game/memory/spatial_memory.py +++ b/examples/st_game/memory/spatial_memory.py @@ -8,10 +8,14 @@ memory that aids in grounding their behavior in the game world. import json import os +from ..utils.utils import check_if_file_exists class MemoryTree: - def __init__(self) -> None: + def __init__(self, f_saved: str): self.tree = {} + if check_if_file_exists(f_saved): + with open(f_saved) as f: + self.tree = json.load(f) def set_mem_path(self, f_saved: str): if os.path.isfile(f_saved) and os.path.exists(f_saved): diff --git a/examples/st_game/plan/st_plan.py b/examples/st_game/plan/st_plan.py index 9bdfa28a2..c9504b20b 100644 --- a/examples/st_game/plan/st_plan.py +++ b/examples/st_game/plan/st_plan.py @@ -7,24 +7,48 @@ from typing import Union, Tuple from datetime import datetime import math -from examples.st_game.maze import Maze -from examples.st_game.plan.converse import agent_conversation -from examples.st_game.roles.st_role import STRole -from examples.st_game.actions.decide_to_talk import DecideToTalk -from examples.st_game.actions.summarize_conv import SummarizeConv -from examples.st_game.actions.new_decomp_schedule import NewDecompSchedule +from metagpt.llm import LLM +from ..maze import Maze +from ..plan.converse import agent_conversation +from ..roles.st_role import STRole +from ..actions.decide_to_talk import DecideToTalk +from ..actions.summarize_conv import SummarizeConv +from ..actions.new_decomp_schedule import NewDecompSchedule +from ..actions.task_decomp import TaskDecomp +from ..actions.wake_up import WakeUp +from ..actions.gen_daily_schedule import GenDailySchedule +from ..actions.gen_hourly_schedule import GenHourlySchedule +from ..actions.gen_action_details import GenActionDetails +from ..utils.utils import get_embedding +from ..memory.retrieve import new_retrieve def plan(role: STRole, maze: Maze, roles: list[STRole], new_day: bool, retrieved: dict): + # PART 1: Generate the hourly schedule. + if new_day: + _long_term_planning(role, new_day) + # PART 2: If the current action has expired, we want to create a new plan. + if role.scratch.act_check_finished(): + _determine_action(role, maze) + + # PART 3: If you perceived an event that needs to be responded to (saw + # another role), and retrieved relevant information. + # Step 1: Retrieved may have multiple events represented in it. The first + # job here is to determine which of the events we want to focus + # on for the role. + # takes the form of a dictionary like this: + # dictionary {["curr_event"] = , + # ["events"] = [, ...], + # ["thoughts"] = [, ...]} focused_event = False if retrieved.keys(): focused_event = _choose_retrieved(role, retrieved) # Step 2: Once we choose an event, we need to determine whether the - # persona will take any actions for the perceived event. There are + # role will take any actions for the perceived event. There are # three possible modes of reaction returned by _should_react. - # a) "chat with {target_persona.name}" + # a) "chat with {target_role.name}" # b) "react" # c) False if focused_event: @@ -82,7 +106,7 @@ def _choose_retrieved(role_name: str, retrieved: dict) -> Union[None, dict]: def _should_react(role: "STRole", retrieved: dict, roles: dict): """ - Determines what form of reaction the persona should exihibit given the + Determines what form of reaction the role should exihibit given the retrieved values. INPUT role: Current instance whose action we are determining. @@ -170,7 +194,7 @@ def _should_react(role: "STRole", retrieved: dict, roles: dict): else: return False # "keep" - # If the persona is chatting right now, default to no reaction + # If the role is chatting right now, default to no reaction scratch = role._rc.scratch if scratch.chatting_with: return False @@ -182,7 +206,7 @@ def _should_react(role: "STRole", retrieved: dict, roles: dict): curr_event = retrieved["curr_event"] if ":" not in curr_event.subject: - # this is a persona event. + # this is a role event. if lets_talk(role, roles[curr_event.subject], retrieved): return f"chat with {curr_event.subject}" react_mode = lets_react(role, roles[curr_event.subject], @@ -192,11 +216,11 @@ def _should_react(role: "STRole", retrieved: dict, roles: dict): def _chat_react(maze: Maze, role: STRole, reaction_mode: str, roles: list[STRole]): - # There are two personas -- the persona who is initiating the conversation - # and the persona who is the target. We get the persona instances here. + # There are two roles -- the role who is initiating the conversation + # and the role who is the target. We get the role instances here. init_role = role target_role = roles[reaction_mode[9:].strip()] - curr_personas = [init_role, target_role] + curr_roles = [init_role, target_role] # Actually creating the conversation here. convo, duration_min = generate_convo(maze, init_role, target_role) # 2222 @@ -215,13 +239,13 @@ def _chat_react(maze: Maze, role: STRole, reaction_mode: str, roles: list[STRole for role, p in [("init", init_role), ("target", target_role)]: if role == "init": - act_address = f" {target_role.name}" + act_address = f" {target_role.name}" act_event = (p.name, "chat with", target_role.name) chatting_with = target_role.name chatting_with_buffer = {} chatting_with_buffer[target_role.name] = 800 elif role == "target": - act_address = f" {init_role.name}" + act_address = f" {init_role.name}" act_event = (p.name, "chat with", init_role.name) chatting_with = init_role.name chatting_with_buffer = {} @@ -344,7 +368,7 @@ def generate_convo_summary(role: STRole, conv: list) -> str: def generate_new_decomp_schedule(role: STRole, inserted_act: str, inserted_act_dur: int, start_hour: int, end_hour: int): # Step 1: Setting up the core variables for the function. - #

is the persona whose schedule we are editing right now. + #

is the role whose schedule we are editing right now. p = role scratch = role._rc.scratch # indicates the number of minutes that have passed today. @@ -381,7 +405,7 @@ def generate_new_decomp_schedule(role: STRole, inserted_act: str, inserted_act_d dur_sum += dur count += 1 - persona_name = role.name + role_name = role.name main_act_dur = main_act_dur x = truncated_act_dur[-1][0].split("(")[0].strip() + " (on the way to " + truncated_act_dur[-1][0].split("(")[-1][ @@ -407,3 +431,218 @@ def generate_new_decomp_schedule(role: STRole, inserted_act: str, inserted_act_d end_time_hour, inserted_act, inserted_act_dur) + + +def _long_term_planning(role: STRole, new_day: bool): + """ + Formulates the role's daily long-term plan if it is the start of a new + day. This basically has two components: first, we create the wake-up hour, + and second, we create the hourly schedule based on it. + INPUT + new_day: Indicates whether the current time signals a "First day", + "New day", or False (for neither). This is important because we + create the roles' long term planning on the new day. + """ + # We start by creating the wake up hour for the role. + wake_up_hour = WakeUp().run(role) + + # When it is a new day, we start by creating the daily_req of the role. + # Note that the daily_req is a list of strings that describe the role's + # day in broad strokes. + if new_day == "First day": + # Bootstrapping the daily plan for the start of then generation: + # if this is the start of generation (so there is no previous day's + # daily requirement, or if we are on a new day, we want to create a new + # set of daily requirements. + role.scratch.daily_req = GenDailySchedule().run(role, + wake_up_hour) + elif new_day == "New day": + revise_identity(role) + + # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - TODO + # We need to create a new daily_req here... + role.scratch.daily_req = role.scratch.daily_req + + # Based on the daily_req, we create an hourly schedule for the role, + # which is a list of todo items with a time duration (in minutes) that + # add up to 24 hours. + role.scratch.f_daily_schedule = GenHourlySchedule().run(role, + wake_up_hour) + role.scratch.f_daily_schedule_hourly_org = (role.scratch + .f_daily_schedule[:]) + + + # Added March 4 -- adding plan to the memory. + thought = f"This is {role.scratch.name}'s plan for {role.scratch.curr_time.strftime('%A %B %d')}:" + for i in role.scratch.daily_req: + thought += f" {i}," + thought = thought[:-1] + "." + created = role.scratch.curr_time + expiration = role.scratch.curr_time + datetime.timedelta(days=30) + s, p, o = (role.scratch.name, "plan", role.scratch.curr_time.strftime('%A %B %d')) + keywords = set(["plan"]) + thought_poignancy = 5 + thought_embedding_pair = (thought, get_embedding(thought)) + role.a_mem.add_thought(created, expiration, s, p, o, + thought, keywords, thought_poignancy, + thought_embedding_pair, None) + + # print("Sleeping for 20 seconds...") + # time.sleep(10) + # print("Done sleeping!") + + +def _determine_action(role: STRole, maze: Maze): + """ + Creates the next action sequence for the role. + The main goal of this function is to run "add_new_action" on the role's + scratch space, which sets up all the action related variables for the next + action. + As a part of this, the role may need to decompose its hourly schedule as + needed. + INPUT + role: Current instance whose action we are determining. + maze: Current instance. + """ + def determine_decomp(act_desp, act_dura): + """ + Given an action description and its duration, we determine whether we need + to decompose it. If the action is about the agent sleeping, we generally + do not want to decompose it, so that's what we catch here. + + INPUT: + act_desp: the description of the action (e.g., "sleeping") + act_dura: the duration of the action in minutes. + OUTPUT: + a boolean. True if we need to decompose, False otherwise. + """ + if "sleep" not in act_desp and "bed" not in act_desp: + return True + elif "sleeping" in act_desp or "asleep" in act_desp or "in bed" in act_desp: + return False + elif "sleep" in act_desp or "bed" in act_desp: + if act_dura > 60: + return False + return True + + # The goal of this function is to get us the action associated with + # . As a part of this, we may need to decompose some large + # chunk actions. + # Importantly, we try to decompose at least two hours worth of schedule at + # any given point. + curr_index = role.scratch.get_f_daily_schedule_index() + curr_index_60 = role.scratch.get_f_daily_schedule_index(advance=60) + + # * Decompose * + # During the first hour of the day, we need to decompose two hours + # sequence. We do that here. + if curr_index == 0: + # This portion is invoked if it is the first hour of the day. + act_desp, act_dura = role.scratch.f_daily_schedule[curr_index] + if act_dura >= 60: + # We decompose if the next action is longer than an hour, and fits the + # criteria described in determine_decomp. + if determine_decomp(act_desp, act_dura): + role.scratch.f_daily_schedule[curr_index:curr_index+1] = ( + TaskDecomp().run(role, act_desp, act_dura)) + if curr_index_60 + 1 < len(role.scratch.f_daily_schedule): + act_desp, act_dura = role.scratch.f_daily_schedule[curr_index_60+1] + if act_dura >= 60: + if determine_decomp(act_desp, act_dura): + role.scratch.f_daily_schedule[curr_index_60+1:curr_index_60+2] = ( + TaskDecomp().run(role, act_desp, act_dura)) + + if curr_index_60 < len(role.scratch.f_daily_schedule): + # If it is not the first hour of the day, this is always invoked (it is + # also invoked during the first hour of the day -- to double up so we can + # decompose two hours in one go). Of course, we need to have something to + # decompose as well, so we check for that too. + if role.scratch.curr_time.hour < 23: + # And we don't want to decompose after 11 pm. + act_desp, act_dura = role.scratch.f_daily_schedule[curr_index_60] + if act_dura >= 60: + if determine_decomp(act_desp, act_dura): + role.scratch.f_daily_schedule[curr_index_60:curr_index_60+1] = ( + TaskDecomp().run(role, act_desp, act_dura)) + # * End of Decompose * + + # Generate an instance from the action description and duration. By + # this point, we assume that all the relevant actions are decomposed and + # ready in f_daily_schedule. + print ("DEBUG LJSDLFSKJF") + for i in role.scratch.f_daily_schedule: print (i) + print (curr_index) + print (len(role.scratch.f_daily_schedule)) + print (role.scratch.name) + print ("------") + + # 1440 + x_emergency = 0 + for i in role.scratch.f_daily_schedule: + x_emergency += i[1] + # print ("x_emergency", x_emergency) + + if 1440 - x_emergency > 0: + print ("x_emergency__AAA", x_emergency) + role.scratch.f_daily_schedule += [["sleeping", 1440 - x_emergency]] + + act_desp, act_dura = role.scratch.f_daily_schedule[curr_index] + + new_action_details = GenActionDetails().run(role, act_desp, act_dura) + # Adding the action to role's queue. + role.scratch.add_new_action(**new_action_details) + + +def revise_identity(role: STRole): + p_name = role.scratch.name + + focal_points = [f"{p_name}'s plan for {role.scratch.get_str_curr_date_str()}.", + f"Important recent events for {p_name}'s life."] + retrieved = new_retrieve(role, focal_points) + + statements = "[Statements]\n" + for key, val in retrieved.items(): + for i in val: + statements += f"{i.created.strftime('%A %B %d -- %H:%M %p')}: {i.embedding_key}\n" + + # print (";adjhfno;asdjao;idfjo;af", p_name) + plan_prompt = statements + "\n" + plan_prompt += f"Given the statements above, is there anything that {p_name} should remember as they plan for" + plan_prompt += f" *{role.scratch.curr_time.strftime('%A %B %d')}*? " + plan_prompt += f"If there is any scheduling information, be as specific as possible (include date, time, and location if stated in the statement)\n\n" + plan_prompt += f"Write the response from {p_name}'s perspective." + plan_note = LLM().ask(plan_prompt) + # print (plan_note) + + thought_prompt = statements + "\n" + thought_prompt += f"Given the statements above, how might we summarize {p_name}'s feelings about their days up to now?\n\n" + thought_prompt += f"Write the response from {p_name}'s perspective." + thought_note = LLM().ask(thought_prompt) + # print (thought_note) + + currently_prompt = f"{p_name}'s status from {(role.scratch.curr_time - datetime.timedelta(days=1)).strftime('%A %B %d')}:\n" + currently_prompt += f"{role.scratch.currently}\n\n" + currently_prompt += f"{p_name}'s thoughts at the end of {(role.scratch.curr_time - datetime.timedelta(days=1)).strftime('%A %B %d')}:\n" + currently_prompt += (plan_note + thought_note).replace('\n', '') + "\n\n" + currently_prompt += f"It is now {role.scratch.curr_time.strftime('%A %B %d')}. Given the above, write {p_name}'s status for {role.scratch.curr_time.strftime('%A %B %d')} that reflects {p_name}'s thoughts at the end of {(role.scratch.curr_time - datetime.timedelta(days=1)).strftime('%A %B %d')}. Write this in third-person talking about {p_name}." + currently_prompt += f"If there is any scheduling information, be as specific as possible (include date, time, and location if stated in the statement).\n\n" + currently_prompt += "Follow this format below:\nStatus: " + # print ("DEBUG ;adjhfno;asdjao;asdfsidfjo;af", p_name) + # print (currently_prompt) + new_currently = LLM().ask(currently_prompt) + # print (new_currently) + # print (new_currently[10:]) + + role.scratch.currently = new_currently + + daily_req_prompt = role.scratch.get_str_iss() + "\n" + daily_req_prompt += f"Today is {role.scratch.curr_time.strftime('%A %B %d')}. Here is {role.scratch.name}'s plan today in broad-strokes (with the time of the day. e.g., have a lunch at 12:00 pm, watch TV from 7 to 8 pm).\n\n" + daily_req_prompt += f"Follow this format (the list should have 4~6 items but no more):\n" + daily_req_prompt += f"1. wake up and complete the morning routine at