Merge pull request #379 from yuymf/minecraft_dev2

Minecraft game add curriculum_agent
This commit is contained in:
Sirui Hong 2023-09-30 20:21:05 +08:00 committed by GitHub
commit 8fc469bdc4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 751 additions and 126 deletions

View file

@ -2,8 +2,20 @@
# @Date : 2023/9/23 14:56
# @Author : stellahong (stellahong@fuzhi.ai)
# @Desc :
import json
import re
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from metagpt.document_store import FaissStore
from metagpt.logs import logger
from metagpt.actions import Action
from metagpt.utils.minecraft import load_prompt, fix_and_parse_json
from metagpt.schema import HumanMessage, SystemMessage
from metagpt.const import CKPT_DIR
# from metagpt.actions.minecraft import PlayerActions
class DesignTask(Action):
@ -11,39 +23,64 @@ class DesignTask(Action):
Action class for decomposing a task.
Refer to the code in the voyager/agents/curriculum.py for implementation details.
"""
def __init__(self, name="", context=None, llm=None):
super().__init__(name, context, llm)
def decompose_task(self, query):
# Implement the logic to decompose a task here.
return ""
async def propose_next_ai_task(self, prompts, system_msg):
async def decompose_task(self, query, events):
system_msgs = SystemMessage(
content=load_prompt("curriculum_task_decomposition")
)
prompt = self.render_human_message(
events=events, chest_observation=""
) + HumanMessage(content=f"Final task: {query}")
logger.info(f"Curriculum Agent task decomposition\nFinal task: {query}")
rsp = await self._aask(prompt=prompt, system_msgs=system_msgs)
logger.info(f"Curriculum Agent task decomposition\n{rsp}")
return fix_and_parse_json(rsp)
def parse_llm_response(self, llm_resp):
task = ""
for line in llm_resp.split("\n"):
if line.startswith("Task:"):
task = line[5:].replace(".", "").strip()
assert task, "Task not found in Curriculum Agent response"
return {"next_task": task}
async def generate_task(self, human_msg, system_msg, max_retries=5):
"""
Refer to the code in the voyager/agents/curriculum.py propose_next_ai_task() for implementation details.
Returns:
Returns: task & context
"""
curriculum = await self._aask(prompt=prompts, system_msgs=system_msg)
logger.info(f"\033[31m****Curriculum Agent ai message****\n{curriculum}\033[0m")
def parse_llm_response(self, llm_resp):
# Implement the logic to parse the LLM response here.
return "", ""
if max_retries == 0:
raise RuntimeError("Max retries reached, failed to propose task.")
curriculum = await self._aask(prompt=human_msg, system_msgs=system_msg)
logger.info(f"Curriculum Agent message\n{curriculum}")
try:
response = self.parse_llm_response(
curriculum
) # Task: Craft 4 wooden planks.
assert "next_task" in response
return response["next_task"]
except Exception as e:
logger.info(f"Error parsing curriculum response: {e}. Trying again!")
return self.generate_task(
human_msg=human_msg,
system_msg=system_msg,
max_retries=max_retries - 1,
)
async def run(self, human_msg, system_msg, *args, **kwargs):
logger.info(f"run {self.__repr__()}")
# Call the language model to generate a response.
llm_response = await self.propose_next_ai_task(prompts=human_msg, system_msg=system_msg)
# Parse the response from the language model.
task, context = self.parse_llm_response(llm_response)
return task, context
task = await self.generate_task(human_msg=human_msg, system_msg=system_msg)
return task
class DesignCurriculum(Action):
@ -51,34 +88,160 @@ class DesignCurriculum(Action):
Action class for designing curriculum-related questions.
Refer to the code in the voyager/agents/curriculum.py for implementation details.
"""
def __init__(self, name="", context=None, llm=None):
super().__init__(name, context, llm)
self.vect_db = ""
def get_task_context(self):
# Implement the logic for a specific task in generating context.
return ""
def generate_qa(self):
# Implement the logic to generate curriculum-related questions and answers.
question = ""
answer = ""
# voyager vectordb using
self.qa_cache = {}
self.qa_cache_questions_vectordb = Chroma(
collection_name="qa_cache_questions_vectordb",
embedding_function=OpenAIEmbeddings(),
persist_directory=f"{CKPT_DIR}/curriculum/vectordb",
)
# TODO: change to FaissStore
# self.qa_cache_questions_vectordb = FaissStore( {CKPT_DIR}/ 'curriculum/vectordb')
# Check if qa_cache right using
assert self.qa_cache_questions_vectordb._collection.count() == len(
self.qa_cache
), (
f"Curriculum Agent's qa cache question vectordb is not synced with qa_cache.json.\n"
f"There are {self.qa_cache_questions_vectordb._collection.count()} questions in vectordb "
f"but {len(self.qa_cache)} questions in qa_cache.json.\n"
f"Did you set resume=False when initializing the agent?\n"
f"You may need to manually delete the qa cache question vectordb directory for running from scratch.\n"
)
@classmethod
def set_qa_cache(cls, qa_cache):
cls.qa_cache = qa_cache
@classmethod
def generate_qa(cls, events, chest_observation):
"""
Generate qa for DesignTask's HumanMessage
"""
questions_new, _ = cls.generate_qa_step1(
events=events, chest_observation=chest_observation
)
questions = []
answers = []
for question in questions_new:
if cls.qa_cache_questions_vectordb._collection.count() > 0:
docs_and_scores = (
cls.qa_cache_questions_vectordb.similarity_search_with_score(
question, k=1
)
)
if docs_and_scores and docs_and_scores[0][1] < 0.05:
question_cached = docs_and_scores[0][0].page_content
assert question_cached in cls.qa_cache
answer_cached = cls.qa_cache[question_cached]
questions.append(question_cached)
answers.append(answer_cached)
continue
answer = cls.generate_qa_step2(question=question)
assert question not in cls.qa_cache
cls.qa_cache[question] = answer
cls.qa_cache_questions_vectordb.add_texts(
texts=[question],
)
with open(f"{CKPT_DIR}/curriculum/qa_cache.json", "w") as f:
json.dump(cls.qa_cache, f)
cls.qa_cache_questions_vectordb.persist()
questions.append(question)
answers.append(answer)
assert len(questions_new) == len(questions) == len(answers)
return questions, answers
async def generate_qa_step1(self, events, human_msg, system_msg):
biome = events[-1][1]["status"]["biome"].replace("_", " ")
questions = [
f"What are the blocks that I can find in the {biome} in Minecraft?",
f"What are the items that I can find in the {biome} in Minecraft?",
f"What are the mobs that I can find in the {biome} in Minecraft?",
]
qa_response = await self._aask(prompt=human_msg, system_msgs=system_msg)
try:
# Regex pattern to extract question and concept pairs
pattern = r"Question \d+: (.+)\nConcept \d+: (.+)"
# Extracting all question and concept pairs from the text
pairs = re.findall(pattern, qa_response)
# Storing each question and concept in separate lists
questions_new = [pair[0] for pair in pairs]
questions.extend(questions_new)
except Exception as e:
logger.error(
f"Error parsing curriculum response for "
f"QA step 1 ask questions: {e}."
)
return questions
async def generate_qa_step2(self, question):
# Implement the logic for another specific step in generating questions and answers.
logger.info(f"Curriculum Agent Question: {question}")
human_msg = HumanMessage(content=f"Question: {question}").content
system_msg = [
SystemMessage(
content=load_prompt("curriculum_qa_step2_answer_questions")
).content
]
answer = await self._aask(prompt=human_msg, system_msgs=system_msg)
logger.info(f"Curriculum Agent {answer}")
return answer
async def get_context_from_task(self, task):
"""
Args: task
Returns: context: "Question: {question}\n{answer}"
if include ore in question, gpt will try to use tool with skill touch enhancement to mine
"""
question = (
f"How to {task.replace('_', ' ').replace(' ore', '').replace(' ores', '').replace('.', '').strip().lower()}"
f" in Minecraft?"
)
if question in self.qa_cache:
answer = self.qa_cache[question]
else:
answer = await self.generate_qa_step2(question=question)
self.qa_cache[question] = answer
self.qa_cache_questions_vectordb.add_texts(
texts=[question],
)
with open(f"{CKPT_DIR}/curriculum/qa_cache.json", "w") as f:
json.dump(self.qa_cache, f)
self.qa_cache_questions_vectordb.persist()
context = f"Question: {question}\n{answer}"
return context
def generate_qa_step1(self):
# Implement the logic for a specific step in generating questions and answers.
return ""
def generate_qa_step2(self):
# Implement the logic for another specific step in generating questions and answers.
return ""
async def run(self, *args, **kwargs):
async def generate_context(self, task, max_retries=5):
"""
Refer to the code in the voyager/agents/curriculum.py propose_next_ai_task() for implementation details.
Returns: context
"""
if max_retries == 0:
raise RuntimeError("Max retries reached, failed to propose context.")
try:
context = await self.get_context_from_task(
task=task
) # Curriculum Agent Question: How to craft 4 wooden planks in Minecraft? & Curriculum Agent Answer: ...
return context
except Exception as e:
logger.info(f"Error parsing curriculum response: {e}. Trying again!")
return self.generate_context(
task=task,
max_retries=max_retries - 1,
)
async def run(self, task, human_msg, system_msg, *args, **kwargs):
logger.info(f"run {self.__repr__()}")
# Generate curriculum-related questions and answers.
curriculum_qa = self.generate_qa()
# curriculum_qustion = await self.generate_qa_step1(events, human_msg, system_msg)
curriculum_context = await self.generate_context(task)
# Return the generated questions and answers.
return curriculum_qa
return curriculum_context

