From 9aa6417673695bc95108037050e8e14523bb0094 Mon Sep 17 00:00:00 2001 From: better629 Date: Sun, 1 Oct 2023 21:46:38 +0800 Subject: [PATCH] add plan&observe actions --- examples/st_game/actions/decide_to_talk.py | 1 + .../st_game/actions/new_decomp_schedule.py | 153 +++++++ examples/st_game/actions/st_action.py | 5 +- examples/st_game/actions/summarize_conv.py | 40 +- examples/st_game/memory/agent_memory.py | 8 + examples/st_game/memory/retrieve.py | 116 +++++ examples/st_game/plan/st_plan.py | 406 ++++++++++++++++++ examples/st_game/roles/st_role.py | 4 + examples/st_game/utils/const.py | 3 +- examples/st_game/utils/utils.py | 23 +- 10 files changed, 752 insertions(+), 7 deletions(-) create mode 100644 examples/st_game/actions/new_decomp_schedule.py diff --git a/examples/st_game/actions/decide_to_talk.py b/examples/st_game/actions/decide_to_talk.py index fb1c625b1..77c3703f3 100644 --- a/examples/st_game/actions/decide_to_talk.py +++ b/examples/st_game/actions/decide_to_talk.py @@ -93,6 +93,7 @@ class DecideToTalk(STAction): prompt_input = create_prompt_input(init_role, target_role, retrieved) prompt = self.generate_prompt_with_tmpl_filename(prompt_input=prompt_input, tmpl_filename="decide_to_talk_v2.txt") + self.fail_default_resp = self._func_fail_default_resp() output = await self._run_v1(prompt) # yes or no result = True if output == "yes" else False logger.info(f"Run action: {self.__class__.__name__} with result: {result}") diff --git a/examples/st_game/actions/new_decomp_schedule.py b/examples/st_game/actions/new_decomp_schedule.py new file mode 100644 index 000000000..ac1003d9e --- /dev/null +++ b/examples/st_game/actions/new_decomp_schedule.py @@ -0,0 +1,153 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Desc : new_decomp_schedule + +import datetime + +from metagpt.logs import logger +from metagpt.schema import Message + +from ..roles.st_role import STRole +from ..actions.st_action import STAction + + +class NewDecompSchedule(STAction): + + def __init__(self, name="NewDecompSchedule", context: list[Message] = None, llm=None): + super().__init__(name, context, llm) + + def _func_validate(self, llm_resp: str, prompt: str) -> bool: + resp = False + try: + llm_resp = self._func_cleanup(llm_resp, prompt) + dur_sum = 0 + for act, dur in llm_resp: + dur_sum += dur + if isinstance(act, str): + return False + if isinstance(dur, int): + return False + x = prompt.split("\n")[0].split("originally planned schedule from")[-1].strip()[:-1] + x = [datetime.datetime.strptime(i.strip(), "%H:%M %p") for i in x.split(" to ")] + delta_min = int((x[1] - x[0]).total_seconds() / 60) + + if int(dur_sum) != int(delta_min): + return False + except Exception as exp: + pass + return resp + + def _func_cleanup(self, llm_resp: str, prompt: str) -> list: + new_schedule = prompt + " " + llm_resp.strip() + new_schedule = new_schedule.split("The revised schedule:")[-1].strip() + new_schedule = new_schedule.split("\n") + + ret_temp = [] + for i in new_schedule: + ret_temp += [i.split(" -- ")] + + ret = [] + for time_str, action in ret_temp: + start_time = time_str.split(" ~ ")[0].strip() + end_time = time_str.split(" ~ ")[1].strip() + delta = datetime.datetime.strptime(end_time, "%H:%M") - datetime.datetime.strptime(start_time, "%H:%M") + delta_min = int(delta.total_seconds() / 60) + if delta_min < 0: + delta_min = 0 + ret += [[action, delta_min]] + + return ret + + def _func_fail_default_resp(self, main_act_dur: int, truncated_act_dur: int) -> int: + dur_sum = 0 + for act, dur in main_act_dur: + dur_sum += dur + + ret = truncated_act_dur[:] + ret += main_act_dur[len(ret) - 1:] + + # If there are access, we need to trim... + ret_dur_sum = 0 + count = 0 + over = None + for act, dur in ret: + ret_dur_sum += dur + if ret_dur_sum == dur_sum: + break + if ret_dur_sum > dur_sum: + over = ret_dur_sum - dur_sum + break + count += 1 + + if over: + ret = ret[:count + 1] + ret[-1][1] -= over + + return ret + + async def run(self, + role: STRole, + main_act_dur: int, + truncated_act_dur: int, + start_time_hour: datetime, + end_time_hour: datetime, + inserted_act: str, + inserted_act_dur: int, + *args, **kwargs): + + def create_prompt_input(role: STRole, + main_act_dur: int, + truncated_act_dur: int, + start_time_hour: datetime, + end_time_hour: datetime, + inserted_act: str, + inserted_act_dur: int): + persona_name = role.name + start_hour_str = start_time_hour.strftime("%H:%M %p") + end_hour_str = end_time_hour.strftime("%H:%M %p") + + original_plan = "" + for_time = start_time_hour + for i in main_act_dur: + original_plan += f'{for_time.strftime("%H:%M")} ~ {(for_time + datetime.timedelta(minutes=int(i[1]))).strftime("%H:%M")} -- ' + \ + i[0] + original_plan += "\n" + for_time += datetime.timedelta(minutes=int(i[1])) + + new_plan_init = "" + for_time = start_time_hour + for count, i in enumerate(truncated_act_dur): + new_plan_init += f'{for_time.strftime("%H:%M")} ~ {(for_time + datetime.timedelta(minutes=int(i[1]))).strftime("%H:%M")} -- ' + \ + i[0] + new_plan_init += "\n" + if count < len(truncated_act_dur) - 1: + for_time += datetime.timedelta(minutes=int(i[1])) + + new_plan_init += (for_time + datetime.timedelta(minutes=int(i[1]))).strftime("%H:%M") + " ~" + + prompt_input = [persona_name, + start_hour_str, + end_hour_str, + original_plan, + persona_name, + inserted_act, + inserted_act_dur, + persona_name, + start_hour_str, + end_hour_str, + end_hour_str, + new_plan_init] + return prompt_input + + prompt_input = create_prompt_input(role, + main_act_dur, + truncated_act_dur, + start_time_hour, + end_time_hour, + inserted_act, + inserted_act_dur) + prompt = self.generate_prompt_with_tmpl_filename(prompt_input, + "new_decomp_schedule_v1.txt") + self.fail_default_resp = self._func_fail_default_resp(main_act_dur, truncated_act_dur) + output = await self._run_v1(prompt) + return output diff --git a/examples/st_game/actions/st_action.py b/examples/st_game/actions/st_action.py index 9d319c80f..9cbf3cee6 100644 --- a/examples/st_game/actions/st_action.py +++ b/examples/st_game/actions/st_action.py @@ -17,6 +17,7 @@ class STAction(Action): def __init__(self, name="STAction", context: list[Message] = None, llm=None): super().__init__(name, context, llm) self.prompt_dir = PROMPTS_DIR + self.fail_default_resp = None @abstractmethod def _func_validate(self, llm_resp: str, prompt: str): @@ -54,14 +55,14 @@ class STAction(Action): async def _run_v1(self, prompt: str, retry: int = 3) -> str: """ - same with `gpt_structure.generate_prompt` + same with `gpt_structure.safe_generate_response` default post-preprocess operations of LLM response """ for idx in range(retry): llm_resp = await self._aask(prompt) if self._func_validate(llm_resp, prompt): return self._func_cleanup(llm_resp, prompt) - return self._func_fail_default_resp() + return self.fail_default_resp # TODO fix async def _run_v2(self, prompt: str, diff --git a/examples/st_game/actions/summarize_conv.py b/examples/st_game/actions/summarize_conv.py index f5561fcb1..b396fd506 100644 --- a/examples/st_game/actions/summarize_conv.py +++ b/examples/st_game/actions/summarize_conv.py @@ -2,11 +2,47 @@ # -*- coding: utf-8 -*- # @Desc : summarize the content of agents' conversation -from metagpt.actions.action import Action from metagpt.schema import Message +from ..actions.st_action import STAction -class SummarizeConv(Action): + +class SummarizeConv(STAction): def __init__(self, name="SummarizeConv", context: list[Message] = None, llm=None): super().__init__(name, context, llm) + + def _func_validate(self, llm_resp: str, prompt: str) -> bool: + resp = False + try: + _ = self._func_cleanup(llm_resp, prompt) + resp = True + except Exception as exp: + pass + return resp + + def _func_cleanup(self, llm_resp: str, prompt: str) -> str: + ret = "conversing about " + llm_resp.strip() + return ret + + def _func_fail_default_resp(self) -> str: + return "conversing with a housemate about morning greetings" + + async def run(self, conv: list): + def create_prompt_input(conversation: list): + convo_str = "" + for row in conversation: + convo_str += f'{row[0]}: "{row[1]}"\n' + prompt_input = [convo_str] + return prompt_input + + prompt_input = create_prompt_input(conv) + prompt = self.generate_prompt_with_tmpl_filename(prompt_input, + "summarize_conversation_v1.txt") + + example_output = "conversing about what to eat for lunch" + special_instruction = "The output must continue the sentence above by filling in the tag. " \ + "Don't start with 'this is a conversation about...' Just finish the sentence " \ + "but do not miss any important details (including who are chatting)." + output = await self._run_v2(prompt, example_output, special_instruction) + return output diff --git a/examples/st_game/memory/agent_memory.py b/examples/st_game/memory/agent_memory.py index ff93965dc..46ee3cb0e 100644 --- a/examples/st_game/memory/agent_memory.py +++ b/examples/st_game/memory/agent_memory.py @@ -100,9 +100,11 @@ class AgentMemory(Memory): @李嵩@张凯 这里的storage是List,你们需要写一个JSON转化器,将List修改为node.json一致的格式 """ super.__init__() + self.id_to_node = dict() # TODO jiayi add self.storage: list[BasicMemory] = [] # 重写Stroage,存储BasicMemory所有节点 self.event_list = [] # 存储event记忆 self.thought_list = [] # 存储thought记忆 + self.chat_list = [] # chat-related memory self.event_keywords = dict() # 存储keywords self.thought_keywords = dict() @@ -322,3 +324,9 @@ class AgentMemory(Memory): for e_node in self.event_list[:retention]: ret_set.add(e_node.summary()) return ret_set + + def get_last_chat(self, target_role_name: str) -> str: + if target_role_name.lower() in self.chat_keywords: + return self.chat_keywords[target_role_name.lower()][0] + else: + return False diff --git a/examples/st_game/memory/retrieve.py b/examples/st_game/memory/retrieve.py index 6ff507037..e41145812 100644 --- a/examples/st_game/memory/retrieve.py +++ b/examples/st_game/memory/retrieve.py @@ -3,8 +3,11 @@ # @Desc : Retrieve函数实现 import datetime +from typing import Union + from numpy import dot from numpy.linalg import norm + from ..memory.agent_memory import AgentMemory, BasicMemory from ..utils.utils import get_embedding @@ -141,3 +144,116 @@ def normalize_score_floats(score_list, target_min, target_max): score_list[i]['recency'] = recency_list[i] return score_list + + +def normalize_dict_floats(d: dict, target_min: Union[int, float], target_max: Union[int, float]) -> dict: + """ + This function normalizes the float values of a given dictionary 'd' between + a target minimum and maximum value. The normalization is done by scaling the + values to the target range while maintaining the same relative proportions + between the original values. + + INPUT: + d: Dictionary. The input dictionary whose float values need to be + normalized. + target_min: Integer or float. The minimum value to which the original + values should be scaled. + target_max: Integer or float. The maximum value to which the original + values should be scaled. + OUTPUT: + d: A new dictionary with the same keys as the input but with the float + values normalized between the target_min and target_max. + + Example input: + d = {'a':1.2,'b':3.4,'c':5.6,'d':7.8} + target_min = -5 + target_max = 5 + """ + min_val = min(val for val in d.values()) + max_val = max(val for val in d.values()) + range_val = max_val - min_val + + if range_val == 0: + for key, val in d.items(): + d[key] = (target_max - target_min) / 2 + else: + for key, val in d.items(): + d[key] = ((val - min_val) * (target_max - target_min) + / range_val + target_min) + return d + + +def new_retrieve(role, focal_points, n_count=30): + """ + Given the current role and focal points (focal points are events or + thoughts for which we are retrieving), we retrieve a set of nodes for each + of the focal points and return a dictionary. + + INPUT: + role: The current role object whose memory we are retrieving. + focal_points: A list of focal points (string description of the events or + thoughts that is the focus of current retrieval). + OUTPUT: + retrieved: A dictionary whose keys are a string focal point, and whose + values are a list of Node object in the agent's associative + memory. + + Example input: + role = object + focal_points = ["How are you?", "Jane is swimming in the pond"] + """ + # is the main dictionary that we are returning + retrieved = dict() + for focal_pt in focal_points: + scratch = role._rc.scratch + # Getting all nodes from the agent's memory (both thoughts and events) and + # sorting them by the datetime of creation. + # You could also imagine getting the raw conversation, but for now. + nodes = [[i.last_accessed, i] + for i in role._rc.memory.event_list + role._rc.memory.thought_list + if "idle" not in i.embedding_key] + nodes = sorted(nodes, key=lambda x: x[0]) + nodes = [i for created, i in nodes] + + # Calculating the component dictionaries and normalizing them. + recency_out = extract_recency(role, nodes) # TODO + recency_out = normalize_dict_floats(recency_out, 0, 1) + importance_out = extract_importance(role, nodes) + importance_out = normalize_dict_floats(importance_out, 0, 1) + relevance_out = extract_relevance(role, nodes, focal_pt) + relevance_out = normalize_dict_floats(relevance_out, 0, 1) + + # Computing the final scores that combines the component values. + # Note to self: test out different weights. [1, 1, 1] tends to work + # decently, but in the future, these weights should likely be learned, + # perhaps through an RL-like process. + # gw = [1, 1, 1] + # gw = [1, 2, 1] + gw = [0.5, 3, 2] + master_out = dict() + for key in recency_out.keys(): + master_out[key] = (scratch.recency_w * recency_out[key] * gw[0] + + scratch.relevance_w * relevance_out[key] * gw[1] + + scratch.importance_w * importance_out[key] * gw[2]) + + master_out = top_highest_x_values(master_out, len(master_out.keys())) + for key, val in master_out.items(): + print(role._rc.memory.id_to_node[key].embedding_key, val) + print(scratch.recency_w * recency_out[key] * 1, + scratch.relevance_w * relevance_out[key] * 1, + scratch.importance_w * importance_out[key] * 1) + + # Extracting the highest x values. + # has the key of node.id and value of float. Once we get the + # highest x values, we want to translate the node.id into nodes and return + # the list of nodes. + master_out = top_highest_x_values(master_out, n_count) + master_nodes = [role._rc.memory.id_to_node[key] + for key in list(master_out.keys())] + + for n in master_nodes: + n.last_accessed = scratch.curr_time + + retrieved[focal_pt] = master_nodes + + return retrieved diff --git a/examples/st_game/plan/st_plan.py b/examples/st_game/plan/st_plan.py index 27ed0f94f..85e40ed9e 100644 --- a/examples/st_game/plan/st_plan.py +++ b/examples/st_game/plan/st_plan.py @@ -1,3 +1,409 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # @Desc : st' planning execution + +import random +from typing import Union, Tuple +from datetime import datetime +import math + +from ..maze import Maze +from ..plan.converse import agent_conversation +from ..roles.st_role import STRole +from ..actions.decide_to_talk import DecideToTalk +from ..actions.summarize_conv import SummarizeConv +from ..actions.new_decomp_schedule import NewDecompSchedule + + +def plan(role: STRole, maze: Maze, roles: list[STRole], new_day: bool, retrieved: dict): + + focused_event = False + if retrieved.keys(): + focused_event = _choose_retrieved(role, retrieved) + + # Step 2: Once we choose an event, we need to determine whether the + # persona will take any actions for the perceived event. There are + # three possible modes of reaction returned by _should_react. + # a) "chat with {target_persona.name}" + # b) "react" + # c) False + if focused_event: + reaction_mode = _should_react(role, focused_event, roles) + if reaction_mode: + # If we do want to chat, then we generate conversation + if reaction_mode[:9] == "chat with": + _chat_react(maze, role, focused_event, reaction_mode, roles) + elif reaction_mode[:4] == "wait": + _wait_react(role, reaction_mode) + + +def _choose_retrieved(role_name: str, retrieved: dict) -> Union[None, dict]: + """ + Retrieved elements have multiple core "curr_events". We need to choose one + event to which we are going to react to. We pick that event here. + Args: + role_name: Current role instance's name whose action we are determining. + retrieved: A dictionary of that were retrieved from the + the role's associative memory. This dictionary takes the + following form: + dictionary[event.description] = + {["curr_event"] = , + ["events"] = [, ...], + ["thoughts"] = [, ...] } + """ + # Once we are done with the reflection, we might want to build a more + # complex structure here. + + # We do not want to take self events... for now + copy_retrieved = retrieved.copy() + for event_desc, rel_ctx in copy_retrieved.items(): + curr_event = rel_ctx["curr_event"] + if curr_event.subject == role_name: + del retrieved[event_desc] + + # Always choose role first. + priority = [] + for event_desc, rel_ctx in retrieved.items(): + curr_event = rel_ctx["curr_event"] + if (":" not in curr_event.subject + and curr_event.subject != role_name): + priority += [rel_ctx] + if priority: + return random.choice(priority) + + # Skip idle. + for event_desc, rel_ctx in retrieved.items(): + if "is idle" not in event_desc: + priority += [rel_ctx] + if priority: + return random.choice(priority) + return None + + +def _should_react(role: "STRole", retrieved: dict, roles: dict): + """ + Determines what form of reaction the persona should exihibit given the + retrieved values. + INPUT + role: Current instance whose action we are determining. + retrieved: A dictionary of that were retrieved from the + the role's associative memory. This dictionary takes the + following form: + dictionary[event.description] = + {["curr_event"] = , + ["events"] = [, ...], + ["thoughts"] = [, ...] } + roles: A dictionary that contains all role names as keys, and the + instance as values. + """ + + def lets_talk(init_role: STRole, target_role: STRole, retrieved: dict): + scratch = init_role._rc.scratch + target_scratch = target_role._rc.scratch + if (not target_scratch.act_address + or not target_scratch.act_description + or not scratch.act_address + or not scratch.act_description): + return False + + if ("sleeping" in target_scratch.act_description + or "sleeping" in scratch.act_description): + return False + + if scratch.curr_time.hour == 23: + return False + + if "" in target_scratch.act_address: + return False + + if (target_scratch.chatting_with + or scratch.chatting_with): + return False + + if (target_role.name in scratch.chatting_with_buffer): + if scratch.chatting_with_buffer[target_role.name] > 0: + return False + + if DecideToTalk().run(init_role, target_role, retrieved): + return True + + return False + + def lets_react(init_role: STRole, target_role: STRole, retrieved: dict): + scratch = init_role._rc.scratch + target_scratch = target_role._rc.scratch + if (not target_scratch.act_address + or not target_scratch.act_description + or not scratch.act_address + or not scratch.act_description): + return False + + if ("sleeping" in target_scratch.act_description + or "sleeping" in scratch.act_description): + return False + + # return False + if scratch.curr_time.hour == 23: + return False + + if "waiting" in target_scratch.act_description: + return False + if scratch.planned_path == []: + return False + + if (scratch.act_address + != target_scratch.act_address): + return False + + react_mode = DecideToTalk().run(init_role, + target_role, + retrieved) + + if react_mode == "1": + wait_until = ((target_scratch.act_start_time + + datetime.timedelta(minutes=target_scratch.act_duration - 1)) + .strftime("%B %d, %Y, %H:%M:%S")) + return f"wait: {wait_until}" + elif react_mode == "2": + return False + return "do other things" + else: + return False # "keep" + + # If the persona is chatting right now, default to no reaction + scratch = role._rc.scratch + if scratch.chatting_with: + return False + if "" in scratch.act_address: + return False + + # Recall that retrieved takes the following form: + # dictionary {["curr_event"] = } + curr_event = retrieved["curr_event"] + + if ":" not in curr_event.subject: + # this is a persona event. + if lets_talk(role, roles[curr_event.subject], retrieved): + return f"chat with {curr_event.subject}" + react_mode = lets_react(role, roles[curr_event.subject], + retrieved) + return react_mode + return False + + +def _chat_react(maze: Maze, role: STRole, reaction_mode: str, roles: list[STRole]): + # There are two personas -- the persona who is initiating the conversation + # and the persona who is the target. We get the persona instances here. + init_role = role + target_role = roles[reaction_mode[9:].strip()] + curr_personas = [init_role, target_role] + + # Actually creating the conversation here. + convo, duration_min = generate_convo(maze, init_role, target_role) # 2222 + convo_summary = generate_convo_summary(init_role, convo) + inserted_act = convo_summary + inserted_act_dur = duration_min + + act_start_time = target_role._rc.scratch.act_start_time + + curr_time = target_role._rc.scratch.curr_time + if curr_time.second != 0: + temp_curr_time = curr_time + datetime.timedelta(seconds=60 - curr_time.second) + chatting_end_time = temp_curr_time + datetime.timedelta(minutes=inserted_act_dur) + else: + chatting_end_time = curr_time + datetime.timedelta(minutes=inserted_act_dur) + + for role, p in [("init", init_role), ("target", target_role)]: + if role == "init": + act_address = f" {target_role.name}" + act_event = (p.name, "chat with", target_role.name) + chatting_with = target_role.name + chatting_with_buffer = {} + chatting_with_buffer[target_role.name] = 800 + elif role == "target": + act_address = f" {init_role.name}" + act_event = (p.name, "chat with", init_role.name) + chatting_with = init_role.name + chatting_with_buffer = {} + chatting_with_buffer[init_role.name] = 800 + + act_pronunciatio = "💬" + act_obj_description = None + act_obj_pronunciatio = None + act_obj_event = (None, None, None) + + _create_react(p, inserted_act, inserted_act_dur, + act_address, act_event, chatting_with, convo, chatting_with_buffer, chatting_end_time, + act_pronunciatio, act_obj_description, act_obj_pronunciatio, + act_obj_event, act_start_time) + + +def _create_react(role: STRole, inserted_act: str, inserted_act_dur: int, + act_address: str, act_event: Tuple, chatting_with: str, chat: list, chatting_with_buffer: dict, + chatting_end_time: datetime, + act_pronunciatio: str, act_obj_description: str, act_obj_pronunciatio: str, + act_obj_event: Tuple, act_start_time=None): + p = role + scratch = role._rc.scratch + + min_sum = 0 + for i in range(scratch.get_f_daily_schedule_hourly_org_index()): + min_sum += scratch.f_daily_schedule_hourly_org[i][1] + start_hour = int(min_sum / 60) + + if (scratch.f_daily_schedule_hourly_org[scratch.get_f_daily_schedule_hourly_org_index()][1] >= 120): + end_hour = start_hour + \ + scratch.f_daily_schedule_hourly_org[scratch.get_f_daily_schedule_hourly_org_index()][1] / 60 + + elif (scratch.f_daily_schedule_hourly_org[scratch.get_f_daily_schedule_hourly_org_index()][1] + + scratch.f_daily_schedule_hourly_org[scratch.get_f_daily_schedule_hourly_org_index() + 1][1]): + end_hour = start_hour + ( + (scratch.f_daily_schedule_hourly_org[scratch.get_f_daily_schedule_hourly_org_index()][1] + + scratch.f_daily_schedule_hourly_org[scratch.get_f_daily_schedule_hourly_org_index() + 1][ + 1]) / 60) + + else: + end_hour = start_hour + 2 + end_hour = int(end_hour) + + dur_sum = 0 + count = 0 + start_index = None + end_index = None + for act, dur in scratch.f_daily_schedule: + if dur_sum >= start_hour * 60 and start_index is None: + start_index = count + if dur_sum >= end_hour * 60 and end_index is None: + end_index = count + dur_sum += dur + count += 1 + + ret = generate_new_decomp_schedule(p, inserted_act, inserted_act_dur, + start_hour, end_hour) + scratch.f_daily_schedule[start_index:end_index] = ret + scratch.add_new_action(act_address, + inserted_act_dur, + inserted_act, + act_pronunciatio, + act_event, + chatting_with, + chat, + chatting_with_buffer, + chatting_end_time, + act_obj_description, + act_obj_pronunciatio, + act_obj_event, + act_start_time) + + +def _wait_react(role: STRole, reaction_mode: str): + scratch = role._rc.scratch + + inserted_act = f'waiting to start {scratch.act_description.split("(")[-1][:-1]}' + end_time = datetime.datetime.strptime(reaction_mode[6:].strip(), "%B %d, %Y, %H:%M:%S") + inserted_act_dur = (end_time.minute + end_time.hour * 60) - ( + scratch.curr_time.minute + scratch.curr_time.hour * 60) + 1 + + act_address = f" {scratch.curr_tile[0]} {scratch.curr_tile[1]}" + act_event = (role.name, "waiting to start", scratch.act_description.split("(")[-1][:-1]) + chatting_with = None + chat = None + chatting_with_buffer = None + chatting_end_time = None + + act_pronunciatio = "⌛" + act_obj_description = None + act_obj_pronunciatio = None + act_obj_event = (None, None, None) + + _create_react(role, inserted_act, inserted_act_dur, + act_address, act_event, chatting_with, chat, chatting_with_buffer, chatting_end_time, + act_pronunciatio, act_obj_description, act_obj_pronunciatio, act_obj_event) + + +def generate_convo(maze: Maze, init_role: STRole, target_role: STRole) -> Union[list, int]: + curr_loc = maze.access_tile(init_role._rc.scratch.curr_tile) + convo = agent_conversation(maze, init_role, target_role) + all_utt = "" + + for row in convo: + speaker = row[0] + utt = row[1] + all_utt += f"{speaker}: {utt}\n" + + convo_length = math.ceil(int(len(all_utt) / 8) / 30) + + return convo, convo_length + + +def generate_convo_summary(role: STRole, conv: list) -> str: + conv_summary = SummarizeConv().run(conv) + return conv_summary + + +def generate_new_decomp_schedule(role: STRole, inserted_act: str, inserted_act_dur: int, + start_hour: int, end_hour: int): + # Step 1: Setting up the core variables for the function. + #

