update role with pre-init role&env

This commit is contained in:
better629 2023-10-02 14:32:18 +08:00
parent 9aa6417673
commit 6d85b12e89
14 changed files with 424 additions and 62 deletions

View file

@ -29,7 +29,7 @@ class AgentChatSumRel(STAction):
def _func_fail_default_resp(self) -> str:
pass
async def run(self, init_role: STRole, target_role: STRole, statements: str) -> str:
def run(self, init_role: STRole, target_role: STRole, statements: str) -> str:
def create_prompt_input(init_role: STRole, target_role: STRole, statements: str) -> str:
prompt_input = [statements, init_role.name, target_role.name]
return prompt_input
@ -40,7 +40,7 @@ class AgentChatSumRel(STAction):
example_output = "Jane Doe is working on a project"
special_instruction = "The output should be a string that responds to the question."
output = await self._run_v2(prompt,
example_output,
special_instruction)
output = self._run_v2(prompt,
example_output,
special_instruction)
return output[0]

View file

@ -29,7 +29,7 @@ class DecideToTalk(STAction):
def _func_fail_default_resp(self) -> str:
return "yes"
async def run(self, init_role: STRole, target_role: STRole, retrieved: dict, *args, **kwargs) -> bool:
def run(self, init_role: STRole, target_role: STRole, retrieved: dict, *args, **kwargs) -> bool:
"""Run action"""
def create_prompt_input(init_role: STRole, target_role: STRole, retrieved: dict) -> str:
scratch = init_role._rc.scratch
@ -94,7 +94,7 @@ class DecideToTalk(STAction):
prompt = self.generate_prompt_with_tmpl_filename(prompt_input=prompt_input,
tmpl_filename="decide_to_talk_v2.txt")
self.fail_default_resp = self._func_fail_default_resp()
output = await self._run_v1(prompt) # yes or no
output = self._run_v1(prompt) # yes or no
result = True if output == "yes" else False
logger.info(f"Run action: {self.__class__.__name__} with result: {result}")
return result

View file

@ -45,8 +45,8 @@ class GenIterChatUTT(STAction):
cleaned_dict["end"] = False
return cleaned_dict
async def run(self, maze: Maze, init_role: STRole, target_role: STRole, retrieved: dict, curr_context: str,
curr_chat: list[str], *args, **kwargs) -> dict:
def run(self, maze: Maze, init_role: STRole, target_role: STRole, retrieved: dict, curr_context: str,
curr_chat: list[str], *args, **kwargs) -> dict:
def create_prompt_input(maze: Maze, init_role: STRole, target_role: STRole,
retrieved: dict, curr_context: str, curr_chat: list[str]):
role = init_role
@ -97,5 +97,5 @@ class GenIterChatUTT(STAction):
"iterative_convo_v1.txt")
# original using `ChatGPT_safe_generate_response_OLD`
self.fail_default_resp = self._func_fail_default_resp()
output = await self._run_v1(prompt)
output = self._run_v1(prompt)
return output

View file