View file

@ -40,3 +40,46 @@ TUTORIAL_PATH = DATA_PATH / "tutorial_docx"
SKILL_DIRECTORY = PROJECT_ROOT / "metagpt/skills"
MEM_TTL = 24 * 30 * 3600
### MineCraft ###
CKPT_DIR = PROJECT_ROOT / "metagpt/ckpt"
LOG_DIR = PROJECT_ROOT / "logs"
DEFAULT_WARMUP = {
"context": 15,
"biome": 10,
"time": 15,
"nearby_blocks": 0,
"other_blocks": 10,
"nearby_entities": 5,
"health": 15,
"hunger": 15,
"position": 0,
"equipment": 0,
"inventory": 0,
"optional_inventory_items": 7,
"chests": 0,
"completed_tasks": 0,
"failed_tasks": 0,
}
CURRICULUM_OB = [
"context",
"biome",
"time",
"nearby_blocks",
"other_blocks",
"nearby_entities",
"health",
"hunger",
"position",
"equipment",
"inventory",
"chests",
"completed_tasks",
"failed_tasks",
]
CORE_INVENTORY_ITEMS = r".*_log|.*_planks|stick|crafting_table|furnace"
r"|cobblestone|dirt|coal|.*_pickaxe|.*_sword|.*_axe", # curriculum_agent: only show these items in inventory before optional_inventory_items reached in warm up

