add general environment and refactor environment's w/r funcs

This commit is contained in:
better629 2023-10-16 15:19:03 +08:00
parent cb9793e69b
commit 6ebe8efc63
21 changed files with 252 additions and 147 deletions

View file

@ -8,7 +8,6 @@ import random
from metagpt.logs import logger
from metagpt.schema import Message
from ..maze import Maze
from .st_action import STAction
@ -34,9 +33,9 @@ class GenActionSector(STAction):
fs = ("kitchen")
return fs
def run(self, role: "STRole", maze: Maze, act_desp: str):
def create_prompt_input(role, maze, act_desp):
act_world = f"{maze.access_tile(role.scratch.curr_tile)['world']}"
def run(self, role: "STRole", access_tile: dict[str, str], act_desp: str):
def create_prompt_input(role, access_tile: dict[str, str], act_desp):
act_world = f"{access_tile['world']}"
prompt_input = []
@ -46,8 +45,8 @@ class GenActionSector(STAction):
prompt_input += [role.s_mem.get_str_accessible_sector_arenas(x)]
prompt_input += [role.scratch.get_str_name()]
prompt_input += [f"{maze.access_tile(role.scratch.curr_tile)['sector']}"]
x = f"{act_world}:{maze.access_tile(role.scratch.curr_tile)['sector']}"
prompt_input += [f"{access_tile['sector']}"]
x = f"{act_world}:{access_tile['sector']}"
prompt_input += [role.s_mem.get_str_accessible_sector_arenas(x)]
if role.scratch.get_str_daily_plan_req() != "":
@ -57,7 +56,7 @@ class GenActionSector(STAction):
# MAR 11 TEMP
prompt_input = []
act_world = maze.access_tile(role.scratch.curr_tile)["world"]
act_world = access_tile["world"]
accessible_sector_str = role.s_mem.get_str_accessible_sectors(act_world)
curr = accessible_sector_str.split(", ")
fin_accessible_sectors = []
@ -85,12 +84,12 @@ class GenActionSector(STAction):
return prompt_input
prompt_template = "action_location_sector_v1.txt"
prompt_input = create_prompt_input(role, maze, act_desp)
prompt_input = create_prompt_input(role, access_tile, act_desp)
prompt = self.generate_prompt_with_tmpl_filename(prompt_input, prompt_template)
self.fail_default_resp = self._func_fail_default_resp()
output = self._run_text_davinci(prompt, max_tokens=15)
y = f"{maze.access_tile(role.scratch.curr_tile)['world']}"
y = f"{access_tile['world']}"
x = [i.strip() for i in role.s_mem.get_str_accessible_sectors(y).split(",")]
if output not in x:
# output = random.choice(x)
@ -120,12 +119,9 @@ class GenActionArena(STAction):
fs = ("kitchen")
return fs
def run(self, role: "STRole", maze: Maze, act_desp: str, act_world: str, act_sector: str):
def create_prompt_input(role, maze, act_desp, act_world, act_sector):
def run(self, role: "STRole", act_desp: str, act_world: str, act_sector: str):
def create_prompt_input(role, act_desp, act_world, act_sector):
prompt_input = []
# prompt_input += [role.scratch.get_str_name()]
# prompt_input += [maze.access_tile(role.scratch.curr_tile)["arena"]]
# prompt_input += [maze.access_tile(role.scratch.curr_tile)["sector"]]
prompt_input += [role.scratch.get_str_name()]
x = f"{act_world}:{act_sector}"
prompt_input += [act_sector]
@ -159,7 +155,7 @@ class GenActionArena(STAction):
return prompt_input
prompt_template = "action_location_object_vMar11.txt"
prompt_input = create_prompt_input(role, maze, act_desp, act_world, act_sector)
prompt_input = create_prompt_input(role, act_desp, act_world, act_sector)
prompt = self.generate_prompt_with_tmpl_filename(prompt_input, prompt_template)
self.fail_default_resp = self._func_fail_default_resp()
output = self._run_text_davinci(prompt, max_tokens=15)
@ -392,10 +388,10 @@ class GenActionDetails(STAction):
role: "STRole",
act_desp: str,
act_dura):
maze = role._rc.env.maze
act_world = maze.access_tile(role.scratch.curr_tile)["world"]
act_sector = GenActionSector().run(role, maze, act_desp)
act_arena = GenActionArena().run(role, maze, act_desp, act_world, act_sector)
access_tile = role._rc.env.call_func("access_tile", tile=role.scratch.curr_tile)
act_world = access_tile["world"]
act_sector = GenActionSector().run(role, access_tile, act_desp)
act_arena = GenActionArena().run(role, act_desp, act_world, act_sector)
act_address = f"{act_world}:{act_sector}:{act_arena}"
act_game_object = GenActionObject().run(role, act_desp, act_address)
new_address = f"{act_world}:{act_sector}:{act_arena}:{act_game_object}"

View file

