mirror of
https://github.com/FoundationAgents/MetaGPT.git
synced 2026-06-17 15:35:21 +02:00
Merge branch 'ga_game' of https://github.com/fucking-dog/MetaGPT into ga_game
This commit is contained in:
commit
cf23d57714
13 changed files with 321 additions and 88 deletions
|
|
@ -29,7 +29,7 @@ class AgentChatSumRel(STAction):
|
|||
def _func_fail_default_resp(self) -> str:
|
||||
pass
|
||||
|
||||
async def run(self, init_role: STRole, target_role: STRole, statements: str) -> str:
|
||||
def run(self, init_role: STRole, target_role: STRole, statements: str) -> str:
|
||||
def create_prompt_input(init_role: STRole, target_role: STRole, statements: str) -> str:
|
||||
prompt_input = [statements, init_role.name, target_role.name]
|
||||
return prompt_input
|
||||
|
|
@ -40,7 +40,7 @@ class AgentChatSumRel(STAction):
|
|||
|
||||
example_output = "Jane Doe is working on a project"
|
||||
special_instruction = "The output should be a string that responds to the question."
|
||||
output = await self._run_v2(prompt,
|
||||
example_output,
|
||||
special_instruction)
|
||||
output = self._run_v2(prompt,
|
||||
example_output,
|
||||
special_instruction)
|
||||
return output[0]
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@ class DecideToTalk(STAction):
|
|||
def _func_fail_default_resp(self) -> str:
|
||||
return "yes"
|
||||
|
||||
async def run(self, init_role: STRole, target_role: STRole, retrieved: dict, *args, **kwargs) -> bool:
|
||||
def run(self, init_role: STRole, target_role: STRole, retrieved: dict, *args, **kwargs) -> bool:
|
||||
"""Run action"""
|
||||
def create_prompt_input(init_role: STRole, target_role: STRole, retrieved: dict) -> str:
|
||||
scratch = init_role._rc.scratch
|
||||
|
|
@ -94,7 +94,7 @@ class DecideToTalk(STAction):
|
|||
prompt = self.generate_prompt_with_tmpl_filename(prompt_input=prompt_input,
|
||||
tmpl_filename="decide_to_talk_v2.txt")
|
||||
self.fail_default_resp = self._func_fail_default_resp()
|
||||
output = await self._run_v1(prompt) # yes or no
|
||||
output = self._run_v1(prompt) # yes or no
|
||||
result = True if output == "yes" else False
|
||||
logger.info(f"Run action: {self.__class__.__name__} with result: {result}")
|
||||
return result
|
||||
|
|
|
|||
|
|
@ -45,8 +45,8 @@ class GenIterChatUTT(STAction):
|
|||
cleaned_dict["end"] = False
|
||||
return cleaned_dict
|
||||
|
||||
async def run(self, maze: Maze, init_role: STRole, target_role: STRole, retrieved: dict, curr_context: str,
|
||||
curr_chat: list[str], *args, **kwargs) -> dict:
|
||||
def run(self, maze: Maze, init_role: STRole, target_role: STRole, retrieved: dict, curr_context: str,
|
||||
curr_chat: list[str], *args, **kwargs) -> dict:
|
||||
def create_prompt_input(maze: Maze, init_role: STRole, target_role: STRole,
|
||||
retrieved: dict, curr_context: str, curr_chat: list[str]):
|
||||
role = init_role
|
||||
|
|
@ -97,5 +97,5 @@ class GenIterChatUTT(STAction):
|
|||
"iterative_convo_v1.txt")
|
||||
# original using `ChatGPT_safe_generate_response_OLD`
|
||||
self.fail_default_resp = self._func_fail_default_resp()
|
||||
output = await self._run_v1(prompt)
|
||||
output = self._run_v1(prompt)
|
||||
return output
|
||||
|
|
|
|||
|
|
@ -85,15 +85,15 @@ class NewDecompSchedule(STAction):
|
|||
|
||||
return ret
|
||||
|
||||
async def run(self,
|
||||
role: STRole,
|
||||
main_act_dur: int,
|
||||
truncated_act_dur: int,
|
||||
start_time_hour: datetime,
|
||||
end_time_hour: datetime,
|
||||
inserted_act: str,
|
||||
inserted_act_dur: int,
|
||||
*args, **kwargs):
|
||||
def run(self,
|
||||
role: STRole,
|
||||
main_act_dur: int,
|
||||
truncated_act_dur: int,
|
||||
start_time_hour: datetime,
|
||||
end_time_hour: datetime,
|
||||
inserted_act: str,
|
||||
inserted_act_dur: int,
|
||||
*args, **kwargs):
|
||||
|
||||
def create_prompt_input(role: STRole,
|
||||
main_act_dur: int,
|
||||
|
|
@ -149,5 +149,5 @@ class NewDecompSchedule(STAction):
|
|||
prompt = self.generate_prompt_with_tmpl_filename(prompt_input,
|
||||
"new_decomp_schedule_v1.txt")
|
||||
self.fail_default_resp = self._func_fail_default_resp(main_act_dur, truncated_act_dur)
|
||||
output = await self._run_v1(prompt)
|
||||
output = self._run_v1(prompt)
|
||||
return output
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# @Desc : StanfordTown Action
|
||||
|
||||
from typing import Union
|
||||
from typing import Union, Optional
|
||||
from abc import abstractmethod
|
||||
import json
|
||||
|
||||
|
|
@ -53,22 +53,25 @@ class STAction(Action):
|
|||
prompt = prompt.split("<commentblockmarker>###</commentblockmarker>")[1]
|
||||
return prompt.strip()
|
||||
|
||||
async def _run_v1(self, prompt: str, retry: int = 3) -> str:
|
||||
def _ask(self, prompt: str, system_msgs: Optional[list[str]] = None) -> str:
|
||||
return self.llm.ask(prompt)
|
||||
|
||||
def _run_v1(self, prompt: str, retry: int = 3) -> str:
|
||||
"""
|
||||
same with `gpt_structure.safe_generate_response`
|
||||
default post-preprocess operations of LLM response
|
||||
"""
|
||||
for idx in range(retry):
|
||||
llm_resp = await self._aask(prompt)
|
||||
llm_resp = self._ask(prompt)
|
||||
if self._func_validate(llm_resp, prompt):
|
||||
return self._func_cleanup(llm_resp, prompt)
|
||||
return self.fail_default_resp # TODO fix
|
||||
return self.fail_default_resp
|
||||
|
||||
async def _run_v2(self,
|
||||
prompt: str,
|
||||
example_output: str,
|
||||
special_instruction: str,
|
||||
retry: int = 3):
|
||||
def _run_v2(self,
|
||||
prompt: str,
|
||||
example_output: str,
|
||||
special_instruction: str,
|
||||
retry: int = 3):
|
||||
""" same with `gpt_structure.ChatGPT_safe_generate_response` """
|
||||
prompt = '"""\n' + prompt + '\n"""\n'
|
||||
prompt += f"Output the response to the prompt above in json. {special_instruction}\n"
|
||||
|
|
@ -77,7 +80,8 @@ class STAction(Action):
|
|||
|
||||
for idx in range(retry):
|
||||
try:
|
||||
llm_resp = await self._aask(prompt)
|
||||
llm_resp = self._ask(prompt)
|
||||
print("llm_resp ", llm_resp)
|
||||
end_idx = llm_resp.strip().rfind("}") + 1
|
||||
llm_resp = llm_resp[:end_idx]
|
||||
llm_resp = json.loads(llm_resp)["output"]
|
||||
|
|
@ -88,6 +92,6 @@ class STAction(Action):
|
|||
pass
|
||||
return False
|
||||
|
||||
async def run(self, *args, **kwargs):
|
||||
def run(self, *args, **kwargs):
|
||||
"""Run action"""
|
||||
raise NotImplementedError("The run method should be implemented in a subclass.")
|
||||
|
|
|
|||
|
|
@ -28,7 +28,7 @@ class SummarizeConv(STAction):
|
|||
def _func_fail_default_resp(self) -> str:
|
||||
return "conversing with a housemate about morning greetings"
|
||||
|
||||
async def run(self, conv: list):
|
||||
def run(self, conv: list):
|
||||
def create_prompt_input(conversation: list):
|
||||
convo_str = ""
|
||||
for row in conversation:
|
||||
|
|
@ -44,5 +44,5 @@ class SummarizeConv(STAction):
|
|||
special_instruction = "The output must continue the sentence above by filling in the <fill in> tag. " \
|
||||
"Don't start with 'this is a conversation about...' Just finish the sentence " \
|
||||
"but do not miss any important details (including who are chatting)."
|
||||
output = await self._run_v2(prompt, example_output, special_instruction)
|
||||
output = self._run_v2(prompt, example_output, special_instruction)
|
||||
return output
|
||||
|
|
|
|||
|
|
@ -2,13 +2,18 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# @Desc : maze environment
|
||||
|
||||
from pydantic import Field
|
||||
|
||||
from metagpt.environment import Environment
|
||||
from metagpt.roles.role import Role
|
||||
|
||||
from .maze import Maze
|
||||
|
||||
|
||||
class MazeEnvironment(Environment):
|
||||
def __init__(self, name: str, maze: Maze) -> None:
|
||||
self.name = name
|
||||
self.maze = maze
|
||||
|
||||
|
||||
maze: Maze = Field(default=Maze)
|
||||
|
||||
def add_role(self, role: Role):
|
||||
role.set_env(self)
|
||||
self.roles[role.name] = role # use role.name as key not role.profile
|
||||
|
|
|
|||
|
|
@ -11,12 +11,17 @@ Do the steps following:
|
|||
- execute, move or else in the Maze
|
||||
"""
|
||||
import math
|
||||
import time
|
||||
|
||||
from pydantic import Field
|
||||
from pathlib import Path
|
||||
import random
|
||||
import datetime
|
||||
from operator import itemgetter
|
||||
|
||||
from metagpt.roles.role import Role, RoleContext
|
||||
from metagpt.schema import Message
|
||||
from metagpt.logs import logger
|
||||
|
||||
from ..memory.agent_memory import AgentMemory, BasicMemory
|
||||
from ..memory.spatial_memory import MemoryTree
|
||||
|
|
@ -25,13 +30,14 @@ from ..actions.user_requirement import UserRequirement
|
|||
from ..maze_environment import MazeEnvironment
|
||||
from ..memory.retrieve import new_agent_retrieve
|
||||
from ..memory.scratch import Scratch
|
||||
from ..utils.utils import get_embedding, generate_poig_score
|
||||
|
||||
from ..utils.utils import get_embedding, generate_poig_score, path_finder
|
||||
from ..utils.const import collision_block_id
|
||||
from ..reflect.st_reflect import agent_reflect
|
||||
from ..utils.mg_ga_transform import save_movement, get_role_environment
|
||||
|
||||
|
||||
class STRoleContext(RoleContext):
|
||||
env: 'MazeEnvironment' = Field(default=None)
|
||||
env: 'MazeEnvironment' = Field(default=MazeEnvironment)
|
||||
memory: AgentMemory = Field(default=AgentMemory)
|
||||
scratch: Scratch = Field(default=Scratch)
|
||||
spatial_memory: MemoryTree = Field(default=MemoryTree)
|
||||
|
|
@ -44,9 +50,21 @@ class STRole(Role):
|
|||
def __init__(self,
|
||||
name: str = "Klaus Mueller",
|
||||
profile: str = "STMember",
|
||||
sim_path: str = "new_sim",
|
||||
sim_code: str = "new_sim",
|
||||
step: int = 0,
|
||||
start_date: str = "",
|
||||
curr_time: str = "",
|
||||
sec_per_step: int = 10,
|
||||
has_inner_voice: bool = False):
|
||||
self.sim_path = sim_path
|
||||
self.sim_code = sim_code
|
||||
self.step = step
|
||||
self.start_time = datetime.datetime.strptime(f"{start_date}, 00:00:00", "%B %d, %Y, %H:%M:%S")
|
||||
self.curr_time = datetime.datetime.strptime(curr_time, "%B %d, %Y, %H:%M:%S")
|
||||
self.sec_per_step = sec_per_step
|
||||
|
||||
self.role_tile = (0, 0)
|
||||
self.game_obj_cleanup = dict()
|
||||
|
||||
self._rc = STRoleContext()
|
||||
super(STRole, self).__init__(name=name,
|
||||
profile=profile)
|
||||
|
|
@ -59,10 +77,21 @@ class STRole(Role):
|
|||
else:
|
||||
self._watch([DummyAction])
|
||||
|
||||
# init role & maze
|
||||
role_env = get_role_environment(self.sim_code, self.name, self.step)
|
||||
pt_x = role_env["x"]
|
||||
pt_y = role_env["y"]
|
||||
self.role_tile = (pt_x, pt_y)
|
||||
self._rc.env.maze.tiles[pt_y][pt_x]["events"].add(self.scratch.get_curr_event_and_desc())
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
return self._setting.name
|
||||
|
||||
@property
|
||||
def scratch(self):
|
||||
return self._rc.scratch
|
||||
|
||||
def load_from(self, folder: Path):
|
||||
"""
|
||||
load role data from `storage/{simulation_name}/personas/{role_name}
|
||||
|
|
@ -237,15 +266,197 @@ class STRole(Role):
|
|||
# TODO re-add result to memory
|
||||
pass
|
||||
|
||||
def execute(self, plan: str):
|
||||
"""
|
||||
Args:
|
||||
plan: This is a string address of the action we need to execute.
|
||||
It comes in the form of "{world}:{sector}:{arena}:{game_objects}".
|
||||
It is important that you access this without doing negative
|
||||
indexing (e.g., [-1]) because the latter address elements may not be
|
||||
present in some cases.
|
||||
e.g., "dolores double studio:double studio:bedroom 1:bed"
|
||||
"""
|
||||
roles = self._rc.env.get_roles()
|
||||
maze = self._rc.env.maze
|
||||
if "<random>" in plan and self._rc.scratch.planned_path == []:
|
||||
self._rc.scratch.act_path_set = False
|
||||
|
||||
# <act_path_set> is set to True if the path is set for the current action.
|
||||
# It is False otherwise, and means we need to construct a new path.
|
||||
if not self._rc.scratch.act_path_set:
|
||||
# <target_tiles> is a list of tile coordinates where the persona may go
|
||||
# to execute the current action. The goal is to pick one of them.
|
||||
target_tiles = None
|
||||
logger.info("plan: ", plan)
|
||||
|
||||
if "<persona>" in plan:
|
||||
# Executing persona-persona interaction.
|
||||
target_p_tile = (roles[plan.split("<persona>")[-1].strip()]
|
||||
.scratch.curr_tile)
|
||||
potential_path = path_finder(maze.collision_maze,
|
||||
self._rc.scratch.curr_tile,
|
||||
target_p_tile,
|
||||
collision_block_id)
|
||||
if len(potential_path) <= 2:
|
||||
target_tiles = [potential_path[0]]
|
||||
else:
|
||||
potential_1 = path_finder(maze.collision_maze,
|
||||
self._rc.scratch.curr_tile,
|
||||
potential_path[int(len(potential_path) / 2)],
|
||||
collision_block_id)
|
||||
potential_2 = path_finder(maze.collision_maze,
|
||||
self._rc.scratch.curr_tile,
|
||||
potential_path[int(len(potential_path) / 2) + 1],
|
||||
collision_block_id)
|
||||
if len(potential_1) <= len(potential_2):
|
||||
target_tiles = [potential_path[int(len(potential_path) / 2)]]
|
||||
else:
|
||||
target_tiles = [potential_path[int(len(potential_path) / 2 + 1)]]
|
||||
|
||||
elif "<waiting>" in plan:
|
||||
# Executing interaction where the persona has decided to wait before
|
||||
# executing their action.
|
||||
x = int(plan.split()[1])
|
||||
y = int(plan.split()[2])
|
||||
target_tiles = [[x, y]]
|
||||
|
||||
elif "<random>" in plan:
|
||||
# Executing a random location action.
|
||||
plan = ":".join(plan.split(":")[:-1])
|
||||
target_tiles = maze.address_tiles[plan]
|
||||
target_tiles = random.sample(list(target_tiles), 1)
|
||||
|
||||
else:
|
||||
# This is our default execution. We simply take the persona to the
|
||||
# location where the current action is taking place.
|
||||
# Retrieve the target addresses. Again, plan is an action address in its
|
||||
# string form. <maze.address_tiles> takes this and returns candidate
|
||||
# coordinates.
|
||||
if plan not in maze.address_tiles:
|
||||
maze.address_tiles["Johnson Park:park:park garden"] # ERRORRRRRRR
|
||||
else:
|
||||
target_tiles = maze.address_tiles[plan]
|
||||
|
||||
# There are sometimes more than one tile returned from this (e.g., a tabe
|
||||
# may stretch many coordinates). So, we sample a few here. And from that
|
||||
# random sample, we will take the closest ones.
|
||||
if len(target_tiles) < 4:
|
||||
target_tiles = random.sample(list(target_tiles), len(target_tiles))
|
||||
else:
|
||||
target_tiles = random.sample(list(target_tiles), 4)
|
||||
# If possible, we want personas to occupy different tiles when they are
|
||||
# headed to the same location on the maze. It is ok if they end up on the
|
||||
# same time, but we try to lower that probability.
|
||||
# We take care of that overlap here.
|
||||
persona_name_set = set(roles.keys())
|
||||
new_target_tiles = []
|
||||
for i in target_tiles:
|
||||
curr_event_set = maze.access_tile(i)["events"]
|
||||
pass_curr_tile = False
|
||||
for j in curr_event_set:
|
||||
if j[0] in persona_name_set:
|
||||
pass_curr_tile = True
|
||||
if not pass_curr_tile:
|
||||
new_target_tiles += [i]
|
||||
if len(new_target_tiles) == 0:
|
||||
new_target_tiles = target_tiles
|
||||
target_tiles = new_target_tiles
|
||||
|
||||
# Now that we've identified the target tile, we find the shortest path to
|
||||
# one of the target tiles.
|
||||
curr_tile = self._rc.scratch.curr_tile
|
||||
collision_maze = maze.collision_maze
|
||||
closest_target_tile = None
|
||||
path = None
|
||||
for i in target_tiles:
|
||||
# path_finder takes a collision_mze and the curr_tile coordinate as
|
||||
# an input, and returns a list of coordinate tuples that becomes the
|
||||
# path.
|
||||
# e.g., [(0, 1), (1, 1), (1, 2), (1, 3), (1, 4)...]
|
||||
curr_path = path_finder(maze.collision_maze,
|
||||
curr_tile,
|
||||
i,
|
||||
collision_block_id)
|
||||
if not closest_target_tile:
|
||||
closest_target_tile = i
|
||||
path = curr_path
|
||||
elif len(curr_path) < len(path):
|
||||
closest_target_tile = i
|
||||
path = curr_path
|
||||
|
||||
# Actually setting the <planned_path> and <act_path_set>. We cut the
|
||||
# first element in the planned_path because it includes the curr_tile.
|
||||
self._rc.scratch.planned_path = path[1:]
|
||||
self._rc.scratch.act_path_set = True
|
||||
|
||||
# Setting up the next immediate step. We stay at our curr_tile if there is
|
||||
# no <planned_path> left, but otherwise, we go to the next tile in the path.
|
||||
ret = self._rc.scratch.curr_tile
|
||||
if self._rc.scratch.planned_path:
|
||||
ret = self._rc.scratch.planned_path[0]
|
||||
self._rc.scratch.planned_path = self._rc.scratch.planned_path[1:]
|
||||
|
||||
description = f"{self._rc.scratch.act_description}"
|
||||
description += f" @ {self._rc.scratch.act_address}"
|
||||
|
||||
execution = ret, self._rc.scratch.act_pronunciatio, description
|
||||
return execution
|
||||
|
||||
def update_role_env(self) -> bool:
|
||||
role_env = get_role_environment(self.sim_code, self.name, self.step)
|
||||
ret = True
|
||||
if role_env:
|
||||
for key, val in self.game_obj_cleanup.items():
|
||||
self._rc.env.maze.turn_event_from_tile_idle(key, val)
|
||||
|
||||
# reset game_obj_cleanup
|
||||
self.game_obj_cleanup = dict()
|
||||
curr_tile = self.role_tile
|
||||
new_tile = (role_env["x"], role_env["y"])
|
||||
self._rc.env.maze.remove_subject_events_from_tile(self.name, curr_tile)
|
||||
self._rc.env.maze.add_event_from_tile(self.scratch.get_curr_event_and_desc(), new_tile)
|
||||
|
||||
# the persona will travel to get to their destination. *Once*
|
||||
# the persona gets there, we activate the object action.
|
||||
if not self.scratch.planned_path:
|
||||
self.game_obj_cleanup[self.scratch.get_curr_event_and_desc()] = new_tile
|
||||
self._rc.env.maze.add_event_from_tile(self.scratch.get_curr_event_and_desc(), new_tile)
|
||||
blank = (self.scratch.get_curr_obj_event_and_desc()[0], None, None, None)
|
||||
self._rc.env.maze.remove_event_from_tile(blank, new_tile)
|
||||
else:
|
||||
ret = False
|
||||
time.sleep(1)
|
||||
logger.warning(f"{self.sim_code}/environment/{self.step}.json not exist or parses failed,"
|
||||
f"sleep 1s and re-check")
|
||||
return ret
|
||||
|
||||
async def _react(self) -> Message:
|
||||
maze_env = self._rc.env
|
||||
# update role env
|
||||
ret = self.update_role_env()
|
||||
if not ret:
|
||||
# TODO add message
|
||||
return
|
||||
|
||||
# TODO observe
|
||||
# get maze_env from self._rc.env, and observe env info
|
||||
|
||||
# TODO retrieve, use self._rc.memory 's retrieve functions
|
||||
|
||||
# TODO plan
|
||||
plan = self.plan()
|
||||
|
||||
# TODO reflect
|
||||
|
||||
# TODO execute(feed-back into maze_env)
|
||||
next_tile, pronunciatio, description = self.execute(plan)
|
||||
role_move = {
|
||||
"movement": next_tile,
|
||||
"pronunciatio": pronunciatio,
|
||||
"description": description,
|
||||
"chat": self.scratch.chat
|
||||
}
|
||||
save_movement(self.name, role_move, step=self.step, sim_code=self.sim_code, curr_time=self.curr_time)
|
||||
|
||||
# step update
|
||||
self.step += 1
|
||||
self.curr_time += datetime.timedelta(seconds=self.sec_per_step)
|
||||
|
|
|
|||
|
|
@ -21,12 +21,17 @@ async def startup(idea: str,
|
|||
reverie_meta = get_reverie_meta(fork_sim_code)
|
||||
roles = []
|
||||
sim_path = STORAGE_PATH.joinpath(sim_code)
|
||||
sim_path.mkdir(exist_ok=True)
|
||||
for idx, role_name in enumerate(reverie_meta["persona_names"]):
|
||||
role_stg_path = STORAGE_PATH.joinpath(fork_sim_code).joinpath(f"personas/{role_name}")
|
||||
has_inner_voice = True if idx == 0 else False
|
||||
role = STRole(name=role_name,
|
||||
sim_path=sim_path,
|
||||
sim_code=sim_code,
|
||||
profile=f"STMember_{idx}",
|
||||
step=reverie_meta.get("step", 0),
|
||||
start_date=reverie_meta.get("start_date"),
|
||||
curr_time=reverie_meta.get("curr_time"),
|
||||
sec_per_step=reverie_meta.get("sec_per_step"),
|
||||
has_inner_voice=has_inner_voice)
|
||||
role.load_from(role_stg_path)
|
||||
roles.append(role)
|
||||
|
|
@ -35,7 +40,7 @@ async def startup(idea: str,
|
|||
town.wakeup_roles(roles)
|
||||
|
||||
town.invest(investment)
|
||||
town.start_project()
|
||||
town.start_project(idea)
|
||||
|
||||
await town.run(n_round)
|
||||
|
||||
|
|
|
|||
17
examples/st_game/tests/actions/test_summarize_conv.py
Normal file
17
examples/st_game/tests/actions/test_summarize_conv.py
Normal file
|
|
@ -0,0 +1,17 @@
|
|||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
# @Desc : unittest of actions/summarize_conv
|
||||
|
||||
from metagpt.logs import logger
|
||||
|
||||
from st_game.actions.summarize_conv import SummarizeConv
|
||||
|
||||
|
||||
def test_summarize_conv():
|
||||
conv = [
|
||||
("Role_A", "what's the weather today?"),
|
||||
("Role_B", "It looks pretty good, and I will take a walk then.")
|
||||
]
|
||||
|
||||
output = SummarizeConv().run(conv)
|
||||
assert "weather" in output
|
||||
|
|
@ -8,3 +8,5 @@ ROOT_PATH = Path(__file__).parent.parent
|
|||
STORAGE_PATH = ROOT_PATH.joinpath("storage")
|
||||
MAZE_ASSET_PATH = ROOT_PATH.joinpath("static_dirs/assets/the_ville")
|
||||
PROMPTS_DIR = ROOT_PATH.joinpath("prompts")
|
||||
|
||||
collision_block_id = "32125"
|
||||
|
|
|
|||
|
|
@ -2,6 +2,10 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# @Desc : data transform of mg <-> ga under storage
|
||||
|
||||
import json
|
||||
|
||||
from metagpt.logs import logger
|
||||
|
||||
from .const import STORAGE_PATH
|
||||
from .utils import read_json_file, write_json_file
|
||||
|
||||
|
|
@ -10,3 +14,32 @@ def get_reverie_meta(sim_code: str) -> dict:
|
|||
meta_file_path = STORAGE_PATH.joinpath(sim_code).joinpath("reverie/meta.json")
|
||||
reverie_meta = read_json_file(meta_file_path)
|
||||
return reverie_meta
|
||||
|
||||
|
||||
def save_movement(role_name: str, role_move: dict, step: int, sim_code: str, curr_time: str):
|
||||
movement_path = STORAGE_PATH.joinpath(f"{sim_code}/movement/{step}.json")
|
||||
if not movement_path.parent.exists():
|
||||
movement_path.parent.mkdir(exist_ok=True)
|
||||
if movement_path.exists():
|
||||
with open(movement_path, "r") as fin:
|
||||
movement = json.load(fin)
|
||||
else:
|
||||
movement = {
|
||||
"persona": dict(),
|
||||
"meta": dict()
|
||||
}
|
||||
movement["persona"][role_name] = role_move
|
||||
movement["meta"]["curr_time"] = curr_time.strftime("%B %d, %Y, %H:%M:%S")
|
||||
|
||||
write_json_file(movement_path, movement)
|
||||
logger.info(f"save_movement at step: {step}, curr_time: {movement['meta']['curr_time']}")
|
||||
|
||||
|
||||
def get_role_environment(sim_code: str, role_name: str, step: int = 0) -> dict:
|
||||
env_path = STORAGE_PATH.joinpath(f"{sim_code}/environment/{step}.json")
|
||||
role_env = None
|
||||
if env_path.exists():
|
||||
environment = read_json_file(env_path)
|
||||
role_env = environment.get(role_name, None)
|
||||
|
||||
return role_env
|
||||
|
|
|
|||
|
|
@ -3,7 +3,6 @@
|
|||
# @Desc : utils
|
||||
|
||||
from typing import Any, Union
|
||||
import os
|
||||
import json
|
||||
import openai
|
||||
from pathlib import Path
|
||||
|
|
@ -177,46 +176,3 @@ def path_finder(maze: "Maze", start: list[int], end: list[int], collision_block_
|
|||
path = new_path
|
||||
|
||||
return path
|
||||
|
||||
def create_folder_if_not_there(curr_path):
|
||||
"""
|
||||
Checks if a folder in the curr_path exists. If it does not exist, creates
|
||||
the folder.
|
||||
Note that if the curr_path designates a file location, it will operate on
|
||||
the folder that contains the file. But the function also works even if the
|
||||
path designates to just a folder.
|
||||
Args:
|
||||
curr_list: list to write. The list comes in the following form:
|
||||
[['key1', 'val1-1', 'val1-2'...],
|
||||
['key2', 'val2-1', 'val2-2'...],]
|
||||
outfile: name of the csv file to write
|
||||
RETURNS:
|
||||
True: if a new folder is created
|
||||
False: if a new folder is not created
|
||||
"""
|
||||
outfolder_name = curr_path.split("/")
|
||||
if len(outfolder_name) != 1:
|
||||
# This checks if the curr path is a file or a folder.
|
||||
if "." in outfolder_name[-1]:
|
||||
outfolder_name = outfolder_name[:-1]
|
||||
|
||||
outfolder_name = "/".join(outfolder_name)
|
||||
if not os.path.exists(outfolder_name):
|
||||
os.makedirs(outfolder_name)
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def find_filenames(path_to_dir, suffix=".csv"):
|
||||
"""
|
||||
Given a directory, find all files that end with the provided suffix and
|
||||
return their paths.
|
||||
ARGS:
|
||||
path_to_dir: Path to the current directory
|
||||
suffix: The target suffix.
|
||||
RETURNS:
|
||||
A list of paths to all files in the directory.
|
||||
"""
|
||||
filenames = os.listdir(path_to_dir)
|
||||
return [path_to_dir + "/" + filename
|
||||
for filename in filenames if filename.endswith(suffix)]
|
||||
Loading…
Add table
Add a link
Reference in a new issue