merge: merge from yuymf/minecraft_dev2, update base vdb op in PlayerActions and add event execute func, use PlayerActions as action base

This commit is contained in:
stellahsr 2023-10-06 20:28:01 +08:00
parent 5aea00f8b4
commit 6bcd3bdcee
10 changed files with 145 additions and 131 deletions

View file

@ -10,13 +10,11 @@ from langchain.vectorstores import Chroma
from metagpt.document_store import FaissStore
from metagpt.logs import logger
from metagpt.actions import Action
from metagpt.actions.minecraft.player_action import PlayerActions as Action
from metagpt.utils.minecraft import load_prompt, fix_and_parse_json
from metagpt.schema import HumanMessage, SystemMessage
from metagpt.const import CKPT_DIR
# from metagpt.actions.minecraft import PlayerActions
class DesignTask(Action):
"""
@ -63,11 +61,12 @@ class DesignTask(Action):
response = self.parse_llm_response(
curriculum
) # Task: Craft 4 wooden planks.
logger.info(f"Parsed Curriculum Agent response\n{response}")
assert "next_task" in response
return response["next_task"]
except Exception as e:
logger.info(f"Error parsing curriculum response: {e}. Trying again!")
return self.generate_task(
return await self.generate_task(
human_msg=human_msg,
system_msg=system_msg,
max_retries=max_retries - 1,
@ -92,29 +91,6 @@ class DesignCurriculum(Action):
def __init__(self, name="", context=None, llm=None):
super().__init__(name, context, llm)
# voyager vectordb using
self.qa_cache = {}
self.qa_cache_questions_vectordb = Chroma(
collection_name="qa_cache_questions_vectordb",
embedding_function=OpenAIEmbeddings(),
persist_directory=f"{CKPT_DIR}/curriculum/vectordb",
)
# TODO: change to FaissStore
# self.qa_cache_questions_vectordb = FaissStore( {CKPT_DIR}/ 'curriculum/vectordb')
# TODO:
# assert self.qa_cache_questions_vectordb._collection.count() == len(
# self.qa_cache
# ), (
# f"Curriculum Agent's qa cache question vectordb is not synced with qa_cache.json.\n"
# f"There are {self.qa_cache_questions_vectordb._collection.count()} questions in vectordb "
# f"but {len(self.qa_cache)} questions in qa_cache.json.\n"
# f"Did you set resume=False when initializing the agent?\n"
# f"You may need to manually delete the qa cache question vectordb directory for running from scratch.\n"
# )
@classmethod
def set_qa_cache(cls, qa_cache):
cls.qa_cache = qa_cache
# Check if qa_cache right using
@classmethod
def generate_qa(cls, events, chest_observation):

View file

@ -5,11 +5,8 @@
import os
import json
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from metagpt.document_store import FaissStore
from metagpt.logs import logger
from metagpt.actions import Action
from metagpt.actions.minecraft.player_action import PlayerActions as Action
from metagpt.const import CKPT_DIR
@ -21,21 +18,6 @@ class RetrieveSkills(Action):
def __init__(self, name="", context=None, llm=None):
super().__init__(name, context, llm)
# TODO: mv to PlayerAction
self.retrieval_top_k = 5
self.vectordb = Chroma(
collection_name="skill_vectordb",
embedding_function=OpenAIEmbeddings(),
persist_directory=f"{CKPT_DIR}/skill/vectordb",
)
# Check if skills right using
# TODO:
# assert self.vectordb._collection.count() == len(self.skills), (
# f"Skill Manager's vectordb is not synced with skills.json.\n"
# f"There are {self.vectordb._collection.count()} skills in vectordb but {len(self.skills)} skills in skills.json.\n"
# f"Did you set resume=False when initializing the manager?\n"
# f"You may need to manually delete the vectordb directory for running from scratch."
# )
async def run(self, query, skills, *args, **kwargs):
# Implement the logic for retrieving skills here.
@ -62,22 +44,6 @@ class AddNewSkills(Action):
def __init__(self, name="", context=None, llm=None):
super().__init__(name, context, llm)
# TODO: mv to PlayerAction
self.vectordb = Chroma(
collection_name="skill_vectordb",
embedding_function=OpenAIEmbeddings(),
persist_directory=f"{CKPT_DIR}/skill/vectordb",
)
# TODO: change to FaissStore
# self.qa_cache_questions_vectordb = FaissStore( {CKPT_DIR}/ 'skill/vectordb')
# TODO:
# Check if skills right using
# assert self.vectordb._collection.count() == len(self.skills), (
# f"Skill Manager's vectordb is not synced with skills.json.\n"
# f"There are {self.vectordb._collection.count()} skills in vectordb but {len(self.skills)} skills in skills.json.\n"
# f"Did you set resume=False when initializing the manager?\n"
# f"You may need to manually delete the vectordb directory for running from scratch."
# )
async def run(
self, task, program_name, program_code, skills, skill_desp, *args, **kwargs

View file

@ -3,8 +3,57 @@
# @Author : stellahong (stellahong@fuzhi.ai)
# @Desc :
from metagpt.actions import Action
from langchain.vectorstores import Chroma
from langchain.embeddings.openai import OpenAIEmbeddings
from metagpt.document_store import FaissStore
from metagpt.const import CKPT_DIR
class PlayerActions(Action):
def __init__(self, name="", context=None, llm=None):
super().__init__(name, context, llm)
self.skills = {}
self.qa_cache = {}
self.retrieval_top_k = 5
self.vectordb = Chroma(
collection_name="skill_vectordb",
embedding_function=OpenAIEmbeddings(),
persist_directory=f"{CKPT_DIR}/skill/vectordb",
)
self.qa_cache_questions_vectordb = Chroma(
collection_name="qa_cache_questions_vectordb",
embedding_function=OpenAIEmbeddings(),
persist_directory=f"{CKPT_DIR}/curriculum/vectordb",
)
# TODO: change to FaissStore
# self.qa_cache_questions_vectordb = FaissStore( {CKPT_DIR}/ 'curriculum/vectordb'
@classmethod
def set_skills(cls, skills):
cls.skills = skills
# Check if Skill Manager's vectordb right using
assert cls.vectordb._collection.count() == len(cls.skills), (
f"Skill Manager's vectordb is not synced with skills.json.\n"
f"There are {cls.vectordb._collection.count()} skills in vectordb but {len(cls.skills)} skills in skills.json.\n"
f"Did you set resume=False when initializing the manager?\n"
f"You may need to manually delete the vectordb directory for running from scratch."
)
@classmethod
def set_qa_cache(cls, qa_cache):
cls.qa_cache = qa_cache
# Check if qa_cache right using
# Check if Skill Manager's vectordb right using
assert cls.qa_cache_questions_vectordb._collection.count() == len(
cls.qa_cache
), (
f"Curriculum Agent's qa cache question vectordb is not synced with qa_cache.json.\n"
f"There are {cls.qa_cache_questions_vectordb._collection.count()} questions in vectordb "
f"but {len(cls.qa_cache)} questions in qa_cache.json.\n"
f"Did you set resume=False when initializing the agent?\n"
f"You may need to manually delete the qa cache question vectordb directory for running from scratch.\n"
)
"""Minecraft player info without any implementation details"""
async def run(self, *args, **kwargs):
raise NotImplementedError

View file

@ -15,7 +15,6 @@ class VerifyTask(Action):
def __init__(self, name="", context=None, llm=None):
super().__init__(name, context, llm)
self.vect_db = ""
async def run(self,human_msg, system_msg, max_retries=5, *args, **kwargs):
# Implement the logic to verify the task here.
@ -29,7 +28,8 @@ class VerifyTask(Action):
logger.info(f"Failed to parse Critic Agent response. Consider updating your prompt.")
return False, ""
if human_msg or system_msg is None:
if human_msg is None:
logger.warning(f"Failed to get human_msg or system_msg.")
return False, ""
critic = await self._aask(prompt=human_msg, system_msgs=system_msg)
try:

View file

@ -185,13 +185,13 @@ class GameEnvironment(BaseModel, arbitrary_types_allowed=True):
)
mine_pattern = r"I need at least a (.*) to mine \w+!"
if re.match(craft_pattern, message):
return re.match(craft_pattern, message).groups()[0]
self.event_summary = re.match(craft_pattern, message).groups()[0]
elif re.match(craft_pattern2, message):
return "a nearby crafting table"
self.event_summary = "a nearby crafting table"
elif re.match(mine_pattern, message):
return re.match(mine_pattern, message).groups()[0]
self.event_summary = re.match(mine_pattern, message).groups()[0]
else:
return ""
self.event_summary = ""
chatlog = set()
for event_type, event in events:
@ -199,8 +199,7 @@ class GameEnvironment(BaseModel, arbitrary_types_allowed=True):
item = filter_item(event["onChat"])
if item:
chatlog.add(item)
return "I also need " + ", ".join(chatlog) + "." if chatlog else ""
self.event_summary = "I also need " + ", ".join(chatlog) + "." if chatlog else ""
def reset_block_info(self):
# revert all the placing event in the last step
pass
@ -214,9 +213,6 @@ class GameEnvironment(BaseModel, arbitrary_types_allowed=True):
"conversations": self.conversations,
}
"""
# update runtime status in game memory
self.runtime_status = success
task = self.current_task
if task.startswith("Deposit useless items into the chest at"):
return
@ -226,23 +222,22 @@ class GameEnvironment(BaseModel, arbitrary_types_allowed=True):
else:
logger.info(f"Failed to complete task {task}. Skipping to next task.")
self.failed_tasks.append(task)
# TODO: when not success, transform code below to update event!(isolate step soon!)
# if self.reset_placed_if_failed and not success:
# # revert all the placing event in the last step
# blocks = []
# positions = []
# for event_type, event in events:
# if event_type == "onSave" and event["onSave"].endswith("_placed"):
# block = event["onSave"].split("_placed")[0]
# position = event["status"]["position"]
# blocks.append(block)
# positions.append(position)
# new_events = self.env.step(
# f"await givePlacedItemBack(bot, {U.json_dumps(blocks)}, {U.json_dumps(positions)})",
# programs=self.skill_manager.programs,
# )
# events[-1][1]["inventory"] = new_events[-1][1]["inventory"]
# events[-1][1]["voxels"] = new_events[-1][1]["voxels"]
# when not success, below to update event!
# revert all the placing event in the last step
blocks = []
positions = []
for event_type, event in self.event:
if event_type == "onSave" and event["onSave"].endswith("_placed"):
block = event["onSave"].split("_placed")[0]
position = event["status"]["position"]
blocks.append(block)
positions.append(position)
new_events = self.mf_instance.step(
f"await givePlacedItemBack(bot, {json.dumps(blocks)}, {json.dumps(positions)})",
programs=self.programs,
)
self.event[-1][1]["inventory"] = new_events[-1][1]["inventory"]
self.event[-1][1]["voxels"] = new_events[-1][1]["voxels"]
self.save_sorted_tasks()
@ -269,10 +264,40 @@ class GameEnvironment(BaseModel, arbitrary_types_allowed=True):
with open(f"{CKPT_DIR}/curriculum/failed_tasks.json", "w") as f:
json.dump(self.failed_tasks, f)
async def on_event(self, *args):
async def on_event_retrieve(self, *args):
"""
Retrieve Minecraft events.
Returns:
list: A list of Minecraft events.
Raises:
Exception: If there is an issue retrieving events.
"""
try:
self.mf_instance.reset(
options={
"mode": "soft",
"wait_ticks": 20,
}
)
difficulty = (
"easy" if len(self.completed_tasks) > 15 else "peaceful"
)
events = self.mf_instance.step(
"bot.chat(`/time set ${getNextTime()}`);\n"
+ f"bot.chat('/difficulty {difficulty}');"
)
self.update_event(events)
return events
except Exception as e:
logger.error(f"Failed to retrieve Minecraft events: {str(e)}")
raise {}
async def on_event_execute(self, *args):
"""
Execute Minecraft events.
This function is used to obtain events from the Minecraft environment. Check the implementation in
the 'voyager/env/bridge.py step()' function to capture events generated within the game.
@ -283,40 +308,16 @@ class GameEnvironment(BaseModel, arbitrary_types_allowed=True):
Exception: If there is an issue retrieving events.
"""
try:
if not self.mf_instance.has_reset:
# TODO Modify
logger.info("Environment has not been reset yet, is resetting")
self.mf_instance.reset(
options={
"mode": "soft",
"wait_ticks": 20,
}
)
# raise {}
self.mf_instance.check_process()
self.mf_instance.unpause()
data = {
"code": self.code,
"programs": self.programs,
}
res = requests.post(
f"{self.mf_instance.server}/step",
json=data,
timeout=self.mf_instance.request_timeout,
events = self.mf_instance.step(
code = self.code,
programs=self.programs,
)
if res.status_code != 200:
logger.error("Failed to step Minecraft server")
raise {}
returned_data = res.json()
self.mf_instance.pause()
events = json.loads(returned_data)
logger.info(f"Get Current Event: {events}")
self.update_event(events)
return events
except Exception as e:
logger.error(f"Failed to retrieve Minecraft events: {str(e)}")
logger.error(f"Failed to execute Minecraft events: {str(e)}")
raise {}
class MinecraftPlayer(SoftwareCompany):
"""
Software Company: Possesses a team, SOP (Standard Operating Procedures), and a platform for instant messaging,

View file

@ -136,6 +136,24 @@ class MineflayerEnv:
self.pause()
return json.loads(returned_data)
def step(self, code: str, programs: str = ""):
if not self.has_reset:
raise RuntimeError("Environment has not been reset yet")
self.check_process()
self.unpause()
data = {
"code": code,
"programs": programs,
}
res = requests.post(
f"{self.server}/step", json=data, timeout=self.request_timeout
)
if res.status_code != 200:
raise RuntimeError("Failed to step Minecraft server")
returned_data = res.json()
self.pause()
return json.loads(returned_data)
def close(self):
self.unpause()
if self.connected:

View file

@ -135,7 +135,9 @@ class CriticReviewer(Base):
self.perform_game_info_callback(
success, self.game_memory.update_exploration_progress
)
self.perform_game_info_callback(critique, self.game_memory.update_critique)
self.perform_game_info_callback(
critique, self.game_memory.update_critique
)
return Message(
content=f"{critique}",
instruct_content="verify_task",
@ -153,10 +155,9 @@ class CriticReviewer(Base):
self.maintain_actions(todo)
# 获取最新的游戏周边信息
events = await self._obtain_events()
self.perform_game_info_callback(
events, self.game_memory.update_event
) # update chest_memory / chest observation
events = await self._execute_events()
self.perform_game_info_callback(events, self.game_memory.update_chest_memory)
logger.info(f"Execute return event is {self.game_memory.event}")
context = self.game_memory.context
task = self.game_memory.current_task
chest_observation = self.game_memory.chest_observation

View file

@ -314,10 +314,9 @@ class CurriculumDesigner(Base):
self.maintain_actions(todo)
# 获取最新的游戏周边环境信息
events = await self._obtain_events()
self.perform_game_info_callback(events, self.game_memory.update_event)
chest_observation = self.game_memory.chest_observation
DesignCurriculum.set_qa_cache(self.game_memory.qa_cache)
# DesignCurriculum.set_qa_cache(self.game_memory.qa_cache)
# msg = self._rc.memory.get(k=1)[0]
# query = msg.content
@ -335,7 +334,7 @@ class CurriculumDesigner(Base):
}
handler = handler_map.get(type(todo))
if handler:
if type(todo) == "DesignTask":
if type(todo) == DesignTask:
msg = await handler(**design_task_message)
else:
msg = await handler(**design_curriculum_message)

View file

@ -103,7 +103,10 @@ class Minecraft(Role):
self._rc.todo = None
async def _obtain_events(self):
return await self.game_memory.on_event()
return await self.game_memory.on_event_retrieve()
async def _execute_events(self):
return await self.game_memory.on_event_execute()
def set_memory(self, shared_memory: 'GameEnviroment'):
self.game_memory = shared_memory
@ -116,7 +119,7 @@ class Minecraft(Role):
@staticmethod
def perform_game_info_callback(info: object, callback: object) -> object:
logger.info(info)
# logger.info(info)
callback(info)
def encapsule_message(self, msg, *args, **kwargs):
@ -130,5 +133,5 @@ agent_registry = Registry(name="Minecraft")
if __name__ == "__main__":
mc = Minecraft()
result = "Async operation result"
# 调用回调函数,并传递结果
# 调用回调函数,并传递结果
# mc.perform_memory_callback(mc.my_callback)

View file

@ -85,6 +85,7 @@ class SkillManager(Base):
task = self.game_memory.current_task
event_summary = self.game_memory.event_summary
code = self.game_memory.code
self.perform_game_info_callback(self.game_memory.event, self.game_memory.summarize_chatlog)
try:
program_code = code["program_code"] # TODO: Handle code is None, cuz first round DesignCurriculum(code is None) trigger this
except (KeyError, TypeError):