View file

@ -16,6 +16,7 @@ from metagpt.actions.minecraft.player_action import PlayerActions
from metagpt.roles.minecraft.minecraft_base import Minecraft
from metagpt.environment import Environment
from metagpt.mineflayer_environment import MineflayerEnv
from metagpt.const import CKPT_DIR
class GameEnvironment(BaseModel, arbitrary_types_allowed=True):
@ -24,30 +25,57 @@ class GameEnvironment(BaseModel, arbitrary_types_allowed=True):
"""
event: dict[str, Any] = Field(default_factory=dict)
current_task: str = Field(default="Craft 4 wooden planks")
current_task: str = Field(default="Mine 1 wood log")
task_execution_time: float = Field(default=float)
context: str = Field(default="")
context: str = Field(
default="You can mine one of oak, birch, spruce, jungle, acacia, dark oak, or mangrove logs."
)
code: str = Field(default=None)
programs: str = Field(default="")
critique: str = Field(default="")
critique: str = Field(default=None)
skills: list[str] = Field(default_factory=list)
question: str = Field(default=None)
chest_memory: dict[str, Any] = Field(default_factory=dict)
qa_cache: dict[str, str] = Field(default_factory=dict)
completed_tasks: list[str] = Field(default_factory=list) # Critique things
failed_tasks: list[str] = Field(default_factory=list)
chest_memory: dict[str, Any] = Field(
default_factory=dict
) # eg: {'(1344, 64, 1381)': 'Unknown'}
chest_observation: str = Field(default="") # eg: "Chests: None\n\n"
mf_instance: MineflayerEnv = Field(default_factory=MineflayerEnv)
@property
def progress(self):
# return len(self.completed_tasks) + 10 # Test only
return len(self.completed_tasks)
@property
def warm_up(self):
return self.mf_instance.warm_up
@property
def core_inv_items_regex(self):
return self.mf_instance.core_inv_items_regex
def set_mc_port(self, mc_port):
self.mf_instance.set_mc_port(mc_port)
def set_mc_resume(self, resume: bool = False):
def set_mc_resume(self, resume: bool = False): # TODO: mv to config
if resume:
logger.info(
f"Loading Action Developer from {self.mf_instance.ckpt_dir}/action"
)
with open(
f"{self.mf_instance.ckpt_dir}/action/chest_memory.json", "r"
) as f:
logger.info(f"Loading Action Developer from {CKPT_DIR}/action")
with open(f"{CKPT_DIR}/action/chest_memory.json", "r") as f:
self.chest_memory = json.load(f)
logger.info(f"Loading Curriculum Agent from {CKPT_DIR}/curriculum")
with open(f"{CKPT_DIR}/curriculum/completed_tasks.json", "r") as f:
self.completed_tasks = json.load(f)
with open(f"{CKPT_DIR}/curriculum/failed_tasks.json", "r") as f:
self.failed_tasks = json.load(f)
with open(f"{CKPT_DIR}/curriculum/qa_cache.json", "r") as f:
self.qa_cache = json.load(f)
# TODO: add skills resume
def register_roles(self, roles: Iterable[Minecraft]):
@ -57,6 +85,7 @@ class GameEnvironment(BaseModel, arbitrary_types_allowed=True):
def update_event(self, event: Dict):
self.event = event
self.update_chest_memory(event)
self.update_chest_observation()
def update_task(self, task: str):
self.current_task = task
@ -93,9 +122,33 @@ class GameEnvironment(BaseModel, arbitrary_types_allowed=True):
if chest != "Invalid":
logger.info(f"Action Developer saving chest {position}: {chest}")
self.chest_memory[position] = chest
with open(f"{self.mf_instance.ckpt_dir}/action/chest_memory.json", "w") as f:
with open(f"{CKPT_DIR}/action/chest_memory.json", "w") as f:
json.dump(self.chest_memory, f)
def update_chest_observation(self):
"""
update chest_memory to chest_observation.
Refer to @ https://github.com/MineDojo/Voyager/blob/main/voyager/agents/action.py
"""
chests = []
for chest_position, chest in self.chest_memory.items():
if isinstance(chest, dict) and len(chest) > 0:
chests.append(f"{chest_position}: {chest}")
for chest_position, chest in self.chest_memory.items():
if isinstance(chest, dict) and len(chest) == 0:
chests.append(f"{chest_position}: Empty")
for chest_position, chest in self.chest_memory.items():
if isinstance(chest, str):
assert chest == "Unknown"
chests.append(f"{chest_position}: Unknown items inside")
assert len(chests) == len(self.chest_memory)
if chests:
chests = "\n".join(chests)
self.chest_observation = f"Chests:\n{chests}\n\n"
else:
self.chest_observation = f"Chests: None\n\n"
async def on_event(self, *args):
"""
Retrieve Minecraft events.