is the persona whose schedule we are editing right now. + p = role + scratch = role._rc.scratch + # indicates the number of minutes that have passed today. + today_min_pass = (int(scratch.curr_time.hour) * 60 + + int(scratch.curr_time.minute) + 1) + + # Step 2: We need to create and . + main_act_dur = [] + truncated_act_dur = [] + dur_sum = 0 # duration sum + count = 0 # enumerate count + truncated_fin = False + + print("DEBUG::: ", scratch.name) + for act, dur in scratch.f_daily_schedule: + if (dur_sum >= start_hour * 60) and (dur_sum < end_hour * 60): + main_act_dur += [[act, dur]] + if dur_sum <= today_min_pass: + truncated_act_dur += [[act, dur]] + elif dur_sum > today_min_pass and not truncated_fin: + # We need to insert that last act, duration list like this one: + # e.g., ['wakes up and completes her morning routine (wakes up...)', 2] + truncated_act_dur += [[scratch.f_daily_schedule[count][0], + dur_sum - today_min_pass]] + truncated_act_dur[-1][-1] -= ( + dur_sum - today_min_pass) # DEC 7 DEBUG;.. is the +1 the right thing to do??? + # DEC 7 DEBUG;.. is the +1 the right thing to do??? + # truncated_act_dur[-1][-1] -= (dur_sum - today_min_pass + 1) + print("DEBUG::: ", truncated_act_dur) + + # DEC 7 DEBUG;.. is the +1 the right thing to do??? + # truncated_act_dur[-1][-1] -= (dur_sum - today_min_pass) + truncated_fin = True + dur_sum += dur + count += 1 + + persona_name = role.name + main_act_dur = main_act_dur + + x = truncated_act_dur[-1][0].split("(")[0].strip() + " (on the way to " + truncated_act_dur[-1][0].split("(")[-1][ + :-1] + ")" + truncated_act_dur[-1][0] = x + + if "(" in truncated_act_dur[-1][0]: + inserted_act = truncated_act_dur[-1][0].split("(")[0].strip() + " (" + inserted_act + ")" + + # To do inserted_act_dur+1 below is an important decision but I'm not sure + # if I understand the full extent of its implications. Might want to + # revisit. + truncated_act_dur += [[inserted_act, inserted_act_dur]] + start_time_hour = (datetime.datetime(2022, 10, 31, 0, 0) + + datetime.timedelta(hours=start_hour)) + end_time_hour = (datetime.datetime(2022, 10, 31, 0, 0) + + datetime.timedelta(hours=end_hour)) + + return NewDecompSchedule().run(role, + main_act_dur, + truncated_act_dur, + start_time_hour, + end_time_hour, + inserted_act, + inserted_act_dur) diff --git a/examples/st_game/roles/st_role.py b/examples/st_game/roles/st_role.py index e4de84afa..643853063 100644 --- a/examples/st_game/roles/st_role.py +++ b/examples/st_game/roles/st_role.py @@ -59,6 +59,10 @@ class STRole(Role): else: self._watch([DummyAction]) + @property + def name(self): + return self._setting.name + def load_from(self, folder: Path): """ load role data from `storage/{simulation_name}/personas/{role_name} diff --git a/examples/st_game/utils/const.py b/examples/st_game/utils/const.py index 8fa4767ed..e4dc38e55 100644 --- a/examples/st_game/utils/const.py +++ b/examples/st_game/utils/const.py @@ -7,5 +7,4 @@ from pathlib import Path ROOT_PATH = Path(__file__).parent.parent STORAGE_PATH = ROOT_PATH.joinpath("storage") MAZE_ASSET_PATH = ROOT_PATH.joinpath("static_dirs/assets/the_ville") - - +PROMPTS_DIR = ROOT_PATH.joinpath("prompts") diff --git a/examples/st_game/utils/utils.py b/examples/st_game/utils/utils.py index 5cd110e9f..799d9d348 100644 --- a/examples/st_game/utils/utils.py +++ b/examples/st_game/utils/utils.py @@ -2,7 +2,7 @@ # -*- coding: utf-8 -*- # @Desc : utils -from typing import Any +from typing import Any, Union import json import openai from pathlib import Path @@ -71,3 +71,24 @@ def generate_poig_score(scratch, event_type, description): return get_poignancy_action(scratch, description)[0] elif event_type == "chat": return get_poignancy_chat(scratch, description)[0] + + +def extract_first_json_dict(data_str: str) -> Union[None, dict]: + # Find the first occurrence of a JSON object within the string + start_idx = data_str.find("{") + end_idx = data_str.find("}", start_idx) + 1 + + # Check if both start and end indices were found + if start_idx == -1 or end_idx == 0: + return None + + # Extract the first JSON dictionary + json_str = data_str[start_idx:end_idx] + + try: + # Attempt to parse the JSON data + json_dict = json.loads(json_str) + return json_dict + except json.JSONDecodeError: + # If parsing fails, return None + return None