@ -7,7 +7,6 @@ from metagpt.schema import Message
from examples.st_game.actions.st_action import STAction
from examples.st_game.utils.utils import extract_first_json_dict
from examples.st_game.maze import Maze
class GenIterChatUTT(STAction):
@ -44,9 +43,9 @@ class GenIterChatUTT(STAction):
cleaned_dict["end"] = False
return cleaned_dict
def run(self, maze: Maze, init_role: "STRole", target_role: "STRole", retrieved: dict, curr_context: str,
def run(self, init_role: "STRole", target_role: "STRole", retrieved: dict, curr_context: str,
curr_chat: list[str], *args, **kwargs) -> dict:
def create_prompt_input(maze: Maze, init_role: "STRole", target_role: "STRole",
def create_prompt_input(access_tile: dict[str, str], init_role: "STRole", target_role: "STRole",
retrieved: dict, curr_context: str, curr_chat: list[str]):
role = init_role
scratch = role._rc.scratch
@ -67,8 +66,8 @@ class GenIterChatUTT(STAction):
prev_convo_insert = ""
print(prev_convo_insert)
curr_sector = f"{maze.access_tile(scratch.curr_tile)['sector']}"
curr_arena = f"{maze.access_tile(scratch.curr_tile)['arena']}"
curr_sector = f"{access_tile['sector']}"
curr_arena = f"{access_tile['arena']}"
curr_location = f"{curr_arena} in {curr_sector}"
retrieved_str = ""
@ -91,7 +90,8 @@ class GenIterChatUTT(STAction):
]
return prompt_input
prompt_input = create_prompt_input(maze, init_role, target_role, retrieved, curr_context, curr_chat)
access_tile = init_role._rc.env.call_func("access_tile", tile=init_role.scratch.curr_tile)
prompt_input = create_prompt_input(access_tile, init_role, target_role, retrieved, curr_context, curr_chat)
prompt = self.generate_prompt_with_tmpl_filename(prompt_input,
"iterative_convo_v1.txt")
# original using `ChatGPT_safe_generate_response_OLD`

View file

@ -2,18 +2,40 @@
# -*- coding: utf-8 -*-
# @Desc : maze environment
from typing import Tuple
from pydantic import Field
from metagpt.environment import Environment
from metagpt.environment.environment import Environment
from metagpt.environment.general_environment import GeneralEnvironment
from metagpt.roles.role import Role
from examples.st_game.maze import Maze
class MazeEnvironment(Environment):
class MazeEnvironment(GeneralEnvironment):
maze: Maze = Field(default_factory=Maze)
def add_role(self, role: Role):
role.set_env(self)
self.roles[role.name] = role # use role.name as key not role.profile
self.roles[role.name] = role
def init_register_funcs(self):
self.register_func("access_tile", self.maze.access_tile)
self.register_func("add_tiles_event", self.add_tiles_event)
self.register_func("get_nearby_tiles", self.maze.get_nearby_tiles)
self.register_func("get_tile_path", self.maze.get_tile_path)
self.register_func("get_collision_maze", self.get_collision_maze)
self.register_func("get_address_tiles", self.get_address_tiles)
self.register_func("turn_event_from_tile_idle", self.maze.turn_event_from_tile_idle)
self.register_func("remove_subject_events_from_tile", self.maze.remove_subject_events_from_tile)
self.register_func("add_event_from_tile", self.maze.add_event_from_tile)
self.register_func("remove_event_from_tile", self.maze.remove_event_from_tile)
def add_tiles_event(self, pt_y: int, pt_x: int, event: Tuple[str, str, str, str]):
self.maze.tiles[pt_y][pt_x]["events"].add(event)
def get_collision_maze(self) -> list:
return self.maze.collision_maze
def get_address_tiles(self) -> dict:
return self.maze.address_tiles

View file

@ -6,13 +6,12 @@ from typing import Union, Tuple
from metagpt.logs import logger
from examples.st_game.maze import Maze
from examples.st_game.memory.retrieve import new_agent_retrieve
from examples.st_game.actions.agent_chat_sum_rel import AgentChatSumRel
from examples.st_game.actions.gen_iter_chat_utt import GenIterChatUTT
def agent_conversation(maze: Maze, init_role: "STRole", target_role: "STRole") -> list[list[str]]:
def agent_conversation(init_role: "STRole", target_role: "STRole") -> list[list[str]]:
curr_chat = []
logger.info(f"Role: {init_role.name} starts a conversation with Role: {target_role.name}")
@ -37,7 +36,7 @@ def agent_conversation(maze: Maze, init_role: "STRole", target_role: "STRole") -
focal_points = [f"{relationship}",
f"{target_scratch.name} is {target_scratch.act_description}"]
retrieved = new_agent_retrieve(init_role, focal_points, 15)
utt, end = generate_one_utterance(maze, init_role, target_role, retrieved, curr_chat)
utt, end = generate_one_utterance(init_role, target_role, retrieved, curr_chat)
curr_chat += [[scratch.name, utt]]
if end:
@ -58,7 +57,7 @@ def agent_conversation(maze: Maze, init_role: "STRole", target_role: "STRole") -
focal_points = [f"{relationship}",
f"{scratch.name} is {scratch.act_description}"]
retrieved = new_agent_retrieve(target_role, focal_points, 15)
utt, end = generate_one_utterance(maze, target_role, init_role, retrieved, curr_chat)
utt, end = generate_one_utterance(target_role, init_role, retrieved, curr_chat)
curr_chat += [[target_scratch.name, utt]]
if end:
@ -88,7 +87,7 @@ def generate_summarize_agent_relationship(init_role: "STRole",
return summarized_relationship
def generate_one_utterance(maze: Maze, init_role, target_role, retrieved: dict, curr_chat: list) -> Tuple[str, str]:
def generate_one_utterance(init_role, target_role, retrieved: dict, curr_chat: list) -> Tuple[str, str]:
# Chat version optimized for speed via batch generation
scratch = init_role._rc.scratch
target_scratch = target_role._rc.scratch
@ -101,6 +100,6 @@ def generate_one_utterance(maze: Maze, init_role, target_role, retrieved: dict,
f"is initiating a conversation with " +
f"{target_scratch.name}.")
x = GenIterChatUTT().run(maze, init_role, target_role, retrieved, curr_context, curr_chat)
x = GenIterChatUTT().run(init_role, target_role, retrieved, curr_context, curr_chat)
return x["utterance"], x["end"]

View file

@ -9,7 +9,6 @@ import math
from metagpt.llm import LLM
from metagpt.logs import logger
from ..maze import Maze
from ..plan.converse import agent_conversation
from ..actions.decide_to_talk import DecideToTalk
from ..actions.summarize_conv import SummarizeConv
@ -23,16 +22,16 @@ from ..utils.utils import get_embedding
from ..memory.retrieve import new_agent_retrieve
def plan(role: "STRole", maze: Maze, roles: dict["STRole"], new_day: bool, retrieved: dict) -> str:
def plan(role: "STRole", roles: dict["STRole"], new_day: bool, retrieved: dict) -> str:
# PART 1: Generate the hourly schedule.
if new_day:
if new_day:
_long_term_planning(role, new_day)
# PART 2: If the current action has expired, we want to create a new plan.
act_check_finished = role.scratch.act_check_finished()
logger.info(f"Role: {role.name} act_check_finished is {act_check_finished}")
if act_check_finished:
_determine_action(role, maze)
_determine_action(role)
# PART 3: If you perceived an event that needs to be responded to (saw
# another role), and retrieved relevant information.
@ -60,7 +59,7 @@ def plan(role: "STRole", maze: Maze, roles: dict["STRole"], new_day: bool, retri
if reaction_mode:
# If we do want to chat, then we generate conversation
if reaction_mode[:9] == "chat with":
_chat_react(maze, role, reaction_mode, roles)
_chat_react(role, reaction_mode, roles)
elif reaction_mode[:4] == "wait":
_wait_react(role, reaction_mode)
@ -246,7 +245,7 @@ def _should_react(role: "STRole", retrieved: dict, roles: dict):
return False
def _chat_react(maze: Maze, role: "STRole", reaction_mode: str, roles: dict["STRole"]):
def _chat_react(role: "STRole", reaction_mode: str, roles: dict["STRole"]):
# There are two roles -- the role who is initiating the conversation
# and the role who is the target. We get the role instances here.
init_role = role
@ -254,7 +253,7 @@ def _chat_react(maze: Maze, role: "STRole", reaction_mode: str, roles: dict["STR
curr_roles = [init_role, target_role]
# Actually creating the conversation here.
convo, duration_min = generate_convo(maze, init_role, target_role) # 2222
convo, duration_min = generate_convo(init_role, target_role) # 2222
convo_summary = generate_convo_summary(convo)
inserted_act = convo_summary
inserted_act_dur = duration_min
@ -313,9 +312,9 @@ def _create_react(role: "STRole", inserted_act: str, inserted_act_dur: int,
elif (scratch.f_daily_schedule_hourly_org[scratch.get_f_daily_schedule_hourly_org_index()][1] +
scratch.f_daily_schedule_hourly_org[scratch.get_f_daily_schedule_hourly_org_index() + 1][1]):
end_hour = start_hour + (
(scratch.f_daily_schedule_hourly_org[scratch.get_f_daily_schedule_hourly_org_index()][1] +
scratch.f_daily_schedule_hourly_org[scratch.get_f_daily_schedule_hourly_org_index() + 1][
1]) / 60)
(scratch.f_daily_schedule_hourly_org[scratch.get_f_daily_schedule_hourly_org_index()][1] +
scratch.f_daily_schedule_hourly_org[scratch.get_f_daily_schedule_hourly_org_index() + 1][
1]) / 60)
else:
end_hour = start_hour + 2
@ -357,7 +356,7 @@ def _wait_react(role: "STRole", reaction_mode: str):
inserted_act = f'waiting to start {scratch.act_description.split("(")[-1][:-1]}'
end_time = datetime.datetime.strptime(reaction_mode[6:].strip(), "%B %d, %Y, %H:%M:%S")
inserted_act_dur = (end_time.minute + end_time.hour * 60) - (
scratch.curr_time.minute + scratch.curr_time.hour * 60) + 1
scratch.curr_time.minute + scratch.curr_time.hour * 60) + 1
act_address = f"<waiting> {scratch.curr_tile[0]} {scratch.curr_tile[1]}"
act_event = (role.name, "waiting to start", scratch.act_description.split("(")[-1][:-1])
@ -376,8 +375,8 @@ def _wait_react(role: "STRole", reaction_mode: str):
act_pronunciatio, act_obj_description, act_obj_pronunciatio, act_obj_event)
def generate_convo(maze: Maze, init_role: "STRole", target_role: "STRole") -> Union[list, int]:
convo = agent_conversation(maze, init_role, target_role)
def generate_convo(init_role: "STRole", target_role: "STRole") -> Union[list, int]:
convo = agent_conversation(init_role, target_role)
all_utt = ""
for row in convo:
@ -424,7 +423,7 @@ def generate_new_decomp_schedule(role: "STRole", inserted_act: str, inserted_act
truncated_act_dur += [[scratch.f_daily_schedule[count][0],
dur_sum - today_min_pass]]
truncated_act_dur[-1][-1] -= (
dur_sum - today_min_pass) # DEC 7 DEBUG;.. is the +1 the right thing to do???
dur_sum - today_min_pass) # DEC 7 DEBUG;.. is the +1 the right thing to do???
# DEC 7 DEBUG;.. is the +1 the right thing to do???
# truncated_act_dur[-1][-1] -= (dur_sum - today_min_pass + 1)
print("DEBUG::: ", truncated_act_dur)
@ -463,7 +462,7 @@ def generate_new_decomp_schedule(role: "STRole", inserted_act: str, inserted_act
inserted_act_dur)
def _long_term_planning(role: "STRole", new_day: bool):
def _long_term_planning(role: "STRole", new_day: bool):
"""
Formulates the role's daily long-term plan if it is the start of a new
day. This basically has two components: first, we create the wake-up hour,
@ -481,7 +480,7 @@ def _long_term_planning(role: "STRole", new_day: bool):
# When it is a new day, we start by creating the daily_req of the role.
# Note that the daily_req is a list of strings that describe the role's
# day in broad strokes.
if new_day == "First day":
if new_day == "First day":
# Bootstrapping the daily plan for the start of then generation:
# if this is the start of generation (so there is no previous day's
# daily requirement, or if we are on a new day, we want to create a new
@ -504,7 +503,7 @@ def _long_term_planning(role: "STRole", new_day: bool):
# Added March 4 -- adding plan to the memory.
thought = f"This is {role.scratch.name}'s plan for {role.scratch.curr_time.strftime('%A %B %d')}:"
for i in role.scratch.daily_req:
for i in role.scratch.daily_req:
thought += f" {i},"
thought = thought[:-1] + "."
created = role.scratch.curr_time
@ -513,16 +512,16 @@ def _long_term_planning(role: "STRole", new_day: bool):
keywords = set(["plan"])
thought_poignancy = 5
thought_embedding_pair = (thought, get_embedding(thought))
role.a_mem.add_thought(created, expiration, s, p, o,
thought, keywords, thought_poignancy,
thought_embedding_pair, None)
role.a_mem.add_thought(created, expiration, s, p, o,
thought, keywords, thought_poignancy,
thought_embedding_pair, None)
# print("Sleeping for 20 seconds...")
# time.sleep(10)
# print("Done sleeping!")
def _determine_action(role: "STRole", maze: Maze):
def _determine_action(role: "STRole"):
"""
Creates the next action sequence for the role.
The main goal of this function is to run "add_new_action" on the role's
@ -531,9 +530,9 @@ def _determine_action(role: "STRole", maze: Maze):
As a part of this, the role may need to decompose its hourly schedule as
needed.
INPUT
role: Current <Persona> instance whose action we are determining.
maze: Current <Maze> instance.
role: Current <Persona> instance whose action we are determining.
"""
def determine_decomp(act_desp, act_dura):
"""
Given an action description and its duration, we determine whether we need
@ -546,12 +545,12 @@ def _determine_action(role: "STRole", maze: Maze):
OUTPUT:
a boolean. True if we need to decompose, False otherwise.
"""
if "sleep" not in act_desp and "bed" not in act_desp:
if "sleep" not in act_desp and "bed" not in act_desp:
return True
elif "sleeping" in act_desp or "asleep" in act_desp or "in bed" in act_desp:
return False
elif "sleep" in act_desp or "bed" in act_desp:
if act_dura > 60:
elif "sleep" in act_desp or "bed" in act_desp:
if act_dura > 60:
return False
return True
@ -570,18 +569,18 @@ def _determine_action(role: "STRole", maze: Maze):
if curr_index == 0:
# This portion is invoked if it is the first hour of the day.
act_desp, act_dura = role.scratch.f_daily_schedule[curr_index]
if act_dura >= 60:
# We decompose if the next action is longer than an hour, and fits the
# criteria described in determine_decomp.
if determine_decomp(act_desp, act_dura):
role.scratch.f_daily_schedule[curr_index:curr_index+1] = (
TaskDecomp().run(role, act_desp, act_dura))
if act_dura >= 60:
# We decompose if the next action is longer than an hour, and fits the
# criteria described in determine_decomp.
if determine_decomp(act_desp, act_dura):
role.scratch.f_daily_schedule[curr_index:curr_index + 1] = (
TaskDecomp().run(role, act_desp, act_dura))
if curr_index_60 + 1 < len(role.scratch.f_daily_schedule):
act_desp, act_dura = role.scratch.f_daily_schedule[curr_index_60+1]
if act_dura >= 60:
if determine_decomp(act_desp, act_dura):
role.scratch.f_daily_schedule[curr_index_60+1:curr_index_60+2] = (
TaskDecomp().run(role, act_desp, act_dura))
act_desp, act_dura = role.scratch.f_daily_schedule[curr_index_60 + 1]
if act_dura >= 60:
if determine_decomp(act_desp, act_dura):
role.scratch.f_daily_schedule[curr_index_60 + 1:curr_index_60 + 2] = (
TaskDecomp().run(role, act_desp, act_dura))
if curr_index_60 < len(role.scratch.f_daily_schedule):
# If it is not the first hour of the day, this is always invoked (it is
@ -589,42 +588,42 @@ def _determine_action(role: "STRole", maze: Maze):
# decompose two hours in one go). Of course, we need to have something to
# decompose as well, so we check for that too.
if role.scratch.curr_time.hour < 23:
# And we don't want to decompose after 11 pm.
# And we don't want to decompose after 11 pm.
act_desp, act_dura = role.scratch.f_daily_schedule[curr_index_60]
if act_dura >= 60:
if determine_decomp(act_desp, act_dura):
role.scratch.f_daily_schedule[curr_index_60:curr_index_60+1] = (
TaskDecomp().run(role, act_desp, act_dura))
if act_dura >= 60:
if determine_decomp(act_desp, act_dura):
role.scratch.f_daily_schedule[curr_index_60:curr_index_60 + 1] = (
TaskDecomp().run(role, act_desp, act_dura))
# * End of Decompose *
# Generate an <Action> instance from the action description and duration. By
# this point, we assume that all the relevant actions are decomposed and
# ready in f_daily_schedule.
print ("DEBUG LJSDLFSKJF")
for i in role.scratch.f_daily_schedule: print (i)
print (curr_index)
print (len(role.scratch.f_daily_schedule))
print (role.scratch.name)
print ("------")
print("DEBUG LJSDLFSKJF")
for i in role.scratch.f_daily_schedule: print(i)
print(curr_index)
print(len(role.scratch.f_daily_schedule))
print(role.scratch.name)
print("------")
# 1440
x_emergency = 0
for i in role.scratch.f_daily_schedule:
for i in role.scratch.f_daily_schedule:
x_emergency += i[1]
# print ("x_emergency", x_emergency)
if 1440 - x_emergency > 0:
print ("x_emergency__AAA", x_emergency)
if 1440 - x_emergency > 0:
print("x_emergency__AAA", x_emergency)
role.scratch.f_daily_schedule += [["sleeping", 1440 - x_emergency]]
act_desp, act_dura = role.scratch.f_daily_schedule[curr_index]
act_desp, act_dura = role.scratch.f_daily_schedule[curr_index]
new_action_details = GenActionDetails().run(role, act_desp, act_dura)
# Adding the action to role's queue.
role.scratch.add_new_action(**new_action_details)
def revise_identity(role: "STRole"):
def revise_identity(role: "STRole"):
p_name = role.scratch.name
focal_points = [f"{p_name}'s plan for {role.scratch.get_str_curr_date_str()}.",
@ -633,7 +632,7 @@ def revise_identity(role: "STRole"):
statements = "[Statements]\n"
for key, val in retrieved.items():
for i in val:
for i in val:
statements += f"{i.created.strftime('%A %B %d -- %H:%M %p')}: {i.embedding_key}\n"
# print (";adjhfno;asdjao;idfjo;af", p_name)
@ -653,7 +652,7 @@ def revise_identity(role: "STRole"):
currently_prompt = f"{p_name}'s status from {(role.scratch.curr_time - datetime.timedelta(days=1)).strftime('%A %B %d')}:\n"
currently_prompt += f"{role.scratch.currently}\n\n"
currently_prompt += f"{p_name}'s thoughts at the end of {(role.scratch.curr_time - datetime.timedelta(days=1)).strftime('%A %B %d')}:\n"
currently_prompt += f"{p_name}'s thoughts at the end of {(role.scratch.curr_time - datetime.timedelta(days=1)).strftime('%A %B %d')}:\n"
currently_prompt += (plan_note + thought_note).replace('\n', '') + "\n\n"
currently_prompt += f"It is now {role.scratch.curr_time.strftime('%A %B %d')}. Given the above, write {p_name}'s status for {role.scratch.curr_time.strftime('%A %B %d')} that reflects {p_name}'s thoughts at the end of {(role.scratch.curr_time - datetime.timedelta(days=1)).strftime('%A %B %d')}. Write this in third-person talking about {p_name}."
currently_prompt += f"If there is any scheduling information, be as specific as possible (include date, time, and location if stated in the statement).\n\n"
@ -673,7 +672,5 @@ def revise_identity(role: "STRole"):
new_daily_req = LLM().ask(daily_req_prompt)
new_daily_req = new_daily_req.replace('\n', ' ')
print ("WE ARE HERE!!!", new_daily_req)
print("WE ARE HERE!!!", new_daily_req)
role.scratch.daily_plan_req = new_daily_req

View file

@ -14,7 +14,6 @@ import math
import time
from pydantic import Field
from pathlib import Path
import random
import datetime
from operator import itemgetter
@ -28,7 +27,6 @@ from examples.st_game.memory.spatial_memory import MemoryTree
from examples.st_game.actions.dummy_action import DummyAction, DummyMessage
from examples.st_game.actions.user_requirement import UserRequirement
from examples.st_game.maze_environment import MazeEnvironment
from examples.st_game.memory.retrieve import new_agent_retrieve
from examples.st_game.memory.scratch import Scratch
from examples.st_game.utils.utils import get_embedding, path_finder
from examples.st_game.utils.const import collision_block_id, STORAGE_PATH
@ -55,6 +53,7 @@ class STRole(Role):
name: str = "Klaus Mueller",
profile: str = "STMember",
sim_code: str = "new_sim",
env: "MazeEnvironment" = None,
step: int = 0,
start_date: str = "",
curr_time: str = "",
@ -74,6 +73,7 @@ class STRole(Role):
self.role_storage_path = STORAGE_PATH.joinpath(f"{sim_code}/personas/{self.name}")
self._rc = STRoleContext()
self.set_env(env) # init environment before start_project
self.load_from() # load role's memory
self._init_actions([])
@ -89,7 +89,7 @@ class STRole(Role):
pt_x = role_env["x"]
pt_y = role_env["y"]
self._rc.scratch.curr_tile = (pt_x, pt_y)
self._rc.env.maze.tiles[pt_y][pt_x]["events"].add(self.scratch.get_curr_event_and_desc())
self._rc.env.call_func("add_tiles_event", pt_y=pt_y, pt_x=pt_x, event=self.scratch.get_curr_event_and_desc())
@property
def name(self):
@ -195,23 +195,25 @@ class STRole(Role):
OUTPUT:
ret_events: a list of <BasicMemory> that are perceived and new.
"""
maze = self._rc.env.maze
# PERCEIVE SPACE
# We get the nearby tiles given our current tile and the persona's vision
# radius.
nearby_tiles = maze.get_nearby_tiles(self._rc.scratch.curr_tile,
self._rc.scratch.vision_r)
nearby_tiles = self._rc.env.call_func("get_nearby_tiles",
tile=self._rc.scratch.curr_tile,
vision_r=self._rc.scratch.vision_r)
# We then store the perceived space. Note that the s_mem of the persona is
# in the form of a tree constructed using dictionaries.
for tile in nearby_tiles:
tile_info = maze.access_tile(tile)
tile_info = self._rc.env.call_func("access_tile", tile=tile)
self._rc.spatial_memory.add_tile_info(tile_info)
# PERCEIVE EVENTS.
# We will perceive events that take place in the same arena as the
# persona's current arena.
curr_arena_path = maze.get_tile_path(self._rc.scratch.curr_tile, "arena")
curr_arena_path = self._rc.env.call_func("get_tile_path",
tile=self._rc.scratch.curr_tile,
level="arena")
# We do not perceive the same event twice (this can happen if an object is
# extended across multiple tiles).
percept_events_set = set()
@ -221,9 +223,12 @@ class STRole(Role):
# First, we put all events that are occuring in the nearby tiles into the
# percept_events_list
for tile in nearby_tiles:
tile_details = maze.access_tile(tile)
tile_details = self._rc.env.call_func("access_tile", tile=tile)
if tile_details["events"]:
if maze.get_tile_path(tile, "arena") == curr_arena_path:
tmp_arena_path = self._rc.env.call_func("get_tile_path",
tile=tile,
level="arena")
if tmp_arena_path == curr_arena_path:
# This calculates the distance between the persona's current tile,
# and the target tile.
dist = math.dist([tile[0], tile[1]],
@ -356,7 +361,6 @@ class STRole(Role):
e.g., "dolores double studio:double studio:bedroom 1:bed"
"""
roles = self._rc.env.get_roles()
maze = self._rc.env.maze
if "<random>" in plan and self._rc.scratch.planned_path == []:
self._rc.scratch.act_path_set = False
@ -372,18 +376,18 @@ class STRole(Role):
# Executing persona-persona interaction.
target_p_tile = (roles[plan.split("<persona>")[-1].strip()]
.scratch.curr_tile)
potential_path = path_finder(maze.collision_maze,
potential_path = path_finder(self._rc.env.call_func("get_collision_maze"),
self._rc.scratch.curr_tile,
target_p_tile,
collision_block_id)
if len(potential_path) <= 2:
target_tiles = [potential_path[0]]
else:
potential_1 = path_finder(maze.collision_maze,
potential_1 = path_finder(self._rc.env.call_func("get_collision_maze"),
self._rc.scratch.curr_tile,
potential_path[int(len(potential_path) / 2)],
collision_block_id)
potential_2 = path_finder(maze.collision_maze,
potential_2 = path_finder(self._rc.env.call_func("get_collision_maze"),
self._rc.scratch.curr_tile,
potential_path[int(len(potential_path) / 2) + 1],
collision_block_id)
@ -402,7 +406,7 @@ class STRole(Role):
elif "<random>" in plan:
# Executing a random location action.
plan = ":".join(plan.split(":")[:-1])
target_tiles = maze.address_tiles[plan]
target_tiles = self._rc.env.call_func("get_address_tiles")[plan]
target_tiles = random.sample(list(target_tiles), 1)
else:
@ -411,10 +415,10 @@ class STRole(Role):
# Retrieve the target addresses. Again, plan is an action address in its
# string form. <maze.address_tiles> takes this and returns candidate
# coordinates.
if plan not in maze.address_tiles:
maze.address_tiles["Johnson Park:park:park garden"] # ERRORRRRRRR
if plan not in self._rc.env.call_func("get_address_tiles"):
self._rc.env.call_func("get_address_tiles")["Johnson Park:park:park garden"] # ERRORRRRRRR
else:
target_tiles = maze.address_tiles[plan]
target_tiles = self._rc.env.call_func("get_address_tiles")[plan]
# There are sometimes more than one tile returned from this (e.g., a tabe
# may stretch many coordinates). So, we sample a few here. And from that
@ -430,7 +434,8 @@ class STRole(Role):
persona_name_set = set(roles.keys())
new_target_tiles = []
for i in target_tiles:
curr_event_set = maze.access_tile(i)["events"]
access_tile = self._rc.env.call_func("access_tile", tile=i)
curr_event_set = access_tile["events"]
pass_curr_tile = False
for j in curr_event_set:
if j[0] in persona_name_set:
@ -444,7 +449,6 @@ class STRole(Role):
# Now that we've identified the target tile, we find the shortest path to
# one of the target tiles.
curr_tile = self._rc.scratch.curr_tile
collision_maze = maze.collision_maze
closest_target_tile = None
path = None
for i in target_tiles:
@ -452,7 +456,7 @@ class STRole(Role):
# an input, and returns a list of coordinate tuples that becomes the
# path.
# e.g., [(0, 1), (1, 1), (1, 2), (1, 3), (1, 4)...]
curr_path = path_finder(maze.collision_maze,
curr_path = path_finder(self._rc.env.call_func("get_collision_maze"),
curr_tile,
i,
collision_block_id)
@ -486,22 +490,26 @@ class STRole(Role):
ret = True
if role_env:
for key, val in self.game_obj_cleanup.items():
self._rc.env.maze.turn_event_from_tile_idle(key, val)
self._rc.env.call_func("turn_event_from_tile_idle", curr_event=key, tile=val)
# reset game_obj_cleanup
self.game_obj_cleanup = dict()
curr_tile = self.role_tile
new_tile = (role_env["x"], role_env["y"])
self._rc.env.maze.remove_subject_events_from_tile(self.name, curr_tile)
self._rc.env.maze.add_event_from_tile(self.scratch.get_curr_event_and_desc(), new_tile)
self._rc.env.call_func("remove_subject_events_from_tile", subject=self.name, tile=curr_tile)
self._rc.env.call_func("add_event_from_tile",
curr_event=self.scratch.get_curr_event_and_desc(),
tile=new_tile)
# the persona will travel to get to their destination. *Once*
# the persona gets there, we activate the object action.
if not self.scratch.planned_path:
self.game_obj_cleanup[self.scratch.get_curr_event_and_desc()] = new_tile
self._rc.env.maze.add_event_from_tile(self.scratch.get_curr_event_and_desc(), new_tile)
self._rc.env.call_func("add_event_from_tile",
curr_event=self.scratch.get_curr_event_and_desc(),
tile=new_tile)
blank = (self.scratch.get_curr_obj_event_and_desc()[0], None, None, None)
self._rc.env.maze.remove_event_from_tile(blank, new_tile)
self._rc.env.call_func("remove_event_from_tile", curr_event=blank, tile=new_tile)
# update role's new tile
self._rc.scratch.curr_tile = new_tile
@ -535,7 +543,7 @@ class STRole(Role):
# use self._rc.memory 's retrieve functions
retrieved = self.retrieve(observed)
plans = plan(self, self._rc.env.maze, self._rc.env.get_roles(), new_day, retrieved)
plans = plan(self, self._rc.env.get_roles(), new_day, retrieved)
self.reflect()

View file

@ -17,6 +17,9 @@ async def startup(idea: str,
sim_code: str,
investment: float = 30.0,
n_round: int = 500):
town = StanfordTown()
town.init_env()
# copy `storage/{fork_sim_code}` to `storage/{sim_code}`
copy_folder(str(STORAGE_PATH.joinpath(fork_sim_code)), str(STORAGE_PATH.joinpath(sim_code)))
@ -28,8 +31,9 @@ async def startup(idea: str,
for idx, role_name in enumerate(reverie_meta["persona_names"]):
has_inner_voice = True if idx == 0 else False
role = STRole(name=role_name,
sim_code=sim_code,
profile=role_name,
sim_code=sim_code,
env=town.environment,
step=reverie_meta.get("step", 0),
start_date=reverie_meta.get("start_date"),
curr_time=reverie_meta.get("curr_time"),
@ -41,7 +45,6 @@ async def startup(idea: str,
write_curr_sim_code({"sim_code": sim_code})
write_curr_step({"step": reverie_meta.get("step", 0)})
town = StanfordTown()
town.wakeup_roles(roles)
town.invest(investment)

View file

@ -17,6 +17,10 @@ class StanfordTown(SoftwareCompany):
environment: MazeEnvironment = Field(default_factory=MazeEnvironment)
def init_env(self):
logger.info("StanfordTown init environment")
self.environment.init_register_funcs()
def wakeup_roles(self, roles: list[Role]):
logger.warning(f"The Town add {len(roles)} roles, and start to operate.")
self.environment.add_roles(roles)

View file

@ -10,20 +10,20 @@ from examples.st_game.actions.gen_action_details import (
GenActObjDescription,
GenEventTriple,
GenObjEventTriple,
GenPronunciatio
)
GenPronunciatio
)
from examples.st_game.roles.st_role import STRole
role = STRole(name="Klaus Mueller", start_date="October 4, 2023", curr_time="October 4, 2023, 00:00:00",
sim_code="base_the_ville_isabella_maria_klaus")
maze = role._rc.env.maze
role = STRole(name="Klaus Mueller", start_date="October 4, 2023", curr_time="October 4, 2023, 00:00:00",
sim_code="base_the_ville_isabella_maria_klaus")
access_tile = role._rc.env.call_func("access_tile", tile=role.scratch.curr_tile)
act_desp = "klaus mueller starts the day by making a coffee"
act_dura = "20"
act_world = maze.access_tile(role.scratch.curr_tile)["world"]
act_world = access_tile["world"]
assert act_world == "the Ville"
sector = GenActionSector().run(role, maze, act_desp)
arena = GenActionArena().run(role, maze, act_desp, act_world, sector)
sector = GenActionSector().run(role, access_tile, act_desp)
arena = GenActionArena().run(role, act_desp, act_world, sector)
temp_address = f"{act_world}:{sector}:{arena}"
obj = GenActionObject().run(role, act_desp, temp_address)
@ -33,24 +33,29 @@ act_obj_desp = GenActObjDescription().run(role, obj, act_desp)
result_dict = GenActionDetails().run(role, act_desp, act_dura)
def test_gen_action_sector():
assert isinstance(sector, str)
assert sector in role.s_mem.get_str_accessible_sectors(act_world)
def test_gen_action_arena():
assert isinstance(arena, str)
assert arena in role.s_mem.get_str_accessible_sector_arenas(f"{act_world}:{sector}")
def test_gen_action_obj():
assert isinstance(obj, str)
assert obj in role.s_mem.get_str_accessible_arena_game_objects(temp_address)
# def test_gen_event_triple():
# assert len(event_triple) == 3
# def test_gen_obj_event_triple():
# assert len(obj_triple) == 3
def test_gen_action_details():
if result_dict:
for key in [
@ -70,4 +75,3 @@ def test_gen_action_details():
assert result_dict["action_address"] == f"{temp_address}:{obj}"
assert result_dict["action_duration"] == int(act_dura)
assert result_dict["act_obj_description"] == act_obj_desp

View file

@ -5,7 +5,6 @@
from typing import Tuple
from examples.st_game.roles.st_role import STRole
from examples.st_game.maze import Maze
from examples.st_game.utils.const import STORAGE_PATH
from examples.st_game.utils.mg_ga_transform import get_reverie_meta
from examples.st_game.utils.utils import copy_folder
@ -43,8 +42,7 @@ def init_two_roles(fork_sim_code: str = "July1_the_ville_isabella_maria_klaus-st
def test_agent_conversation():
role_ir, role_km = init_two_roles()
maze = Maze()
curr_chat = agent_conversation(maze, role_ir, role_km)
curr_chat = agent_conversation(role_ir, role_km)
assert len(curr_chat) % 2 == 0
meet = False

View file

@ -1,11 +1,8 @@
from ..utils.const import MAZE_ASSET_PATH
from ..maze import Maze
def test_maze_init():
maze = Maze(maze_asset_path=MAZE_ASSET_PATH)
assert maze.maze_height == 100
assert maze.maze_width == 140

View file

@ -159,13 +159,13 @@ def path_finder_v2(a, start, end, collision_block_char) -> list[int]:
return the_path
def path_finder(maze: "Maze", start: list[int], end: list[int], collision_block_char: str) -> list[int]:
def path_finder(collision_maze: list, start: list[int], end: list[int], collision_block_char: str) -> list[int]:
# EMERGENCY PATCH
start = (start[1], start[0])
end = (end[1], end[0])
# END EMERGENCY PATCH
path = path_finder_v2(maze, start, end, collision_block_char)
path = path_finder_v2(collision_maze, start, end, collision_block_char)
new_path = []
for i in path:

View file

@ -0,0 +1,3 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc :

View file

@ -17,8 +17,8 @@ from metagpt.schema import Message
class Environment(BaseModel):
"""环境,承载一批角色,角色可以向环境发布消息,可以被其他角色观察到
Environment, hosting a batch of roles, roles can publish messages to the environment, and can be observed by other roles
Environment, hosting a batch of roles, roles can publish messages to the environment,
and can be observed by other roles
"""
roles: dict[str, Role] = Field(default_factory=dict)

View file

@ -0,0 +1,32 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc :
from typing import Callable
from pydantic import Field
from metagpt.environment.environment import Environment
class GeneralEnvironment(Environment):
"""
A GeneralEnvironment for interfacing with games, etc. It create a registration mechanism to register
custom methods when operating with the particular environment.
"""
name: str = Field(default="")
registered_funcs: dict[str, Callable] = Field(default={})
def register_func(self, func_name: str, func: Callable):
if func_name not in self.registered_funcs:
self.registered_funcs[func_name] = func
def call_func(self, func_name: str, *args, **kwargs):
assert func_name in self.registered_funcs
func = self.registered_funcs.get(func_name)
return func(*args, **kwargs)
@staticmethod
def init_register_funcs(self):
raise NotImplementedError()

View file

@ -0,0 +1,21 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : RL environment about Gymnasium(forked from openai gym)
from typing import Callable
import gymnasium as gym
from metagpt.logs import logger
from metagpt.environment.general_environment import GeneralEnvironment
class GymEnvironment(GeneralEnvironment):
def init_register_funcs(self):
env = gym.make(self.name)
logger.info(f"init gym environment: {self.name}")
self.register_func("reset", env.reset)
self.register_func("sample_action", env.action_space.sample)
self.register_func("step", env.step)
self.register_func("close", env.close)

View file

@ -11,7 +11,6 @@ from typing import Iterable, Type
from pydantic import BaseModel, Field
# from metagpt.environment import Environment
from metagpt.config import CONFIG
from metagpt.actions import Action, ActionOutput
from metagpt.llm import LLM

View file

@ -9,7 +9,7 @@ from pydantic import BaseModel, Field
from metagpt.actions import BossRequirement
from metagpt.config import CONFIG
from metagpt.environment import Environment
from metagpt.environment.environment import Environment
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import Message

View file

@ -0,0 +1,3 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc :

View file

@ -0,0 +1,19 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : unittest of gym environment
from metagpt.environment.gym_environment import GymEnvironment
def test_gym_environment():
gym_env = GymEnvironment(name="CartPole-v1")
gym_env.init_register_funcs()
observation, info = gym_env.call_func("reset", seed=42)
for _ in range(2):
action = gym_env.call_func("sample_action")
observation, reward, terminated, truncated, info = gym_env.call_func("step", action=action)
if terminated or truncated:
observation, info = gym_env.call_func("reset")
assert len(observation) == 4
gym_env.call_func("close")

View file

@ -9,7 +9,7 @@
import pytest
from metagpt.actions import BossRequirement
from metagpt.environment import Environment
from metagpt.environment.environment import Environment
from metagpt.logs import logger
from metagpt.manager import Manager
from metagpt.roles import Architect, ProductManager, Role