diff --git a/examples/st_game/actions/agent_chat_sum_rel.py b/examples/st_game/actions/agent_chat_sum_rel.py
index de005f8f4..9a26ccf02 100644
--- a/examples/st_game/actions/agent_chat_sum_rel.py
+++ b/examples/st_game/actions/agent_chat_sum_rel.py
@@ -29,7 +29,7 @@ class AgentChatSumRel(STAction):
def _func_fail_default_resp(self) -> str:
pass
- async def run(self, init_role: STRole, target_role: STRole, statements: str) -> str:
+ def run(self, init_role: STRole, target_role: STRole, statements: str) -> str:
def create_prompt_input(init_role: STRole, target_role: STRole, statements: str) -> str:
prompt_input = [statements, init_role.name, target_role.name]
return prompt_input
@@ -40,7 +40,7 @@ class AgentChatSumRel(STAction):
example_output = "Jane Doe is working on a project"
special_instruction = "The output should be a string that responds to the question."
- output = await self._run_v2(prompt,
- example_output,
- special_instruction)
+ output = self._run_v2(prompt,
+ example_output,
+ special_instruction)
return output[0]
diff --git a/examples/st_game/actions/decide_to_talk.py b/examples/st_game/actions/decide_to_talk.py
index 77c3703f3..5a3452d98 100644
--- a/examples/st_game/actions/decide_to_talk.py
+++ b/examples/st_game/actions/decide_to_talk.py
@@ -29,7 +29,7 @@ class DecideToTalk(STAction):
def _func_fail_default_resp(self) -> str:
return "yes"
- async def run(self, init_role: STRole, target_role: STRole, retrieved: dict, *args, **kwargs) -> bool:
+ def run(self, init_role: STRole, target_role: STRole, retrieved: dict, *args, **kwargs) -> bool:
"""Run action"""
def create_prompt_input(init_role: STRole, target_role: STRole, retrieved: dict) -> str:
scratch = init_role._rc.scratch
@@ -94,7 +94,7 @@ class DecideToTalk(STAction):
prompt = self.generate_prompt_with_tmpl_filename(prompt_input=prompt_input,
tmpl_filename="decide_to_talk_v2.txt")
self.fail_default_resp = self._func_fail_default_resp()
- output = await self._run_v1(prompt) # yes or no
+ output = self._run_v1(prompt) # yes or no
result = True if output == "yes" else False
logger.info(f"Run action: {self.__class__.__name__} with result: {result}")
return result
diff --git a/examples/st_game/actions/gen_iter_chat_utt.py b/examples/st_game/actions/gen_iter_chat_utt.py
index 02a96191f..ff4728ad8 100644
--- a/examples/st_game/actions/gen_iter_chat_utt.py
+++ b/examples/st_game/actions/gen_iter_chat_utt.py
@@ -45,8 +45,8 @@ class GenIterChatUTT(STAction):
cleaned_dict["end"] = False
return cleaned_dict
- async def run(self, maze: Maze, init_role: STRole, target_role: STRole, retrieved: dict, curr_context: str,
- curr_chat: list[str], *args, **kwargs) -> dict:
+ def run(self, maze: Maze, init_role: STRole, target_role: STRole, retrieved: dict, curr_context: str,
+ curr_chat: list[str], *args, **kwargs) -> dict:
def create_prompt_input(maze: Maze, init_role: STRole, target_role: STRole,
retrieved: dict, curr_context: str, curr_chat: list[str]):
role = init_role
@@ -97,5 +97,5 @@ class GenIterChatUTT(STAction):
"iterative_convo_v1.txt")
# original using `ChatGPT_safe_generate_response_OLD`
self.fail_default_resp = self._func_fail_default_resp()
- output = await self._run_v1(prompt)
+ output = self._run_v1(prompt)
return output
diff --git a/examples/st_game/actions/new_decomp_schedule.py b/examples/st_game/actions/new_decomp_schedule.py
index ac1003d9e..bf7bbeeff 100644
--- a/examples/st_game/actions/new_decomp_schedule.py
+++ b/examples/st_game/actions/new_decomp_schedule.py
@@ -85,15 +85,15 @@ class NewDecompSchedule(STAction):
return ret
- async def run(self,
- role: STRole,
- main_act_dur: int,
- truncated_act_dur: int,
- start_time_hour: datetime,
- end_time_hour: datetime,
- inserted_act: str,
- inserted_act_dur: int,
- *args, **kwargs):
+ def run(self,
+ role: STRole,
+ main_act_dur: int,
+ truncated_act_dur: int,
+ start_time_hour: datetime,
+ end_time_hour: datetime,
+ inserted_act: str,
+ inserted_act_dur: int,
+ *args, **kwargs):
def create_prompt_input(role: STRole,
main_act_dur: int,
@@ -149,5 +149,5 @@ class NewDecompSchedule(STAction):
prompt = self.generate_prompt_with_tmpl_filename(prompt_input,
"new_decomp_schedule_v1.txt")
self.fail_default_resp = self._func_fail_default_resp(main_act_dur, truncated_act_dur)
- output = await self._run_v1(prompt)
+ output = self._run_v1(prompt)
return output
diff --git a/examples/st_game/actions/st_action.py b/examples/st_game/actions/st_action.py
index 9cbf3cee6..15a4e58b6 100644
--- a/examples/st_game/actions/st_action.py
+++ b/examples/st_game/actions/st_action.py
@@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
# @Desc : StanfordTown Action
-from typing import Union
+from typing import Union, Optional
from abc import abstractmethod
import json
@@ -53,22 +53,25 @@ class STAction(Action):
prompt = prompt.split("###")[1]
return prompt.strip()
- async def _run_v1(self, prompt: str, retry: int = 3) -> str:
+ def _ask(self, prompt: str, system_msgs: Optional[list[str]] = None) -> str:
+ return self.llm.ask(prompt)
+
+ def _run_v1(self, prompt: str, retry: int = 3) -> str:
"""
same with `gpt_structure.safe_generate_response`
default post-preprocess operations of LLM response
"""
for idx in range(retry):
- llm_resp = await self._aask(prompt)
+ llm_resp = self._ask(prompt)
if self._func_validate(llm_resp, prompt):
return self._func_cleanup(llm_resp, prompt)
- return self.fail_default_resp # TODO fix
+ return self.fail_default_resp
- async def _run_v2(self,
- prompt: str,
- example_output: str,
- special_instruction: str,
- retry: int = 3):
+ def _run_v2(self,
+ prompt: str,
+ example_output: str,
+ special_instruction: str,
+ retry: int = 3):
""" same with `gpt_structure.ChatGPT_safe_generate_response` """
prompt = '"""\n' + prompt + '\n"""\n'
prompt += f"Output the response to the prompt above in json. {special_instruction}\n"
@@ -77,7 +80,8 @@ class STAction(Action):
for idx in range(retry):
try:
- llm_resp = await self._aask(prompt)
+ llm_resp = self._ask(prompt)
+ print("llm_resp ", llm_resp)
end_idx = llm_resp.strip().rfind("}") + 1
llm_resp = llm_resp[:end_idx]
llm_resp = json.loads(llm_resp)["output"]
@@ -88,6 +92,6 @@ class STAction(Action):
pass
return False
- async def run(self, *args, **kwargs):
+ def run(self, *args, **kwargs):
"""Run action"""
raise NotImplementedError("The run method should be implemented in a subclass.")
diff --git a/examples/st_game/actions/summarize_conv.py b/examples/st_game/actions/summarize_conv.py
index b396fd506..9c355e89b 100644
--- a/examples/st_game/actions/summarize_conv.py
+++ b/examples/st_game/actions/summarize_conv.py
@@ -28,7 +28,7 @@ class SummarizeConv(STAction):
def _func_fail_default_resp(self) -> str:
return "conversing with a housemate about morning greetings"
- async def run(self, conv: list):
+ def run(self, conv: list):
def create_prompt_input(conversation: list):
convo_str = ""
for row in conversation:
@@ -44,5 +44,5 @@ class SummarizeConv(STAction):
special_instruction = "The output must continue the sentence above by filling in the tag. " \
"Don't start with 'this is a conversation about...' Just finish the sentence " \
"but do not miss any important details (including who are chatting)."
- output = await self._run_v2(prompt, example_output, special_instruction)
+ output = self._run_v2(prompt, example_output, special_instruction)
return output
diff --git a/examples/st_game/maze_environment.py b/examples/st_game/maze_environment.py
index 9d393d765..94f39cbff 100644
--- a/examples/st_game/maze_environment.py
+++ b/examples/st_game/maze_environment.py
@@ -2,13 +2,18 @@
# -*- coding: utf-8 -*-
# @Desc : maze environment
+from pydantic import Field
+
from metagpt.environment import Environment
+from metagpt.roles.role import Role
+
from .maze import Maze
class MazeEnvironment(Environment):
- def __init__(self, name: str, maze: Maze) -> None:
- self.name = name
- self.maze = maze
-
\ No newline at end of file
+ maze: Maze = Field(default=Maze)
+
+ def add_role(self, role: Role):
+ role.set_env(self)
+ self.roles[role.name] = role # use role.name as key not role.profile
diff --git a/examples/st_game/roles/st_role.py b/examples/st_game/roles/st_role.py
index 643853063..9d6b8741f 100644
--- a/examples/st_game/roles/st_role.py
+++ b/examples/st_game/roles/st_role.py
@@ -11,12 +11,17 @@ Do the steps following:
- execute, move or else in the Maze
"""
import math
+import time
+
from pydantic import Field
from pathlib import Path
+import random
+import datetime
from operator import itemgetter
from metagpt.roles.role import Role, RoleContext
from metagpt.schema import Message
+from metagpt.logs import logger
from ..memory.agent_memory import AgentMemory, BasicMemory
from ..memory.spatial_memory import MemoryTree
@@ -25,13 +30,14 @@ from ..actions.user_requirement import UserRequirement
from ..maze_environment import MazeEnvironment
from ..memory.retrieve import agent_retrieve
from ..memory.scratch import Scratch
-from ..utils.utils import get_embedding, generate_poig_score
-
+from ..utils.utils import get_embedding, generate_poig_score, path_finder
+from ..utils.const import collision_block_id
from ..reflect.st_reflect import agent_reflect
+from ..utils.mg_ga_transform import save_movement, get_role_environment
class STRoleContext(RoleContext):
- env: 'MazeEnvironment' = Field(default=None)
+ env: 'MazeEnvironment' = Field(default=MazeEnvironment)
memory: AgentMemory = Field(default=AgentMemory)
scratch: Scratch = Field(default=Scratch)
spatial_memory: MemoryTree = Field(default=MemoryTree)
@@ -44,9 +50,21 @@ class STRole(Role):
def __init__(self,
name: str = "Klaus Mueller",
profile: str = "STMember",
- sim_path: str = "new_sim",
+ sim_code: str = "new_sim",
+ step: int = 0,
+ start_date: str = "",
+ curr_time: str = "",
+ sec_per_step: int = 10,
has_inner_voice: bool = False):
- self.sim_path = sim_path
+ self.sim_code = sim_code
+ self.step = step
+ self.start_time = datetime.datetime.strptime(f"{start_date}, 00:00:00", "%B %d, %Y, %H:%M:%S")
+ self.curr_time = datetime.datetime.strptime(curr_time, "%B %d, %Y, %H:%M:%S")
+ self.sec_per_step = sec_per_step
+
+ self.role_tile = (0, 0)
+ self.game_obj_cleanup = dict()
+
self._rc = STRoleContext()
super(STRole, self).__init__(name=name,
profile=profile)
@@ -59,10 +77,21 @@ class STRole(Role):
else:
self._watch([DummyAction])
+ # init role & maze
+ role_env = get_role_environment(self.sim_code, self.name, self.step)
+ pt_x = role_env["x"]
+ pt_y = role_env["y"]
+ self.role_tile = (pt_x, pt_y)
+ self._rc.env.maze.tiles[pt_y][pt_x]["events"].add(self.scratch.get_curr_event_and_desc())
+
@property
def name(self):
return self._setting.name
+ @property
+ def scratch(self):
+ return self._rc.scratch
+
def load_from(self, folder: Path):
"""
load role data from `storage/{simulation_name}/personas/{role_name}
@@ -238,15 +267,197 @@ class STRole(Role):
# TODO re-add result to memory
pass
+ def execute(self, plan: str):
+ """
+ Args:
+ plan: This is a string address of the action we need to execute.
+ It comes in the form of "{world}:{sector}:{arena}:{game_objects}".
+ It is important that you access this without doing negative
+ indexing (e.g., [-1]) because the latter address elements may not be
+ present in some cases.
+ e.g., "dolores double studio:double studio:bedroom 1:bed"
+ """
+ roles = self._rc.env.get_roles()
+ maze = self._rc.env.maze
+ if "" in plan and self._rc.scratch.planned_path == []:
+ self._rc.scratch.act_path_set = False
+
+ # is set to True if the path is set for the current action.
+ # It is False otherwise, and means we need to construct a new path.
+ if not self._rc.scratch.act_path_set:
+ # is a list of tile coordinates where the persona may go
+ # to execute the current action. The goal is to pick one of them.
+ target_tiles = None
+ logger.info("plan: ", plan)
+
+ if "" in plan:
+ # Executing persona-persona interaction.
+ target_p_tile = (roles[plan.split("")[-1].strip()]
+ .scratch.curr_tile)
+ potential_path = path_finder(maze.collision_maze,
+ self._rc.scratch.curr_tile,
+ target_p_tile,
+ collision_block_id)
+ if len(potential_path) <= 2:
+ target_tiles = [potential_path[0]]
+ else:
+ potential_1 = path_finder(maze.collision_maze,
+ self._rc.scratch.curr_tile,
+ potential_path[int(len(potential_path) / 2)],
+ collision_block_id)
+ potential_2 = path_finder(maze.collision_maze,
+ self._rc.scratch.curr_tile,
+ potential_path[int(len(potential_path) / 2) + 1],
+ collision_block_id)
+ if len(potential_1) <= len(potential_2):
+ target_tiles = [potential_path[int(len(potential_path) / 2)]]
+ else:
+ target_tiles = [potential_path[int(len(potential_path) / 2 + 1)]]
+
+ elif "" in plan:
+ # Executing interaction where the persona has decided to wait before
+ # executing their action.
+ x = int(plan.split()[1])
+ y = int(plan.split()[2])
+ target_tiles = [[x, y]]
+
+ elif "" in plan:
+ # Executing a random location action.
+ plan = ":".join(plan.split(":")[:-1])
+ target_tiles = maze.address_tiles[plan]
+ target_tiles = random.sample(list(target_tiles), 1)
+
+ else:
+ # This is our default execution. We simply take the persona to the
+ # location where the current action is taking place.
+ # Retrieve the target addresses. Again, plan is an action address in its
+ # string form. takes this and returns candidate
+ # coordinates.
+ if plan not in maze.address_tiles:
+ maze.address_tiles["Johnson Park:park:park garden"] # ERRORRRRRRR
+ else:
+ target_tiles = maze.address_tiles[plan]
+
+ # There are sometimes more than one tile returned from this (e.g., a tabe
+ # may stretch many coordinates). So, we sample a few here. And from that
+ # random sample, we will take the closest ones.
+ if len(target_tiles) < 4:
+ target_tiles = random.sample(list(target_tiles), len(target_tiles))
+ else:
+ target_tiles = random.sample(list(target_tiles), 4)
+ # If possible, we want personas to occupy different tiles when they are
+ # headed to the same location on the maze. It is ok if they end up on the
+ # same time, but we try to lower that probability.
+ # We take care of that overlap here.
+ persona_name_set = set(roles.keys())
+ new_target_tiles = []
+ for i in target_tiles:
+ curr_event_set = maze.access_tile(i)["events"]
+ pass_curr_tile = False
+ for j in curr_event_set:
+ if j[0] in persona_name_set:
+ pass_curr_tile = True
+ if not pass_curr_tile:
+ new_target_tiles += [i]
+ if len(new_target_tiles) == 0:
+ new_target_tiles = target_tiles
+ target_tiles = new_target_tiles
+
+ # Now that we've identified the target tile, we find the shortest path to
+ # one of the target tiles.
+ curr_tile = self._rc.scratch.curr_tile
+ collision_maze = maze.collision_maze
+ closest_target_tile = None
+ path = None
+ for i in target_tiles:
+ # path_finder takes a collision_mze and the curr_tile coordinate as
+ # an input, and returns a list of coordinate tuples that becomes the
+ # path.
+ # e.g., [(0, 1), (1, 1), (1, 2), (1, 3), (1, 4)...]
+ curr_path = path_finder(maze.collision_maze,
+ curr_tile,
+ i,
+ collision_block_id)
+ if not closest_target_tile:
+ closest_target_tile = i
+ path = curr_path
+ elif len(curr_path) < len(path):
+ closest_target_tile = i
+ path = curr_path
+
+ # Actually setting the and . We cut the
+ # first element in the planned_path because it includes the curr_tile.
+ self._rc.scratch.planned_path = path[1:]
+ self._rc.scratch.act_path_set = True
+
+ # Setting up the next immediate step. We stay at our curr_tile if there is
+ # no left, but otherwise, we go to the next tile in the path.
+ ret = self._rc.scratch.curr_tile
+ if self._rc.scratch.planned_path:
+ ret = self._rc.scratch.planned_path[0]
+ self._rc.scratch.planned_path = self._rc.scratch.planned_path[1:]
+
+ description = f"{self._rc.scratch.act_description}"
+ description += f" @ {self._rc.scratch.act_address}"
+
+ execution = ret, self._rc.scratch.act_pronunciatio, description
+ return execution
+
+ def update_role_env(self) -> bool:
+ role_env = get_role_environment(self.sim_code, self.name, self.step)
+ ret = True
+ if role_env:
+ for key, val in self.game_obj_cleanup.items():
+ self._rc.env.maze.turn_event_from_tile_idle(key, val)
+
+ # reset game_obj_cleanup
+ self.game_obj_cleanup = dict()
+ curr_tile = self.role_tile
+ new_tile = (role_env["x"], role_env["y"])
+ self._rc.env.maze.remove_subject_events_from_tile(self.name, curr_tile)
+ self._rc.env.maze.add_event_from_tile(self.scratch.get_curr_event_and_desc(), new_tile)
+
+ # the persona will travel to get to their destination. *Once*
+ # the persona gets there, we activate the object action.
+ if not self.scratch.planned_path:
+ self.game_obj_cleanup[self.scratch.get_curr_event_and_desc()] = new_tile
+ self._rc.env.maze.add_event_from_tile(self.scratch.get_curr_event_and_desc(), new_tile)
+ blank = (self.scratch.get_curr_obj_event_and_desc()[0], None, None, None)
+ self._rc.env.maze.remove_event_from_tile(blank, new_tile)
+ else:
+ ret = False
+ time.sleep(1)
+ logger.warning(f"{self.sim_code}/environment/{self.step}.json not exist or parses failed,"
+ f"sleep 1s and re-check")
+ return ret
+
async def _react(self) -> Message:
- maze_env = self._rc.env
+ # update role env
+ ret = self.update_role_env()
+ if not ret:
+ # TODO add message
+ return
+
# TODO observe
# get maze_env from self._rc.env, and observe env info
# TODO retrieve, use self._rc.memory 's retrieve functions
# TODO plan
+ plan = self.plan()
# TODO reflect
# TODO execute(feed-back into maze_env)
+ next_tile, pronunciatio, description = self.execute(plan)
+ role_move = {
+ "movement": next_tile,
+ "pronunciatio": pronunciatio,
+ "description": description,
+ "chat": self.scratch.chat
+ }
+ save_movement(self.name, role_move, step=self.step, sim_code=self.sim_code, curr_time=self.curr_time)
+
+ # step update
+ self.step += 1
+ self.curr_time += datetime.timedelta(seconds=self.sec_per_step)
diff --git a/examples/st_game/run_st_game.py b/examples/st_game/run_st_game.py
index 7b00a6f71..ca73c8fff 100644
--- a/examples/st_game/run_st_game.py
+++ b/examples/st_game/run_st_game.py
@@ -21,12 +21,17 @@ async def startup(idea: str,
reverie_meta = get_reverie_meta(fork_sim_code)
roles = []
sim_path = STORAGE_PATH.joinpath(sim_code)
+ sim_path.mkdir(exist_ok=True)
for idx, role_name in enumerate(reverie_meta["persona_names"]):
role_stg_path = STORAGE_PATH.joinpath(fork_sim_code).joinpath(f"personas/{role_name}")
has_inner_voice = True if idx == 0 else False
role = STRole(name=role_name,
- sim_path=sim_path,
+ sim_code=sim_code,
profile=f"STMember_{idx}",
+ step=reverie_meta.get("step", 0),
+ start_date=reverie_meta.get("start_date"),
+ curr_time=reverie_meta.get("curr_time"),
+ sec_per_step=reverie_meta.get("sec_per_step"),
has_inner_voice=has_inner_voice)
role.load_from(role_stg_path)
roles.append(role)
@@ -35,7 +40,7 @@ async def startup(idea: str,
town.wakeup_roles(roles)
town.invest(investment)
- town.start_project()
+ town.start_project(idea)
await town.run(n_round)
diff --git a/examples/st_game/tests/actions/test_summarize_conv.py b/examples/st_game/tests/actions/test_summarize_conv.py
new file mode 100644
index 000000000..d9efd4aa5
--- /dev/null
+++ b/examples/st_game/tests/actions/test_summarize_conv.py
@@ -0,0 +1,17 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# @Desc : unittest of actions/summarize_conv
+
+from metagpt.logs import logger
+
+from st_game.actions.summarize_conv import SummarizeConv
+
+
+def test_summarize_conv():
+ conv = [
+ ("Role_A", "what's the weather today?"),
+ ("Role_B", "It looks pretty good, and I will take a walk then.")
+ ]
+
+ output = SummarizeConv().run(conv)
+ assert "weather" in output
diff --git a/examples/st_game/utils/check.py b/examples/st_game/utils/check.py
index 0a806fe2d..3b9650937 100644
--- a/examples/st_game/utils/check.py
+++ b/examples/st_game/utils/check.py
@@ -1,14 +1,15 @@
-def check_if_file_exists(curr_file):
- """
- Checks if a file exists
- ARGS:
- curr_file: path to the current csv file.
- RETURNS:
- True if the file exists
- False if the file does not exist
- """
- try:
- with open(curr_file) as f_analysis_file: pass
- return True
- except:
- return False
\ No newline at end of file
+def check_if_file_exists(curr_file):
+ """
+ Checks if a file exists
+ ARGS:
+ curr_file: path to the current csv file.
+ RETURNS:
+ True if the file exists
+ False if the file does not exist
+ """
+ try:
+ with open(curr_file) as f_analysis_file:
+ pass
+ return True
+ except Exception as exp:
+ return False
diff --git a/examples/st_game/utils/const.py b/examples/st_game/utils/const.py
index e4dc38e55..3dcc4e172 100644
--- a/examples/st_game/utils/const.py
+++ b/examples/st_game/utils/const.py
@@ -8,3 +8,5 @@ ROOT_PATH = Path(__file__).parent.parent
STORAGE_PATH = ROOT_PATH.joinpath("storage")
MAZE_ASSET_PATH = ROOT_PATH.joinpath("static_dirs/assets/the_ville")
PROMPTS_DIR = ROOT_PATH.joinpath("prompts")
+
+collision_block_id = "32125"
diff --git a/examples/st_game/utils/mg_ga_transform.py b/examples/st_game/utils/mg_ga_transform.py
index f2d178b58..dd7d8840a 100644
--- a/examples/st_game/utils/mg_ga_transform.py
+++ b/examples/st_game/utils/mg_ga_transform.py
@@ -2,6 +2,10 @@
# -*- coding: utf-8 -*-
# @Desc : data transform of mg <-> ga under storage
+import json
+
+from metagpt.logs import logger
+
from .const import STORAGE_PATH
from .utils import read_json_file, write_json_file
@@ -10,3 +14,32 @@ def get_reverie_meta(sim_code: str) -> dict:
meta_file_path = STORAGE_PATH.joinpath(sim_code).joinpath("reverie/meta.json")
reverie_meta = read_json_file(meta_file_path)
return reverie_meta
+
+
+def save_movement(role_name: str, role_move: dict, step: int, sim_code: str, curr_time: str):
+ movement_path = STORAGE_PATH.joinpath(f"{sim_code}/movement/{step}.json")
+ if not movement_path.parent.exists():
+ movement_path.parent.mkdir(exist_ok=True)
+ if movement_path.exists():
+ with open(movement_path, "r") as fin:
+ movement = json.load(fin)
+ else:
+ movement = {
+ "persona": dict(),
+ "meta": dict()
+ }
+ movement["persona"][role_name] = role_move
+ movement["meta"]["curr_time"] = curr_time.strftime("%B %d, %Y, %H:%M:%S")
+
+ write_json_file(movement_path, movement)
+ logger.info(f"save_movement at step: {step}, curr_time: {movement['meta']['curr_time']}")
+
+
+def get_role_environment(sim_code: str, role_name: str, step: int = 0) -> dict:
+ env_path = STORAGE_PATH.joinpath(f"{sim_code}/environment/{step}.json")
+ role_env = None
+ if env_path.exists():
+ environment = read_json_file(env_path)
+ role_env = environment.get(role_name, None)
+
+ return role_env
diff --git a/examples/st_game/utils/utils.py b/examples/st_game/utils/utils.py
index 799d9d348..ed068a1bc 100644
--- a/examples/st_game/utils/utils.py
+++ b/examples/st_game/utils/utils.py
@@ -29,12 +29,12 @@ def write_json_file(json_file: str, data: list, encoding=None):
def read_csv_to_list(curr_file: str, header=False, strip_trail=True):
"""
- Reads in a csv file to a list of list. If header is True, it returns a
+ Reads in a csv file to a list of list. If header is True, it returns a
tuple with (header row, all rows)
ARGS:
- curr_file: path to the current csv file.
- RETURNS:
- List of list where the component lists are the rows of the file.
+ curr_file: path to the current csv file.
+ RETURNS:
+ List of list where the component lists are the rows of the file.
"""
if not header:
analysis_list = []
@@ -92,3 +92,87 @@ def extract_first_json_dict(data_str: str) -> Union[None, dict]:
except json.JSONDecodeError:
# If parsing fails, return None
return None
+
+
+def path_finder_v2(a, start, end, collision_block_char) -> list[int]:
+ def make_step(m, k):
+ for i in range(len(m)):
+ for j in range(len(m[i])):
+ if m[i][j] == k:
+ if i > 0 and m[i - 1][j] == 0 and a[i - 1][j] == 0:
+ m[i - 1][j] = k + 1
+ if j > 0 and m[i][j - 1] == 0 and a[i][j - 1] == 0:
+ m[i][j - 1] = k + 1
+ if i < len(m) - 1 and m[i + 1][j] == 0 and a[i + 1][j] == 0:
+ m[i + 1][j] = k + 1
+ if j < len(m[i]) - 1 and m[i][j + 1] == 0 and a[i][j + 1] == 0:
+ m[i][j + 1] = k + 1
+
+ new_maze = []
+ for row in a:
+ new_row = []
+ for j in row:
+ if j == collision_block_char:
+ new_row += [1]
+ else:
+ new_row += [0]
+ new_maze += [new_row]
+ a = new_maze
+
+ m = []
+ for i in range(len(a)):
+ m.append([])
+ for j in range(len(a[i])):
+ m[-1].append(0)
+ i, j = start
+ m[i][j] = 1
+
+ k = 0
+ except_handle = 150
+ while m[end[0]][end[1]] == 0:
+ k += 1
+ make_step(m, k)
+
+ if except_handle == 0:
+ break
+ except_handle -= 1
+
+ i, j = end
+ k = m[i][j]
+ the_path = [(i, j)]
+ while k > 1:
+ if i > 0 and m[i - 1][j] == k - 1:
+ i, j = i - 1, j
+ the_path.append((i, j))
+ k -= 1
+ elif j > 0 and m[i][j - 1] == k - 1:
+ i, j = i, j - 1
+ the_path.append((i, j))
+ k -= 1
+ elif i < len(m) - 1 and m[i + 1][j] == k - 1:
+ i, j = i + 1, j
+ the_path.append((i, j))
+ k -= 1
+ elif j < len(m[i]) - 1 and m[i][j + 1] == k - 1:
+ i, j = i, j + 1
+ the_path.append((i, j))
+ k -= 1
+
+ the_path.reverse()
+ return the_path
+
+
+def path_finder(maze: "Maze", start: list[int], end: list[int], collision_block_char: str) -> list[int]:
+ # EMERGENCY PATCH
+ start = (start[1], start[0])
+ end = (end[1], end[0])
+ # END EMERGENCY PATCH
+
+ path = path_finder_v2(maze, start, end, collision_block_char)
+
+ new_path = []
+ for i in path:
+ new_path += [(i[1], i[0])]
+ path = new_path
+
+ return path