diff --git a/metagpt/actions/minecraft/design_curriculumn.py b/metagpt/actions/minecraft/design_curriculumn.py index 1079ad4dd..033ec2c81 100644 --- a/metagpt/actions/minecraft/design_curriculumn.py +++ b/metagpt/actions/minecraft/design_curriculumn.py @@ -2,8 +2,20 @@ # @Date : 2023/9/23 14:56 # @Author : stellahong (stellahong@fuzhi.ai) # @Desc : +import json +import re + +from langchain.embeddings.openai import OpenAIEmbeddings +from langchain.vectorstores import Chroma +from metagpt.document_store import FaissStore + from metagpt.logs import logger from metagpt.actions import Action +from metagpt.utils.minecraft import load_prompt, fix_and_parse_json +from metagpt.schema import HumanMessage, SystemMessage +from metagpt.const import CKPT_DIR + +# from metagpt.actions.minecraft import PlayerActions class DesignTask(Action): @@ -11,39 +23,64 @@ class DesignTask(Action): Action class for decomposing a task. Refer to the code in the voyager/agents/curriculum.py for implementation details. """ - + def __init__(self, name="", context=None, llm=None): super().__init__(name, context, llm) - - def decompose_task(self, query): - # Implement the logic to decompose a task here. - return "" - - async def propose_next_ai_task(self, prompts, system_msg): + + async def decompose_task(self, query, events): + system_msgs = SystemMessage( + content=load_prompt("curriculum_task_decomposition") + ) + prompt = self.render_human_message( + events=events, chest_observation="" + ) + HumanMessage(content=f"Final task: {query}") + logger.info(f"Curriculum Agent task decomposition\nFinal task: {query}") + + rsp = await self._aask(prompt=prompt, system_msgs=system_msgs) + logger.info(f"Curriculum Agent task decomposition\n{rsp}") + return fix_and_parse_json(rsp) + + def parse_llm_response(self, llm_resp): + task = "" + for line in llm_resp.split("\n"): + if line.startswith("Task:"): + task = line[5:].replace(".", "").strip() + assert task, "Task not found in Curriculum Agent response" + return {"next_task": task} + + async def generate_task(self, human_msg, system_msg, max_retries=5): """ Refer to the code in the voyager/agents/curriculum.py propose_next_ai_task() for implementation details. - Returns: + Returns: task & context """ - curriculum = await self._aask(prompt=prompts, system_msgs=system_msg) - - logger.info(f"\033[31m****Curriculum Agent ai message****\n{curriculum}\033[0m") - - def parse_llm_response(self, llm_resp): - # Implement the logic to parse the LLM response here. - return "", "" - + + if max_retries == 0: + raise RuntimeError("Max retries reached, failed to propose task.") + curriculum = await self._aask(prompt=human_msg, system_msgs=system_msg) + logger.info(f"Curriculum Agent message\n{curriculum}") + try: + response = self.parse_llm_response( + curriculum + ) # Task: Craft 4 wooden planks. + assert "next_task" in response + return response["next_task"] + except Exception as e: + logger.info(f"Error parsing curriculum response: {e}. Trying again!") + return self.generate_task( + human_msg=human_msg, + system_msg=system_msg, + max_retries=max_retries - 1, + ) + async def run(self, human_msg, system_msg, *args, **kwargs): logger.info(f"run {self.__repr__()}") - + # Call the language model to generate a response. - - llm_response = await self.propose_next_ai_task(prompts=human_msg, system_msg=system_msg) - - # Parse the response from the language model. - task, context = self.parse_llm_response(llm_response) - - return task, context + + task = await self.generate_task(human_msg=human_msg, system_msg=system_msg) + + return task class DesignCurriculum(Action): @@ -51,34 +88,160 @@ class DesignCurriculum(Action): Action class for designing curriculum-related questions. Refer to the code in the voyager/agents/curriculum.py for implementation details. """ - + def __init__(self, name="", context=None, llm=None): super().__init__(name, context, llm) - self.vect_db = "" - - def get_task_context(self): - # Implement the logic for a specific task in generating context. - return "" - - def generate_qa(self): - # Implement the logic to generate curriculum-related questions and answers. - question = "" - answer = "" + # voyager vectordb using + self.qa_cache = {} + self.qa_cache_questions_vectordb = Chroma( + collection_name="qa_cache_questions_vectordb", + embedding_function=OpenAIEmbeddings(), + persist_directory=f"{CKPT_DIR}/curriculum/vectordb", + ) + # TODO: change to FaissStore + # self.qa_cache_questions_vectordb = FaissStore( {CKPT_DIR}/ 'curriculum/vectordb') + + # Check if qa_cache right using + assert self.qa_cache_questions_vectordb._collection.count() == len( + self.qa_cache + ), ( + f"Curriculum Agent's qa cache question vectordb is not synced with qa_cache.json.\n" + f"There are {self.qa_cache_questions_vectordb._collection.count()} questions in vectordb " + f"but {len(self.qa_cache)} questions in qa_cache.json.\n" + f"Did you set resume=False when initializing the agent?\n" + f"You may need to manually delete the qa cache question vectordb directory for running from scratch.\n" + ) + + @classmethod + def set_qa_cache(cls, qa_cache): + cls.qa_cache = qa_cache + + @classmethod + def generate_qa(cls, events, chest_observation): + """ + Generate qa for DesignTask's HumanMessage + """ + questions_new, _ = cls.generate_qa_step1( + events=events, chest_observation=chest_observation + ) + questions = [] + answers = [] + for question in questions_new: + if cls.qa_cache_questions_vectordb._collection.count() > 0: + docs_and_scores = ( + cls.qa_cache_questions_vectordb.similarity_search_with_score( + question, k=1 + ) + ) + if docs_and_scores and docs_and_scores[0][1] < 0.05: + question_cached = docs_and_scores[0][0].page_content + assert question_cached in cls.qa_cache + answer_cached = cls.qa_cache[question_cached] + questions.append(question_cached) + answers.append(answer_cached) + continue + answer = cls.generate_qa_step2(question=question) + assert question not in cls.qa_cache + cls.qa_cache[question] = answer + cls.qa_cache_questions_vectordb.add_texts( + texts=[question], + ) + with open(f"{CKPT_DIR}/curriculum/qa_cache.json", "w") as f: + json.dump(cls.qa_cache, f) + cls.qa_cache_questions_vectordb.persist() + questions.append(question) + answers.append(answer) + assert len(questions_new) == len(questions) == len(answers) + return questions, answers + + async def generate_qa_step1(self, events, human_msg, system_msg): + biome = events[-1][1]["status"]["biome"].replace("_", " ") + questions = [ + f"What are the blocks that I can find in the {biome} in Minecraft?", + f"What are the items that I can find in the {biome} in Minecraft?", + f"What are the mobs that I can find in the {biome} in Minecraft?", + ] + qa_response = await self._aask(prompt=human_msg, system_msgs=system_msg) + + try: + # Regex pattern to extract question and concept pairs + pattern = r"Question \d+: (.+)\nConcept \d+: (.+)" + # Extracting all question and concept pairs from the text + pairs = re.findall(pattern, qa_response) + # Storing each question and concept in separate lists + questions_new = [pair[0] for pair in pairs] + questions.extend(questions_new) + except Exception as e: + logger.error( + f"Error parsing curriculum response for " + f"QA step 1 ask questions: {e}." + ) + return questions + + async def generate_qa_step2(self, question): + # Implement the logic for another specific step in generating questions and answers. + logger.info(f"Curriculum Agent Question: {question}") + human_msg = HumanMessage(content=f"Question: {question}").content + system_msg = [ + SystemMessage( + content=load_prompt("curriculum_qa_step2_answer_questions") + ).content + ] + answer = await self._aask(prompt=human_msg, system_msgs=system_msg) + logger.info(f"Curriculum Agent {answer}") + return answer + + async def get_context_from_task(self, task): + """ + Args: task + Returns: context: "Question: {question}\n{answer}" + if include ore in question, gpt will try to use tool with skill touch enhancement to mine + """ + + question = ( + f"How to {task.replace('_', ' ').replace(' ore', '').replace(' ores', '').replace('.', '').strip().lower()}" + f" in Minecraft?" + ) + if question in self.qa_cache: + answer = self.qa_cache[question] + else: + answer = await self.generate_qa_step2(question=question) + self.qa_cache[question] = answer + self.qa_cache_questions_vectordb.add_texts( + texts=[question], + ) + with open(f"{CKPT_DIR}/curriculum/qa_cache.json", "w") as f: + json.dump(self.qa_cache, f) + self.qa_cache_questions_vectordb.persist() context = f"Question: {question}\n{answer}" return context - - def generate_qa_step1(self): - # Implement the logic for a specific step in generating questions and answers. - return "" - - def generate_qa_step2(self): - # Implement the logic for another specific step in generating questions and answers. - return "" - - async def run(self, *args, **kwargs): + + async def generate_context(self, task, max_retries=5): + """ + Refer to the code in the voyager/agents/curriculum.py propose_next_ai_task() for implementation details. + Returns: context + + """ + + if max_retries == 0: + raise RuntimeError("Max retries reached, failed to propose context.") + try: + context = await self.get_context_from_task( + task=task + ) # Curriculum Agent Question: How to craft 4 wooden planks in Minecraft? & Curriculum Agent Answer: ... + return context + except Exception as e: + logger.info(f"Error parsing curriculum response: {e}. Trying again!") + return self.generate_context( + task=task, + max_retries=max_retries - 1, + ) + + async def run(self, task, human_msg, system_msg, *args, **kwargs): logger.info(f"run {self.__repr__()}") # Generate curriculum-related questions and answers. - curriculum_qa = self.generate_qa() - + # curriculum_qustion = await self.generate_qa_step1(events, human_msg, system_msg) + curriculum_context = await self.generate_context(task) + # Return the generated questions and answers. - return curriculum_qa + return curriculum_context diff --git a/metagpt/const.py b/metagpt/const.py index b8b08628e..ba63f0b65 100644 --- a/metagpt/const.py +++ b/metagpt/const.py @@ -40,3 +40,46 @@ TUTORIAL_PATH = DATA_PATH / "tutorial_docx" SKILL_DIRECTORY = PROJECT_ROOT / "metagpt/skills" MEM_TTL = 24 * 30 * 3600 + +### MineCraft ### +CKPT_DIR = PROJECT_ROOT / "metagpt/ckpt" +LOG_DIR = PROJECT_ROOT / "logs" + +DEFAULT_WARMUP = { + "context": 15, + "biome": 10, + "time": 15, + "nearby_blocks": 0, + "other_blocks": 10, + "nearby_entities": 5, + "health": 15, + "hunger": 15, + "position": 0, + "equipment": 0, + "inventory": 0, + "optional_inventory_items": 7, + "chests": 0, + "completed_tasks": 0, + "failed_tasks": 0, + } + +CURRICULUM_OB = [ + "context", + "biome", + "time", + "nearby_blocks", + "other_blocks", + "nearby_entities", + "health", + "hunger", + "position", + "equipment", + "inventory", + "chests", + "completed_tasks", + "failed_tasks", + ] + + +CORE_INVENTORY_ITEMS = r".*_log|.*_planks|stick|crafting_table|furnace" +r"|cobblestone|dirt|coal|.*_pickaxe|.*_sword|.*_axe", # curriculum_agent: only show these items in inventory before optional_inventory_items reached in warm up \ No newline at end of file diff --git a/metagpt/minecraft_team.py b/metagpt/minecraft_team.py index 392d12092..27a64686f 100644 --- a/metagpt/minecraft_team.py +++ b/metagpt/minecraft_team.py @@ -16,6 +16,7 @@ from metagpt.actions.minecraft.player_action import PlayerActions from metagpt.roles.minecraft.minecraft_base import Minecraft from metagpt.environment import Environment from metagpt.mineflayer_environment import MineflayerEnv +from metagpt.const import CKPT_DIR class GameEnvironment(BaseModel, arbitrary_types_allowed=True): @@ -24,30 +25,57 @@ class GameEnvironment(BaseModel, arbitrary_types_allowed=True): """ event: dict[str, Any] = Field(default_factory=dict) - current_task: str = Field(default="Craft 4 wooden planks") + current_task: str = Field(default="Mine 1 wood log") task_execution_time: float = Field(default=float) - context: str = Field(default="") + context: str = Field( + default="You can mine one of oak, birch, spruce, jungle, acacia, dark oak, or mangrove logs." + ) code: str = Field(default=None) programs: str = Field(default="") - critique: str = Field(default="") + critique: str = Field(default=None) skills: list[str] = Field(default_factory=list) + question: str = Field(default=None) - chest_memory: dict[str, Any] = Field(default_factory=dict) + qa_cache: dict[str, str] = Field(default_factory=dict) + completed_tasks: list[str] = Field(default_factory=list) # Critique things + failed_tasks: list[str] = Field(default_factory=list) + + chest_memory: dict[str, Any] = Field( + default_factory=dict + ) # eg: {'(1344, 64, 1381)': 'Unknown'} + chest_observation: str = Field(default="") # eg: "Chests: None\n\n" mf_instance: MineflayerEnv = Field(default_factory=MineflayerEnv) + @property + def progress(self): + # return len(self.completed_tasks) + 10 # Test only + return len(self.completed_tasks) + + @property + def warm_up(self): + return self.mf_instance.warm_up + + @property + def core_inv_items_regex(self): + return self.mf_instance.core_inv_items_regex + def set_mc_port(self, mc_port): self.mf_instance.set_mc_port(mc_port) - def set_mc_resume(self, resume: bool = False): + def set_mc_resume(self, resume: bool = False): # TODO: mv to config if resume: - logger.info( - f"Loading Action Developer from {self.mf_instance.ckpt_dir}/action" - ) - with open( - f"{self.mf_instance.ckpt_dir}/action/chest_memory.json", "r" - ) as f: + logger.info(f"Loading Action Developer from {CKPT_DIR}/action") + with open(f"{CKPT_DIR}/action/chest_memory.json", "r") as f: self.chest_memory = json.load(f) + + logger.info(f"Loading Curriculum Agent from {CKPT_DIR}/curriculum") + with open(f"{CKPT_DIR}/curriculum/completed_tasks.json", "r") as f: + self.completed_tasks = json.load(f) + with open(f"{CKPT_DIR}/curriculum/failed_tasks.json", "r") as f: + self.failed_tasks = json.load(f) + with open(f"{CKPT_DIR}/curriculum/qa_cache.json", "r") as f: + self.qa_cache = json.load(f) # TODO: add skills resume def register_roles(self, roles: Iterable[Minecraft]): @@ -57,6 +85,7 @@ class GameEnvironment(BaseModel, arbitrary_types_allowed=True): def update_event(self, event: Dict): self.event = event self.update_chest_memory(event) + self.update_chest_observation() def update_task(self, task: str): self.current_task = task @@ -93,9 +122,33 @@ class GameEnvironment(BaseModel, arbitrary_types_allowed=True): if chest != "Invalid": logger.info(f"Action Developer saving chest {position}: {chest}") self.chest_memory[position] = chest - with open(f"{self.mf_instance.ckpt_dir}/action/chest_memory.json", "w") as f: + with open(f"{CKPT_DIR}/action/chest_memory.json", "w") as f: json.dump(self.chest_memory, f) + def update_chest_observation(self): + """ + update chest_memory to chest_observation. + Refer to @ https://github.com/MineDojo/Voyager/blob/main/voyager/agents/action.py + """ + + chests = [] + for chest_position, chest in self.chest_memory.items(): + if isinstance(chest, dict) and len(chest) > 0: + chests.append(f"{chest_position}: {chest}") + for chest_position, chest in self.chest_memory.items(): + if isinstance(chest, dict) and len(chest) == 0: + chests.append(f"{chest_position}: Empty") + for chest_position, chest in self.chest_memory.items(): + if isinstance(chest, str): + assert chest == "Unknown" + chests.append(f"{chest_position}: Unknown items inside") + assert len(chests) == len(self.chest_memory) + if chests: + chests = "\n".join(chests) + self.chest_observation = f"Chests:\n{chests}\n\n" + else: + self.chest_observation = f"Chests: None\n\n" + async def on_event(self, *args): """ Retrieve Minecraft events. diff --git a/metagpt/mineflayer_environment.py b/metagpt/mineflayer_environment.py index e10127fe1..575361898 100644 --- a/metagpt/mineflayer_environment.py +++ b/metagpt/mineflayer_environment.py @@ -6,11 +6,12 @@ import os import time import json import requests +import re from metagpt.logs import logger import metagpt.utils.minecraft as U from metagpt.utils.minecraft.process_monitor import SubprocessMonitor - +from metagpt.const import CKPT_DIR, DEFAULT_WARMUP, CURRICULUM_OB, CORE_INVENTORY_ITEMS class MineflayerEnv: def __init__( @@ -29,9 +30,33 @@ class MineflayerEnv: self.reset_options = None self.connected = False self.server_paused = False - self.ckpt_dir = "metagpt/ckpt" - os.makedirs(f"{self.ckpt_dir}/action", exist_ok=True) + self.warm_up = {} # turns that when to add part of curriculum_ob to HumanMessage TODO: MV + self.core_inv_items_regex = None + + self._set_warmup() + + os.makedirs(f"{CKPT_DIR}/curriculum/vectordb", exist_ok=True) + os.makedirs(f"{CKPT_DIR}/action", exist_ok=True) + + def _set_warmup(self): + warm_up = DEFAULT_WARMUP + if "optional_inventory_items" in warm_up: + assert CORE_INVENTORY_ITEMS is not None + self.core_inv_items_regex = re.compile( + CORE_INVENTORY_ITEMS + ) + self.warm_up["optional_inventory_items"] = warm_up[ + "optional_inventory_items" + ] + else: + self.warm_up["optional_inventory_items"] = 0 + for key in CURRICULUM_OB: + self.warm_up[key] = warm_up.get(key, DEFAULT_WARMUP[key]) + self.warm_up["nearby_blocks"] = 0 + self.warm_up["inventory"] = 0 + self.warm_up["completed_tasks"] = 0 + self.warm_up["failed_tasks"] = 0 def set_mc_port(self, mc_port): self.mc_port = mc_port diff --git a/metagpt/roles/minecraft/action_developer.py b/metagpt/roles/minecraft/action_developer.py index 1184b5da4..65dd5afa2 100644 --- a/metagpt/roles/minecraft/action_developer.py +++ b/metagpt/roles/minecraft/action_developer.py @@ -41,30 +41,6 @@ class ActionDeveloper(Base): # 需要根据events进行自己chest_observation的更新 self._watch([RetrieveSkills]) - def render_chest_observation(self): - """ - Render game_memory.chest_memory to prompt text. - Refer to @ https://github.com/MineDojo/Voyager/blob/main/voyager/agents/action.py - """ - - chests = [] - for chest_position, chest in self.game_memory.chest_memory.items(): - if isinstance(chest, dict) and len(chest) > 0: - chests.append(f"{chest_position}: {chest}") - for chest_position, chest in self.game_memory.chest_memory.items(): - if isinstance(chest, dict) and len(chest) == 0: - chests.append(f"{chest_position}: Empty") - for chest_position, chest in self.game_memory.chest_memory.items(): - if isinstance(chest, str): - assert chest == "Unknown" - chests.append(f"{chest_position}: Unknown items inside") - assert len(chests) == len(self.game_memory.chest_memory) - if chests: - chests = "\n".join(chests) - return f"Chests:\n{chests}\n\n" - else: - return f"Chests: None\n\n" - def render_system_message(self, skills=[], *args, **kwargs): """ According to basic skills context files to genenarate js skill codes. @@ -163,12 +139,11 @@ class ActionDeveloper(Base): observation += f"Equipment: {equipment}\n\n" observation += f"Inventory ({inventory_used}/36): {'Empty' if not inventory else ', '.join(inventory)}\n\n" - # TODO: if task update, uncomment this - # if not ( - # task == "Place and deposit useless items into a chest" - # or task.startswith("Deposit useless items into the chest at") - # ): - observation += self.render_chest_observation() + if not ( + task == "Place and deposit useless items into a chest" + or task.startswith("Deposit useless items into the chest at") + ): + observation += self.game_memory.chest_observation observation += f"Task: {task}\n\n" observation += f"Context: {context or 'None'}\n\n" diff --git a/metagpt/roles/minecraft/curriculum_agent.py b/metagpt/roles/minecraft/curriculum_agent.py index 2a00cc45a..9004f06fa 100644 --- a/metagpt/roles/minecraft/curriculum_agent.py +++ b/metagpt/roles/minecraft/curriculum_agent.py @@ -2,38 +2,249 @@ # @Date : 2023/9/23 12:45 # @Author : stellahong (stellahong@fuzhi.ai) # @Desc : +import random +import json + from metagpt.logs import logger from metagpt.schema import Message, HumanMessage, SystemMessage from metagpt.roles.minecraft.minecraft_base import Minecraft as Base from metagpt.actions.minecraft.design_curriculumn import DesignCurriculum, DesignTask from metagpt.actions.minecraft.player_action import PlayerActions +from metagpt.utils.minecraft import load_prompt +from metagpt.const import CKPT_DIR, CURRICULUM_OB class CurriculumDesigner(Base): """ CurriculumDesigner is the automatic curriculum in paper, refer to the code voyager/agents/curriculum.py """ - + def __init__( - self, - name: str = "David", - profile: str = "Expertise in minecraft task design and curriculum development.", - goal: str = " Collect and integrate learner feedback to improve and refine educational content and pathways", - constraints: str = "Limited budget and resources for the development of educational content and technology tools." + self, + name: str = "David", + profile: str = "Expertise in minecraft task design and curriculum development.", + goal: str = " Collect and integrate learner feedback to improve and refine educational content and pathways", + constraints: str = "Limited budget and resources for the development of educational content and technology tools.", ) -> None: super().__init__(name, profile, goal, constraints) # Initialize actions specific to the Action role self._init_actions([DesignTask, DesignCurriculum]) - + # Set events or actions the ActionAgent should watch or be aware of self._watch([PlayerActions, DesignTask]) - - def render_human_message(self, msg, *args, **kwargs): - return HumanMessage(content=msg) - - def render_system_message(self, msg, *args, **kwargs): - return SystemMessage(content=msg) - + + def render_curriculum_observation(self, *, events, chest_observation): + """ + Returns: observation for curriculum + Refer to @ https://github.com/MineDojo/Voyager/blob/main/voyager/agents/curriculum.py + """ + + assert events[-1][0] == "observe", "Last event must be observe" + event = events[-1][1] + biome = event["status"]["biome"] + time_of_day = event["status"]["timeOfDay"] + voxels = event["voxels"] + block_records = event["blockRecords"] + entities = event["status"]["entities"] + health = event["status"]["health"] + hunger = event["status"]["food"] + position = event["status"]["position"] + equipment = event["status"]["equipment"] + inventory_used = event["status"]["inventoryUsed"] + inventory = event["inventory"] + + if not any( + "dirt" in block + or "log" in block + or "grass" in block + or "sand" in block + or "snow" in block + for block in voxels + ): + biome = "underground" + + other_blocks = ", ".join( + list( + set(block_records).difference(set(voxels).union(set(inventory.keys()))) + ) + ) + + other_blocks = other_blocks if other_blocks else "None" + + nearby_entities = ( + ", ".join([k for k, v in sorted(entities.items(), key=lambda x: x[1])]) + if entities + else "None" + ) + + completed_tasks = ( + ", ".join(self.game_memory.completed_tasks) + if self.game_memory.completed_tasks + else "None" + ) + failed_tasks = ( + ", ".join(self.game_memory.failed_tasks) + if self.game_memory.failed_tasks + else "None" + ) + + # filter out optional inventory items if required + if ( + self.game_memory.progress + < self.game_memory.warm_up["optional_inventory_items"] + ): + inventory = { + k: v + for k, v in inventory.items() + if self.game_memory.core_inv_items_regex.search(k) is not None + } + + observation = { + "context": "", + "biome": f"Biome: {biome}\n\n", + "time": f"Time: {time_of_day}\n\n", + "nearby_blocks": f"Nearby blocks: {', '.join(voxels) if voxels else 'None'}\n\n", + "other_blocks": f"Other blocks that are recently seen: {other_blocks}\n\n", + "nearby_entities": f"Nearby entities: {nearby_entities}\n\n", + "health": f"Health: {health:.1f}/20\n\n", + "hunger": f"Hunger: {hunger:.1f}/20\n\n", + "position": f"Position: x={position['x']:.1f}, y={position['y']:.1f}, z={position['z']:.1f}\n\n", + "equipment": f"Equipment: {equipment}\n\n", + "inventory": f"Inventory ({inventory_used}/36): {inventory if inventory else 'Empty'}\n\n", + "chests": chest_observation, + "completed_tasks": f"Completed tasks so far: {completed_tasks}\n\n", + "failed_tasks": f"Failed tasks that are too hard: {failed_tasks}\n\n", + } + return observation + + # --------------------------------Design Task Prepare--------------------------------------- + def render_design_task_human_message( + self, events, chest_observation, *args, **kwargs + ): + """ + Returns: observation for curriculum + Refer to @ https://github.com/MineDojo/Voyager/blob/main/voyager/agents/curriculum.py + """ + + content = "" + warm_up = self.game_memory.mf_instance.warm_up + observation = self.render_curriculum_observation( + events=events, chest_observation=chest_observation + ) + if self.game_memory.progress >= warm_up["context"]: + questions, answers = DesignCurriculum.generate_qa( + events=events, chest_observation=chest_observation + ) + i = 1 + for question, answer in zip(questions, answers): + if "Answer: Unknown" in answer or "language model" in answer: + continue + observation["context"] += f"Question {i}: {question}\n" + observation["context"] += f"{answer}\n\n" + i += 1 + if i > 5: + break + + for key in CURRICULUM_OB: + if self.game_memory.progress >= warm_up[key]: + if warm_up[key] != 0: + should_include = random.random() < 0.8 + else: + should_include = True + if should_include: + content += observation[key] + + logger.info(f"Curriculum Agent human message\n{content}") + return HumanMessage(content=content) + + def render_design_task_system_message(self, *args, **kwargs): + return SystemMessage(content=load_prompt("curriculum")) + + def encapsule_design_task_message(self, events, chest_observation, *args, **kwargs): + human_msg = self.render_design_task_human_message( + events=events, chest_observation=chest_observation, *args, **kwargs + ) + system_msg = self.render_design_task_system_message(*args, **kwargs) + return {"system_msg": [system_msg.content], "human_msg": human_msg.content} + + def generate_task_if_inventory_full(self, events, chest_observation): + """ + TODO: Try if this could be done with prompt + Returns: Task When inventory is almost full + """ + if chest_observation != "Chests: None\n\n": + chests = chest_observation[8:-2].split("\n") + for chest in chests: + content = chest.split(":")[1] + if content == " Unknown items inside" or content == " Empty": + position = chest.split(":")[0] + task = f"Deposit useless items into the chest at {position}" + return task + if "chest" in events[-1][1]["inventory"]: + task = "Place a chest" + else: + task = "Craft 1 chest" + return task + + # ----------------------------------------------------------------------------------------- + + # --------------------------------Design Curriculum Prepare-------------------------------- + def render_design_curriculum_system_message(self, *args, **kwargs): + return SystemMessage(content=load_prompt("curriculum_qa_step1_ask_questions")) + + def render_design_curriculum_human_message( + self, events, chest_observation, *args, **kwargs + ): + observation = self.render_curriculum_observation( + events=events, chest_observation=chest_observation + ) + content = "" + for key in CURRICULUM_OB: + content += observation[key] + return HumanMessage(content=content) + + def encapsule_design_curriculum_message( + self, events, chest_observation, *args, **kwargs + ): + human_msg = self.render_design_curriculum_human_message( + events=events, chest_observation=chest_observation, *args, **kwargs + ) + system_msg = self.render_design_curriculum_system_message(*args, **kwargs) + return {"system_msg": [system_msg.content], "human_msg": human_msg.content} + + def generate_context_if_inventory_full(self, events, chest_observation): + """ + TODO: Try if this could be done with prompt + Returns: Context When inventory is almost full + """ + inventoryUsed = events[-1][1]["status"]["inventoryUsed"] + if chest_observation != "Chests: None\n\n": + chests = chest_observation[8:-2].split("\n") + for chest in chests: + content = chest.split(":")[1] + if content == " Unknown items inside" or content == " Empty": + context = ( + f"Your inventory have {inventoryUsed} occupied slots before depositing. " + "After depositing, your inventory should only have 20 occupied slots. " + "You should deposit useless items such as andesite, dirt, cobblestone, etc. " + "Also, you can deposit low-level tools, " + "For example, if you have a stone pickaxe, you can deposit a wooden pickaxe. " + "Make sure the list of useless items are in your inventory " + "(do not list items already in the chest), " + "You can use bot.inventoryUsed() to check how many inventory slots are used." + ) + return context + if "chest" in events[-1][1]["inventory"]: + context = ( + f"You have a chest in inventory, place it around you. " + f"If chests is not None, or nearby blocks contains chest, this task is success." + ) + else: + context = "Craft 1 chest with 8 planks of any kind of wood." + return context + + # ----------------------------------------------------------------------------------------- + async def handle_task_design(self, human_msg, system_msg, *args, **kwargs): """ Args: @@ -44,10 +255,25 @@ class CurriculumDesigner(Base): Returns: """ - task = await DesignTask().run(human_msg, system_msg, *args, **kwargs) + events = self.game_memory.event + chest_observation = self.game_memory.chest_observation + inventoryUsed = events[-1][1]["status"]["inventoryUsed"] + + if self.game_memory.progress == 0: + task = self.game_memory.current_task + elif inventoryUsed >= 33: + task = self.generate_task_if_inventory_full( + self, events=events, chest_observation=chest_observation + ) + else: + task = await DesignTask().run(human_msg, system_msg, *args, **kwargs) + logger.info(f"Handle_task_design result is Here: {task}") + self.perform_game_info_callback(task, self.game_memory.update_task) - return Message(content=f"{task}", instruct_content="task_design", role=self.profile) - + return Message( + content=f"{task}", instruct_content="task_design", role=self.profile + ) + async def handle_curriculum_design(self, human_msg, system_msg, *args, **kwargs): """ refer to the context generation in voyager @@ -60,34 +286,107 @@ class CurriculumDesigner(Base): Returns: """ - context = await DesignCurriculum().run(human_msg, system_msg, *args, **kwargs) + events = self.game_memory.event + chest_observation = self.game_memory.chest_observation + inventoryUsed = events[-1][1]["status"]["inventoryUsed"] + task = self.game_memory.current_task + + if self.game_memory.progress == 0: + context = self.game_memory.context + elif inventoryUsed >= 33: + context = self.generate_context_if_inventory_full( + self, events=events, chest_observation=chest_observation + ) + else: + context = await DesignCurriculum().run( + task, human_msg, system_msg, *args, **kwargs + ) self.perform_game_info_callback(context, self.game_memory.update_context) - return Message(content=f"{context}", instruct_content="curriculum_design", role=self.profile) - + return Message( + content=f"{context}", + instruct_content="curriculum_design", + role=self.profile, + ) + + # TODO: move to Critic agent + def update_exploration_progress(self, info): + """ + Split task into completed_tasks or failed_tasks + Args: info = { + "task": self.task, + "success": success, + "conversations": self.conversations, + } + """ + task = info["task"] + if task.startswith("Deposit useless items into the chest at"): + return + if info["success"]: + logger.info(f"Completed task {task}.") + self.game_memory.completed_tasks.append(task) + else: + logger.info(f"Failed to complete task {task}. Skipping to next task.") + self.game_memory.failed_tasks.append(task) + + self.save_sorted_tasks() + + # TODO: move to Critic agent + def save_sorted_tasks(self): + updated_completed_tasks = [] + # record repeated failed tasks + updated_failed_tasks = self.game_memory.failed_tasks + # dedup but keep order + for task in self.game_memory.completed_tasks: + if task not in updated_completed_tasks: + updated_completed_tasks.append(task) + + # remove completed tasks from failed tasks + for task in updated_completed_tasks: + while task in updated_failed_tasks: + updated_failed_tasks.remove(task) + + self.game_memory.completed_tasks = updated_completed_tasks + self.failed_tasks = updated_failed_tasks + + # dump to json + with open(f"{CKPT_DIR}/curriculum/completed_tasks.json", "w") as f: + json.dump(self.game_memory.completed_tasks, f) + with open(f"{CKPT_DIR}/curriculum/failed_tasks.json", "w") as f: + json.dump(self.game_memory.failed_tasks, f) + async def _act(self) -> Message: todo = self._rc.todo logger.debug(f"Todo is {todo}") - + # 获取最新的游戏周边环境信息 - event = await self._obtain_events() - task = self.game_memory.current_task - context = self.game_memory.context - - msg = self._rc.memory.get(k=1)[0] - query = msg.content - - message = self.encapsule_message(query, task, event) - + events = await self._obtain_events() + self.perform_game_info_callback(events, self.game_memory.update_event) + chest_observation = self.game_memory.chest_observation + + DesignCurriculum.set_qa_cache(self.game_memory.qa_cache) + + # msg = self._rc.memory.get(k=1)[0] + # query = msg.content + + design_task_message = self.encapsule_design_task_message( + events, chest_observation + ) + design_curriculum_message = self.encapsule_design_curriculum_message( + events, chest_observation + ) + handler_map = { - DesignTask: self.handle_task_design, DesignCurriculum: self.handle_curriculum_design, } handler = handler_map.get(type(todo)) if handler: - msg = await handler(**message) + if type(todo) == "DesignTask": + msg = await handler(**design_task_message) + else: + msg = await handler(**design_curriculum_message) msg.cause_by = type(todo) self._publish_message(msg) return msg - + raise ValueError(f"Unknown todo type: {type(todo)}") diff --git a/tests/metagpt/roles/minecraft/test_curriculum_agent.py b/tests/metagpt/roles/minecraft/test_curriculum_agent.py new file mode 100644 index 000000000..28196abe5 --- /dev/null +++ b/tests/metagpt/roles/minecraft/test_curriculum_agent.py @@ -0,0 +1,67 @@ +import asyncio + +from metagpt.minecraft_team import GameEnvironment +from metagpt.roles.minecraft.curriculum_agent import CurriculumDesigner +from metagpt.logs import logger + + +async def main(): + events = [ + [ + "observe", + { + "voxels": ["grass_block", "dirt", "grass"], + "status": { + "health": 20, + "food": 20, + "saturation": 5, + "oxygen": 20, + "position": {"x": 0.5, "y": 84, "z": -207.5}, + "velocity": {"x": 0, "y": -0.0784000015258789, "z": 0}, + "yaw": 3.141592653589793, + "pitch": 0, + "onGround": True, + "equipment": [None, None, None, None, None, None], + "name": "bot", + "isInWater": False, + "isInLava": False, + "isCollidedHorizontally": False, + "isCollidedVertically": True, + "biome": "plains", + "entities": { + "chicken": 29.071822119730644, + "sheep": 20.361212992763768, + }, + "timeOfDay": "day", + "inventoryUsed": 0, + "elapsedTime": 41, + }, + "inventory": {}, + "nearbyChests": {"(1344, 64, 1381)": "Unknown"}, + "blockRecords": ["grass_block", "dirt", "grass"], + }, + ] + ] + + cd = CurriculumDesigner() + ge = GameEnvironment() + ge.update_event(events) + cd.set_memory(shared_memory=ge) + + task_msg = cd.encapsule_design_task_message( + events=ge.event, chest_observation=ge.chest_observation + ) + logger.info(f"Encapsuled_design_task_message: {task_msg}") + task = await cd.handle_task_design(**task_msg) + logger.info(f"Design_task_updating: {task}") + + context_msg = cd.encapsule_design_curriculum_message( + events=ge.event, chest_observation=ge.chest_observation + ) + logger.info(f"Encapsuled_design_task_message: {context_msg}") + context = await cd.handle_curriculum_design(**task_msg) + logger.info(f"Design_context_updating: {context}") + + +if __name__ == "__main__": + asyncio.run(main())