View file

@ -6,11 +6,12 @@ import os
import time
import json
import requests
import re
from metagpt.logs import logger
import metagpt.utils.minecraft as U
from metagpt.utils.minecraft.process_monitor import SubprocessMonitor
from metagpt.const import CKPT_DIR, DEFAULT_WARMUP, CURRICULUM_OB, CORE_INVENTORY_ITEMS
class MineflayerEnv:
def __init__(
@ -29,9 +30,33 @@ class MineflayerEnv:
self.reset_options = None
self.connected = False
self.server_paused = False
self.ckpt_dir = "metagpt/ckpt"
os.makedirs(f"{self.ckpt_dir}/action", exist_ok=True)
self.warm_up = {} # turns that when to add part of curriculum_ob to HumanMessage TODO: MV
self.core_inv_items_regex = None
self._set_warmup()
os.makedirs(f"{CKPT_DIR}/curriculum/vectordb", exist_ok=True)
os.makedirs(f"{CKPT_DIR}/action", exist_ok=True)
def _set_warmup(self):
warm_up = DEFAULT_WARMUP
if "optional_inventory_items" in warm_up:
assert CORE_INVENTORY_ITEMS is not None
self.core_inv_items_regex = re.compile(
CORE_INVENTORY_ITEMS
)
self.warm_up["optional_inventory_items"] = warm_up[
"optional_inventory_items"
]
else:
self.warm_up["optional_inventory_items"] = 0
for key in CURRICULUM_OB:
self.warm_up[key] = warm_up.get(key, DEFAULT_WARMUP[key])
self.warm_up["nearby_blocks"] = 0
self.warm_up["inventory"] = 0
self.warm_up["completed_tasks"] = 0
self.warm_up["failed_tasks"] = 0
def set_mc_port(self, mc_port):
self.mc_port = mc_port

View file

@ -41,30 +41,6 @@ class ActionDeveloper(Base):
# 需要根据events进行自己chest_observation的更新
self._watch([RetrieveSkills])
def render_chest_observation(self):
"""
Render game_memory.chest_memory to prompt text.
Refer to @ https://github.com/MineDojo/Voyager/blob/main/voyager/agents/action.py
"""
chests = []
for chest_position, chest in self.game_memory.chest_memory.items():
if isinstance(chest, dict) and len(chest) > 0:
chests.append(f"{chest_position}: {chest}")
for chest_position, chest in self.game_memory.chest_memory.items():
if isinstance(chest, dict) and len(chest) == 0:
chests.append(f"{chest_position}: Empty")
for chest_position, chest in self.game_memory.chest_memory.items():
if isinstance(chest, str):
assert chest == "Unknown"
chests.append(f"{chest_position}: Unknown items inside")
assert len(chests) == len(self.game_memory.chest_memory)
if chests:
chests = "\n".join(chests)
return f"Chests:\n{chests}\n\n"
else:
return f"Chests: None\n\n"
def render_system_message(self, skills=[], *args, **kwargs):
"""
According to basic skills context files to genenarate js skill codes.
@ -163,12 +139,11 @@ class ActionDeveloper(Base):
observation += f"Equipment: {equipment}\n\n"
observation += f"Inventory ({inventory_used}/36): {'Empty' if not inventory else ', '.join(inventory)}\n\n"
# TODO: if task update, uncomment this
# if not (
# task == "Place and deposit useless items into a chest"
# or task.startswith("Deposit useless items into the chest at")
# ):
observation += self.render_chest_observation()
if not (
task == "Place and deposit useless items into a chest"
or task.startswith("Deposit useless items into the chest at")
):
observation += self.game_memory.chest_observation
observation += f"Task: {task}\n\n"
observation += f"Context: {context or 'None'}\n\n"

View file

@ -2,38 +2,249 @@
# @Date : 2023/9/23 12:45
# @Author : stellahong (stellahong@fuzhi.ai)
# @Desc :
import random
import json
from metagpt.logs import logger
from metagpt.schema import Message, HumanMessage, SystemMessage
from metagpt.roles.minecraft.minecraft_base import Minecraft as Base
from metagpt.actions.minecraft.design_curriculumn import DesignCurriculum, DesignTask
from metagpt.actions.minecraft.player_action import PlayerActions
from metagpt.utils.minecraft import load_prompt
from metagpt.const import CKPT_DIR, CURRICULUM_OB
class CurriculumDesigner(Base):
"""
CurriculumDesigner is the automatic curriculum in paper, refer to the code voyager/agents/curriculum.py
"""
def __init__(
self,
name: str = "David",
profile: str = "Expertise in minecraft task design and curriculum development.",
goal: str = " Collect and integrate learner feedback to improve and refine educational content and pathways",
constraints: str = "Limited budget and resources for the development of educational content and technology tools."
self,
name: str = "David",
profile: str = "Expertise in minecraft task design and curriculum development.",
goal: str = " Collect and integrate learner feedback to improve and refine educational content and pathways",
constraints: str = "Limited budget and resources for the development of educational content and technology tools.",
) -> None:
super().__init__(name, profile, goal, constraints)
# Initialize actions specific to the Action role
self._init_actions([DesignTask, DesignCurriculum])
# Set events or actions the ActionAgent should watch or be aware of
self._watch([PlayerActions, DesignTask])
def render_human_message(self, msg, *args, **kwargs):
return HumanMessage(content=msg)
def render_system_message(self, msg, *args, **kwargs):
return SystemMessage(content=msg)
def render_curriculum_observation(self, *, events, chest_observation):
"""
Returns: observation for curriculum
Refer to @ https://github.com/MineDojo/Voyager/blob/main/voyager/agents/curriculum.py
"""
assert events[-1][0] == "observe", "Last event must be observe"
event = events[-1][1]
biome = event["status"]["biome"]
time_of_day = event["status"]["timeOfDay"]
voxels = event["voxels"]
block_records = event["blockRecords"]
entities = event["status"]["entities"]
health = event["status"]["health"]
hunger = event["status"]["food"]
position = event["status"]["position"]
equipment = event["status"]["equipment"]
inventory_used = event["status"]["inventoryUsed"]
inventory = event["inventory"]
if not any(
"dirt" in block
or "log" in block
or "grass" in block
or "sand" in block
or "snow" in block
for block in voxels
):
biome = "underground"
other_blocks = ", ".join(
list(
set(block_records).difference(set(voxels).union(set(inventory.keys())))
)
)
other_blocks = other_blocks if other_blocks else "None"
nearby_entities = (
", ".join([k for k, v in sorted(entities.items(), key=lambda x: x[1])])
if entities
else "None"
)
completed_tasks = (
", ".join(self.game_memory.completed_tasks)
if self.game_memory.completed_tasks
else "None"
)
failed_tasks = (
", ".join(self.game_memory.failed_tasks)
if self.game_memory.failed_tasks
else "None"
)
# filter out optional inventory items if required
if (
self.game_memory.progress
< self.game_memory.warm_up["optional_inventory_items"]
):
inventory = {
k: v
for k, v in inventory.items()
if self.game_memory.core_inv_items_regex.search(k) is not None
}
observation = {
"context": "",
"biome": f"Biome: {biome}\n\n",
"time": f"Time: {time_of_day}\n\n",
"nearby_blocks": f"Nearby blocks: {', '.join(voxels) if voxels else 'None'}\n\n",
"other_blocks": f"Other blocks that are recently seen: {other_blocks}\n\n",
"nearby_entities": f"Nearby entities: {nearby_entities}\n\n",
"health": f"Health: {health:.1f}/20\n\n",
"hunger": f"Hunger: {hunger:.1f}/20\n\n",
"position": f"Position: x={position['x']:.1f}, y={position['y']:.1f}, z={position['z']:.1f}\n\n",
"equipment": f"Equipment: {equipment}\n\n",
"inventory": f"Inventory ({inventory_used}/36): {inventory if inventory else 'Empty'}\n\n",
"chests": chest_observation,
"completed_tasks": f"Completed tasks so far: {completed_tasks}\n\n",
"failed_tasks": f"Failed tasks that are too hard: {failed_tasks}\n\n",
}
return observation
# --------------------------------Design Task Prepare---------------------------------------
def render_design_task_human_message(
self, events, chest_observation, *args, **kwargs
):
"""
Returns: observation for curriculum
Refer to @ https://github.com/MineDojo/Voyager/blob/main/voyager/agents/curriculum.py
"""
content = ""
warm_up = self.game_memory.mf_instance.warm_up
observation = self.render_curriculum_observation(
events=events, chest_observation=chest_observation
)
if self.game_memory.progress >= warm_up["context"]:
questions, answers = DesignCurriculum.generate_qa(
events=events, chest_observation=chest_observation
)
i = 1
for question, answer in zip(questions, answers):
if "Answer: Unknown" in answer or "language model" in answer:
continue
observation["context"] += f"Question {i}: {question}\n"
observation["context"] += f"{answer}\n\n"
i += 1
if i > 5:
break
for key in CURRICULUM_OB:
if self.game_memory.progress >= warm_up[key]:
if warm_up[key] != 0:
should_include = random.random() < 0.8
else:
should_include = True
if should_include:
content += observation[key]
logger.info(f"Curriculum Agent human message\n{content}")
return HumanMessage(content=content)
def render_design_task_system_message(self, *args, **kwargs):
return SystemMessage(content=load_prompt("curriculum"))
def encapsule_design_task_message(self, events, chest_observation, *args, **kwargs):
human_msg = self.render_design_task_human_message(
events=events, chest_observation=chest_observation, *args, **kwargs
)
system_msg = self.render_design_task_system_message(*args, **kwargs)
return {"system_msg": [system_msg.content], "human_msg": human_msg.content}
def generate_task_if_inventory_full(self, events, chest_observation):
"""
TODO: Try if this could be done with prompt
Returns: Task When inventory is almost full
"""
if chest_observation != "Chests: None\n\n":
chests = chest_observation[8:-2].split("\n")
for chest in chests:
content = chest.split(":")[1]
if content == " Unknown items inside" or content == " Empty":
position = chest.split(":")[0]
task = f"Deposit useless items into the chest at {position}"
return task
if "chest" in events[-1][1]["inventory"]:
task = "Place a chest"
else:
task = "Craft 1 chest"
return task
# -----------------------------------------------------------------------------------------
# --------------------------------Design Curriculum Prepare--------------------------------
def render_design_curriculum_system_message(self, *args, **kwargs):
return SystemMessage(content=load_prompt("curriculum_qa_step1_ask_questions"))
def render_design_curriculum_human_message(
self, events, chest_observation, *args, **kwargs
):
observation = self.render_curriculum_observation(
events=events, chest_observation=chest_observation
)
content = ""
for key in CURRICULUM_OB:
content += observation[key]
return HumanMessage(content=content)
def encapsule_design_curriculum_message(
self, events, chest_observation, *args, **kwargs
):
human_msg = self.render_design_curriculum_human_message(
events=events, chest_observation=chest_observation, *args, **kwargs
)
system_msg = self.render_design_curriculum_system_message(*args, **kwargs)
return {"system_msg": [system_msg.content], "human_msg": human_msg.content}
def generate_context_if_inventory_full(self, events, chest_observation):
"""
TODO: Try if this could be done with prompt
Returns: Context When inventory is almost full
"""
inventoryUsed = events[-1][1]["status"]["inventoryUsed"]
if chest_observation != "Chests: None\n\n":
chests = chest_observation[8:-2].split("\n")
for chest in chests:
content = chest.split(":")[1]
if content == " Unknown items inside" or content == " Empty":
context = (
f"Your inventory have {inventoryUsed} occupied slots before depositing. "
"After depositing, your inventory should only have 20 occupied slots. "
"You should deposit useless items such as andesite, dirt, cobblestone, etc. "
"Also, you can deposit low-level tools, "
"For example, if you have a stone pickaxe, you can deposit a wooden pickaxe. "
"Make sure the list of useless items are in your inventory "
"(do not list items already in the chest), "
"You can use bot.inventoryUsed() to check how many inventory slots are used."
)
return context
if "chest" in events[-1][1]["inventory"]:
context = (
f"You have a chest in inventory, place it around you. "
f"If chests is not None, or nearby blocks contains chest, this task is success."
)
else:
context = "Craft 1 chest with 8 planks of any kind of wood."
return context
# -----------------------------------------------------------------------------------------
async def handle_task_design(self, human_msg, system_msg, *args, **kwargs):
"""
Args:
@ -44,10 +255,25 @@ class CurriculumDesigner(Base):
Returns:
"""
task = await DesignTask().run(human_msg, system_msg, *args, **kwargs)
events = self.game_memory.event
chest_observation = self.game_memory.chest_observation
inventoryUsed = events[-1][1]["status"]["inventoryUsed"]
if self.game_memory.progress == 0:
task = self.game_memory.current_task
elif inventoryUsed >= 33:
task = self.generate_task_if_inventory_full(
self, events=events, chest_observation=chest_observation
)
else:
task = await DesignTask().run(human_msg, system_msg, *args, **kwargs)
logger.info(f"Handle_task_design result is Here: {task}")
self.perform_game_info_callback(task, self.game_memory.update_task)
return Message(content=f"{task}", instruct_content="task_design", role=self.profile)
return Message(
content=f"{task}", instruct_content="task_design", role=self.profile
)
async def handle_curriculum_design(self, human_msg, system_msg, *args, **kwargs):
"""
refer to the context generation in voyager
@ -60,34 +286,107 @@ class CurriculumDesigner(Base):
Returns:
"""
context = await DesignCurriculum().run(human_msg, system_msg, *args, **kwargs)
events = self.game_memory.event
chest_observation = self.game_memory.chest_observation
inventoryUsed = events[-1][1]["status"]["inventoryUsed"]
task = self.game_memory.current_task
if self.game_memory.progress == 0:
context = self.game_memory.context
elif inventoryUsed >= 33:
context = self.generate_context_if_inventory_full(
self, events=events, chest_observation=chest_observation
)
else:
context = await DesignCurriculum().run(
task, human_msg, system_msg, *args, **kwargs
)
self.perform_game_info_callback(context, self.game_memory.update_context)
return Message(content=f"{context}", instruct_content="curriculum_design", role=self.profile)
return Message(
content=f"{context}",
instruct_content="curriculum_design",
role=self.profile,
)
# TODO: move to Critic agent
def update_exploration_progress(self, info):
"""
Split task into completed_tasks or failed_tasks
Args: info = {
"task": self.task,
"success": success,
"conversations": self.conversations,
}
"""
task = info["task"]
if task.startswith("Deposit useless items into the chest at"):
return
if info["success"]:
logger.info(f"Completed task {task}.")
self.game_memory.completed_tasks.append(task)
else:
logger.info(f"Failed to complete task {task}. Skipping to next task.")
self.game_memory.failed_tasks.append(task)
self.save_sorted_tasks()
# TODO: move to Critic agent
def save_sorted_tasks(self):
updated_completed_tasks = []
# record repeated failed tasks
updated_failed_tasks = self.game_memory.failed_tasks
# dedup but keep order
for task in self.game_memory.completed_tasks:
if task not in updated_completed_tasks:
updated_completed_tasks.append(task)
# remove completed tasks from failed tasks
for task in updated_completed_tasks:
while task in updated_failed_tasks:
updated_failed_tasks.remove(task)
self.game_memory.completed_tasks = updated_completed_tasks
self.failed_tasks = updated_failed_tasks
# dump to json
with open(f"{CKPT_DIR}/curriculum/completed_tasks.json", "w") as f:
json.dump(self.game_memory.completed_tasks, f)
with open(f"{CKPT_DIR}/curriculum/failed_tasks.json", "w") as f:
json.dump(self.game_memory.failed_tasks, f)
async def _act(self) -> Message:
todo = self._rc.todo
logger.debug(f"Todo is {todo}")
# 获取最新的游戏周边环境信息
event = await self._obtain_events()
task = self.game_memory.current_task
context = self.game_memory.context
msg = self._rc.memory.get(k=1)[0]
query = msg.content
message = self.encapsule_message(query, task, event)
events = await self._obtain_events()
self.perform_game_info_callback(events, self.game_memory.update_event)
chest_observation = self.game_memory.chest_observation
DesignCurriculum.set_qa_cache(self.game_memory.qa_cache)
# msg = self._rc.memory.get(k=1)[0]
# query = msg.content
design_task_message = self.encapsule_design_task_message(
events, chest_observation
)
design_curriculum_message = self.encapsule_design_curriculum_message(
events, chest_observation
)
handler_map = {
DesignTask: self.handle_task_design,
DesignCurriculum: self.handle_curriculum_design,
}
handler = handler_map.get(type(todo))
if handler:
msg = await handler(**message)
if type(todo) == "DesignTask":
msg = await handler(**design_task_message)
else:
msg = await handler(**design_curriculum_message)
msg.cause_by = type(todo)
self._publish_message(msg)
return msg
raise ValueError(f"Unknown todo type: {type(todo)}")

View file

@ -0,0 +1,67 @@
import asyncio
from metagpt.minecraft_team import GameEnvironment
from metagpt.roles.minecraft.curriculum_agent import CurriculumDesigner
from metagpt.logs import logger
async def main():
events = [
[
"observe",
{
"voxels": ["grass_block", "dirt", "grass"],
"status": {
"health": 20,
"food": 20,
"saturation": 5,
"oxygen": 20,
"position": {"x": 0.5, "y": 84, "z": -207.5},
"velocity": {"x": 0, "y": -0.0784000015258789, "z": 0},
"yaw": 3.141592653589793,
"pitch": 0,
"onGround": True,
"equipment": [None, None, None, None, None, None],
"name": "bot",
"isInWater": False,
"isInLava": False,
"isCollidedHorizontally": False,
"isCollidedVertically": True,
"biome": "plains",
"entities": {
"chicken": 29.071822119730644,
"sheep": 20.361212992763768,
},
"timeOfDay": "day",
"inventoryUsed": 0,
"elapsedTime": 41,
},
"inventory": {},
"nearbyChests": {"(1344, 64, 1381)": "Unknown"},
"blockRecords": ["grass_block", "dirt", "grass"],
},
]
]
cd = CurriculumDesigner()
ge = GameEnvironment()
ge.update_event(events)
cd.set_memory(shared_memory=ge)
task_msg = cd.encapsule_design_task_message(
events=ge.event, chest_observation=ge.chest_observation
)
logger.info(f"Encapsuled_design_task_message: {task_msg}")
task = await cd.handle_task_design(**task_msg)
logger.info(f"Design_task_updating: {task}")
context_msg = cd.encapsule_design_curriculum_message(
events=ge.event, chest_observation=ge.chest_observation
)
logger.info(f"Encapsuled_design_task_message: {context_msg}")
context = await cd.handle_curriculum_design(**task_msg)
logger.info(f"Design_context_updating: {context}")
if __name__ == "__main__":
asyncio.run(main())