@ -85,15 +85,15 @@ class NewDecompSchedule(STAction):
return ret
async def run(self,
role: STRole,
main_act_dur: int,
truncated_act_dur: int,
start_time_hour: datetime,
end_time_hour: datetime,
inserted_act: str,
inserted_act_dur: int,
*args, **kwargs):
def run(self,
role: STRole,
main_act_dur: int,
truncated_act_dur: int,
start_time_hour: datetime,
end_time_hour: datetime,
inserted_act: str,
inserted_act_dur: int,
*args, **kwargs):
def create_prompt_input(role: STRole,
main_act_dur: int,
@ -149,5 +149,5 @@ class NewDecompSchedule(STAction):
prompt = self.generate_prompt_with_tmpl_filename(prompt_input,
"new_decomp_schedule_v1.txt")
self.fail_default_resp = self._func_fail_default_resp(main_act_dur, truncated_act_dur)
output = await self._run_v1(prompt)
output = self._run_v1(prompt)
return output

View file

@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
# @Desc : StanfordTown Action
from typing import Union
from typing import Union, Optional
from abc import abstractmethod
import json
@ -53,22 +53,25 @@ class STAction(Action):
prompt = prompt.split("<commentblockmarker>###</commentblockmarker>")[1]
return prompt.strip()
async def _run_v1(self, prompt: str, retry: int = 3) -> str:
def _ask(self, prompt: str, system_msgs: Optional[list[str]] = None) -> str:
return self.llm.ask(prompt)
def _run_v1(self, prompt: str, retry: int = 3) -> str:
"""
same with `gpt_structure.safe_generate_response`
default post-preprocess operations of LLM response
"""
for idx in range(retry):
llm_resp = await self._aask(prompt)
llm_resp = self._ask(prompt)
if self._func_validate(llm_resp, prompt):
return self._func_cleanup(llm_resp, prompt)
return self.fail_default_resp # TODO fix
return self.fail_default_resp
async def _run_v2(self,
prompt: str,
example_output: str,
special_instruction: str,
retry: int = 3):
def _run_v2(self,
prompt: str,
example_output: str,
special_instruction: str,
retry: int = 3):
""" same with `gpt_structure.ChatGPT_safe_generate_response` """
prompt = '"""\n' + prompt + '\n"""\n'
prompt += f"Output the response to the prompt above in json. {special_instruction}\n"
@ -77,7 +80,8 @@ class STAction(Action):
for idx in range(retry):
try:
llm_resp = await self._aask(prompt)
llm_resp = self._ask(prompt)
print("llm_resp ", llm_resp)
end_idx = llm_resp.strip().rfind("}") + 1
llm_resp = llm_resp[:end_idx]
llm_resp = json.loads(llm_resp)["output"]
@ -88,6 +92,6 @@ class STAction(Action):
pass
return False
async def run(self, *args, **kwargs):
def run(self, *args, **kwargs):
"""Run action"""
raise NotImplementedError("The run method should be implemented in a subclass.")

View file

@ -28,7 +28,7 @@ class SummarizeConv(STAction):
def _func_fail_default_resp(self) -> str:
return "conversing with a housemate about morning greetings"
async def run(self, conv: list):
def run(self, conv: list):
def create_prompt_input(conversation: list):
convo_str = ""
for row in conversation:
@ -44,5 +44,5 @@ class SummarizeConv(STAction):
special_instruction = "The output must continue the sentence above by filling in the <fill in> tag. " \
"Don't start with 'this is a conversation about...' Just finish the sentence " \
"but do not miss any important details (including who are chatting)."
output = await self._run_v2(prompt, example_output, special_instruction)
output = self._run_v2(prompt, example_output, special_instruction)
return output

View file

@ -2,13 +2,18 @@
# -*- coding: utf-8 -*-
# @Desc : maze environment
from pydantic import Field
from metagpt.environment import Environment
from metagpt.roles.role import Role
from .maze import Maze
class MazeEnvironment(Environment):
def __init__(self, name: str, maze: Maze) -> None:
self.name = name
self.maze = maze
maze: Maze = Field(default=Maze)
def add_role(self, role: Role):
role.set_env(self)
self.roles[role.name] = role # use role.name as key not role.profile

View file

@ -11,12 +11,17 @@ Do the steps following:
- execute, move or else in the Maze
"""
import math
import time
from pydantic import Field
from pathlib import Path
import random
import datetime
from operator import itemgetter
from metagpt.roles.role import Role, RoleContext
from metagpt.schema import Message
from metagpt.logs import logger
from ..memory.agent_memory import AgentMemory, BasicMemory
from ..memory.spatial_memory import MemoryTree
@ -25,13 +30,14 @@ from ..actions.user_requirement import UserRequirement
from ..maze_environment import MazeEnvironment
from ..memory.retrieve import agent_retrieve
from ..memory.scratch import Scratch
from ..utils.utils import get_embedding, generate_poig_score
from ..utils.utils import get_embedding, generate_poig_score, path_finder
from ..utils.const import collision_block_id
from ..reflect.st_reflect import agent_reflect
from ..utils.mg_ga_transform import save_movement, get_role_environment
class STRoleContext(RoleContext):
env: 'MazeEnvironment' = Field(default=None)
env: 'MazeEnvironment' = Field(default=MazeEnvironment)
memory: AgentMemory = Field(default=AgentMemory)
scratch: Scratch = Field(default=Scratch)
spatial_memory: MemoryTree = Field(default=MemoryTree)
@ -44,9 +50,21 @@ class STRole(Role):
def __init__(self,
name: str = "Klaus Mueller",
profile: str = "STMember",
sim_path: str = "new_sim",
sim_code: str = "new_sim",
step: int = 0,
start_date: str = "",
curr_time: str = "",
sec_per_step: int = 10,
has_inner_voice: bool = False):
self.sim_path = sim_path
self.sim_code = sim_code
self.step = step
self.start_time = datetime.datetime.strptime(f"{start_date}, 00:00:00", "%B %d, %Y, %H:%M:%S")
self.curr_time = datetime.datetime.strptime(curr_time, "%B %d, %Y, %H:%M:%S")
self.sec_per_step = sec_per_step
self.role_tile = (0, 0)
self.game_obj_cleanup = dict()
self._rc = STRoleContext()
super(STRole, self).__init__(name=name,
profile=profile)
@ -59,10 +77,21 @@ class STRole(Role):
else:
self._watch([DummyAction])
# init role & maze
role_env = get_role_environment(self.sim_code, self.name, self.step)
pt_x = role_env["x"]
pt_y = role_env["y"]
self.role_tile = (pt_x, pt_y)
self._rc.env.maze.tiles[pt_y][pt_x]["events"].add(self.scratch.get_curr_event_and_desc())
@property
def name(self):
return self._setting.name
@property
def scratch(self):
return self._rc.scratch
def load_from(self, folder: Path):
"""
load role data from `storage/{simulation_name}/personas/{role_name}
@ -238,15 +267,197 @@ class STRole(Role):
# TODO re-add result to memory
pass
def execute(self, plan: str):
"""
Args:
plan: This is a string address of the action we need to execute.
It comes in the form of "{world}:{sector}:{arena}:{game_objects}".
It is important that you access this without doing negative
indexing (e.g., [-1]) because the latter address elements may not be
present in some cases.
e.g., "dolores double studio:double studio:bedroom 1:bed"
"""
roles = self._rc.env.get_roles()
maze = self._rc.env.maze
if "<random>" in plan and self._rc.scratch.planned_path == []:
self._rc.scratch.act_path_set = False
# <act_path_set> is set to True if the path is set for the current action.
# It is False otherwise, and means we need to construct a new path.
if not self._rc.scratch.act_path_set:
# <target_tiles> is a list of tile coordinates where the persona may go
# to execute the current action. The goal is to pick one of them.
target_tiles = None
logger.info("plan: ", plan)
if "<persona>" in plan:
# Executing persona-persona interaction.
target_p_tile = (roles[plan.split("<persona>")[-1].strip()]
.scratch.curr_tile)
potential_path = path_finder(maze.collision_maze,
self._rc.scratch.curr_tile,
target_p_tile,
collision_block_id)
if len(potential_path) <= 2:
target_tiles = [potential_path[0]]
else:
potential_1 = path_finder(maze.collision_maze,
self._rc.scratch.curr_tile,
potential_path[int(len(potential_path) / 2)],
collision_block_id)
potential_2 = path_finder(maze.collision_maze,
self._rc.scratch.curr_tile,
potential_path[int(len(potential_path) / 2) + 1],
collision_block_id)
if len(potential_1) <= len(potential_2):
target_tiles = [potential_path[int(len(potential_path) / 2)]]
else:
target_tiles = [potential_path[int(len(potential_path) / 2 + 1)]]
elif "<waiting>" in plan:
# Executing interaction where the persona has decided to wait before
# executing their action.
x = int(plan.split()[1])
y = int(plan.split()[2])
target_tiles = [[x, y]]
elif "<random>" in plan:
# Executing a random location action.
plan = ":".join(plan.split(":")[:-1])
target_tiles = maze.address_tiles[plan]
target_tiles = random.sample(list(target_tiles), 1)
else:
# This is our default execution. We simply take the persona to the
# location where the current action is taking place.
# Retrieve the target addresses. Again, plan is an action address in its
# string form. <maze.address_tiles> takes this and returns candidate
# coordinates.
if plan not in maze.address_tiles:
maze.address_tiles["Johnson Park:park:park garden"] # ERRORRRRRRR
else:
target_tiles = maze.address_tiles[plan]
# There are sometimes more than one tile returned from this (e.g., a tabe
# may stretch many coordinates). So, we sample a few here. And from that
# random sample, we will take the closest ones.
if len(target_tiles) < 4:
target_tiles = random.sample(list(target_tiles), len(target_tiles))
else:
target_tiles = random.sample(list(target_tiles), 4)
# If possible, we want personas to occupy different tiles when they are
# headed to the same location on the maze. It is ok if they end up on the
# same time, but we try to lower that probability.
# We take care of that overlap here.
persona_name_set = set(roles.keys())
new_target_tiles = []
for i in target_tiles:
curr_event_set = maze.access_tile(i)["events"]
pass_curr_tile = False
for j in curr_event_set:
if j[0] in persona_name_set:
pass_curr_tile = True
if not pass_curr_tile:
new_target_tiles += [i]
if len(new_target_tiles) == 0:
new_target_tiles = target_tiles
target_tiles = new_target_tiles
# Now that we've identified the target tile, we find the shortest path to
# one of the target tiles.
curr_tile = self._rc.scratch.curr_tile
collision_maze = maze.collision_maze
closest_target_tile = None
path = None
for i in target_tiles:
# path_finder takes a collision_mze and the curr_tile coordinate as
# an input, and returns a list of coordinate tuples that becomes the
# path.
# e.g., [(0, 1), (1, 1), (1, 2), (1, 3), (1, 4)...]
curr_path = path_finder(maze.collision_maze,
curr_tile,
i,
collision_block_id)
if not closest_target_tile:
closest_target_tile = i
path = curr_path
elif len(curr_path) < len(path):
closest_target_tile = i
path = curr_path
# Actually setting the <planned_path> and <act_path_set>. We cut the
# first element in the planned_path because it includes the curr_tile.
self._rc.scratch.planned_path = path[1:]
self._rc.scratch.act_path_set = True
# Setting up the next immediate step. We stay at our curr_tile if there is
# no <planned_path> left, but otherwise, we go to the next tile in the path.
ret = self._rc.scratch.curr_tile
if self._rc.scratch.planned_path:
ret = self._rc.scratch.planned_path[0]
self._rc.scratch.planned_path = self._rc.scratch.planned_path[1:]
description = f"{self._rc.scratch.act_description}"
description += f" @ {self._rc.scratch.act_address}"
execution = ret, self._rc.scratch.act_pronunciatio, description
return execution
def update_role_env(self) -> bool:
role_env = get_role_environment(self.sim_code, self.name, self.step)
ret = True
if role_env:
for key, val in self.game_obj_cleanup.items():
self._rc.env.maze.turn_event_from_tile_idle(key, val)
# reset game_obj_cleanup
self.game_obj_cleanup = dict()
curr_tile = self.role_tile
new_tile = (role_env["x"], role_env["y"])
self._rc.env.maze.remove_subject_events_from_tile(self.name, curr_tile)
self._rc.env.maze.add_event_from_tile(self.scratch.get_curr_event_and_desc(), new_tile)
# the persona will travel to get to their destination. *Once*
# the persona gets there, we activate the object action.
if not self.scratch.planned_path:
self.game_obj_cleanup[self.scratch.get_curr_event_and_desc()] = new_tile
self._rc.env.maze.add_event_from_tile(self.scratch.get_curr_event_and_desc(), new_tile)
blank = (self.scratch.get_curr_obj_event_and_desc()[0], None, None, None)
self._rc.env.maze.remove_event_from_tile(blank, new_tile)
else:
ret = False
time.sleep(1)
logger.warning(f"{self.sim_code}/environment/{self.step}.json not exist or parses failed,"
f"sleep 1s and re-check")
return ret
async def _react(self) -> Message:
maze_env = self._rc.env
# update role env
ret = self.update_role_env()
if not ret:
# TODO add message
return
# TODO observe
# get maze_env from self._rc.env, and observe env info
# TODO retrieve, use self._rc.memory 's retrieve functions
# TODO plan
plan = self.plan()
# TODO reflect
# TODO execute(feed-back into maze_env)
next_tile, pronunciatio, description = self.execute(plan)
role_move = {
"movement": next_tile,
"pronunciatio": pronunciatio,
"description": description,
"chat": self.scratch.chat
}
save_movement(self.name, role_move, step=self.step, sim_code=self.sim_code, curr_time=self.curr_time)
# step update
self.step += 1
self.curr_time += datetime.timedelta(seconds=self.sec_per_step)

View file

@ -21,12 +21,17 @@ async def startup(idea: str,
reverie_meta = get_reverie_meta(fork_sim_code)
roles = []
sim_path = STORAGE_PATH.joinpath(sim_code)
sim_path.mkdir(exist_ok=True)
for idx, role_name in enumerate(reverie_meta["persona_names"]):
role_stg_path = STORAGE_PATH.joinpath(fork_sim_code).joinpath(f"personas/{role_name}")
has_inner_voice = True if idx == 0 else False
role = STRole(name=role_name,
sim_path=sim_path,
sim_code=sim_code,
profile=f"STMember_{idx}",
step=reverie_meta.get("step", 0),
start_date=reverie_meta.get("start_date"),
curr_time=reverie_meta.get("curr_time"),
sec_per_step=reverie_meta.get("sec_per_step"),
has_inner_voice=has_inner_voice)
role.load_from(role_stg_path)
roles.append(role)
@ -35,7 +40,7 @@ async def startup(idea: str,
town.wakeup_roles(roles)
town.invest(investment)
town.start_project()
town.start_project(idea)
await town.run(n_round)

View file

@ -0,0 +1,17 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : unittest of actions/summarize_conv
from metagpt.logs import logger
from st_game.actions.summarize_conv import SummarizeConv
def test_summarize_conv():
conv = [
("Role_A", "what's the weather today?"),
("Role_B", "It looks pretty good, and I will take a walk then.")
]
output = SummarizeConv().run(conv)
assert "weather" in output

View file

@ -1,14 +1,15 @@
def check_if_file_exists(curr_file):
"""
Checks if a file exists
ARGS:
curr_file: path to the current csv file.
RETURNS:
True if the file exists
False if the file does not exist
"""
try:
with open(curr_file) as f_analysis_file: pass
return True
except:
return False
def check_if_file_exists(curr_file):
"""
Checks if a file exists
ARGS:
curr_file: path to the current csv file.
RETURNS:
True if the file exists
False if the file does not exist
"""
try:
with open(curr_file) as f_analysis_file:
pass
return True
except Exception as exp:
return False

View file

@ -8,3 +8,5 @@ ROOT_PATH = Path(__file__).parent.parent
STORAGE_PATH = ROOT_PATH.joinpath("storage")
MAZE_ASSET_PATH = ROOT_PATH.joinpath("static_dirs/assets/the_ville")
PROMPTS_DIR = ROOT_PATH.joinpath("prompts")
collision_block_id = "32125"

View file

@ -2,6 +2,10 @@
# -*- coding: utf-8 -*-
# @Desc : data transform of mg <-> ga under storage
import json
from metagpt.logs import logger
from .const import STORAGE_PATH
from .utils import read_json_file, write_json_file
@ -10,3 +14,32 @@ def get_reverie_meta(sim_code: str) -> dict:
meta_file_path = STORAGE_PATH.joinpath(sim_code).joinpath("reverie/meta.json")
reverie_meta = read_json_file(meta_file_path)
return reverie_meta
def save_movement(role_name: str, role_move: dict, step: int, sim_code: str, curr_time: str):
movement_path = STORAGE_PATH.joinpath(f"{sim_code}/movement/{step}.json")
if not movement_path.parent.exists():
movement_path.parent.mkdir(exist_ok=True)
if movement_path.exists():
with open(movement_path, "r") as fin:
movement = json.load(fin)
else:
movement = {
"persona": dict(),
"meta": dict()
}
movement["persona"][role_name] = role_move
movement["meta"]["curr_time"] = curr_time.strftime("%B %d, %Y, %H:%M:%S")
write_json_file(movement_path, movement)
logger.info(f"save_movement at step: {step}, curr_time: {movement['meta']['curr_time']}")
def get_role_environment(sim_code: str, role_name: str, step: int = 0) -> dict:
env_path = STORAGE_PATH.joinpath(f"{sim_code}/environment/{step}.json")
role_env = None
if env_path.exists():
environment = read_json_file(env_path)
role_env = environment.get(role_name, None)
return role_env

View file

@ -29,12 +29,12 @@ def write_json_file(json_file: str, data: list, encoding=None):
def read_csv_to_list(curr_file: str, header=False, strip_trail=True):
"""
Reads in a csv file to a list of list. If header is True, it returns a
Reads in a csv file to a list of list. If header is True, it returns a
tuple with (header row, all rows)
ARGS:
curr_file: path to the current csv file.
RETURNS:
List of list where the component lists are the rows of the file.
curr_file: path to the current csv file.
RETURNS:
List of list where the component lists are the rows of the file.
"""
if not header:
analysis_list = []
@ -92,3 +92,87 @@ def extract_first_json_dict(data_str: str) -> Union[None, dict]:
except json.JSONDecodeError:
# If parsing fails, return None
return None
def path_finder_v2(a, start, end, collision_block_char) -> list[int]:
def make_step(m, k):
for i in range(len(m)):
for j in range(len(m[i])):
if m[i][j] == k:
if i > 0 and m[i - 1][j] == 0 and a[i - 1][j] == 0:
m[i - 1][j] = k + 1
if j > 0 and m[i][j - 1] == 0 and a[i][j - 1] == 0:
m[i][j - 1] = k + 1
if i < len(m) - 1 and m[i + 1][j] == 0 and a[i + 1][j] == 0:
m[i + 1][j] = k + 1
if j < len(m[i]) - 1 and m[i][j + 1] == 0 and a[i][j + 1] == 0:
m[i][j + 1] = k + 1
new_maze = []
for row in a:
new_row = []
for j in row:
if j == collision_block_char:
new_row += [1]
else:
new_row += [0]
new_maze += [new_row]
a = new_maze
m = []
for i in range(len(a)):
m.append([])
for j in range(len(a[i])):
m[-1].append(0)
i, j = start
m[i][j] = 1
k = 0
except_handle = 150
while m[end[0]][end[1]] == 0:
k += 1
make_step(m, k)
if except_handle == 0:
break
except_handle -= 1
i, j = end
k = m[i][j]
the_path = [(i, j)]
while k > 1:
if i > 0 and m[i - 1][j] == k - 1:
i, j = i - 1, j
the_path.append((i, j))
k -= 1
elif j > 0 and m[i][j - 1] == k - 1:
i, j = i, j - 1
the_path.append((i, j))
k -= 1
elif i < len(m) - 1 and m[i + 1][j] == k - 1:
i, j = i + 1, j
the_path.append((i, j))
k -= 1
elif j < len(m[i]) - 1 and m[i][j + 1] == k - 1:
i, j = i, j + 1
the_path.append((i, j))
k -= 1
the_path.reverse()
return the_path
def path_finder(maze: "Maze", start: list[int], end: list[int], collision_block_char: str) -> list[int]:
# EMERGENCY PATCH
start = (start[1], start[0])
end = (end[1], end[0])
# END EMERGENCY PATCH
path = path_finder_v2(maze, start, end, collision_block_char)
new_path = []
for i in path:
new_path += [(i[1], i[0])]
path = new_path
return path