From e9603c6f03078905c0df5a6a5458b2fc8737176f Mon Sep 17 00:00:00 2001 From: luxiangtao Date: Mon, 15 Apr 2024 17:46:14 +0800 Subject: [PATCH 1/4] add experience --- metagpt/actions/di/use_experience.py | 219 ++++++++++++++++++++++ metagpt/actions/di/write_analysis_code.py | 2 + metagpt/prompts/di/get_task_summary.py | 10 + metagpt/prompts/di/write_analysis_code.py | 4 + metagpt/roles/di/data_interpreter.py | 11 +- metagpt/roles/role.py | 10 +- 6 files changed, 250 insertions(+), 6 deletions(-) create mode 100644 metagpt/actions/di/use_experience.py create mode 100644 metagpt/prompts/di/get_task_summary.py diff --git a/metagpt/actions/di/use_experience.py b/metagpt/actions/di/use_experience.py new file mode 100644 index 000000000..429d898b2 --- /dev/null +++ b/metagpt/actions/di/use_experience.py @@ -0,0 +1,219 @@ +import json +import chromadb + +from pydantic import BaseModel +from metagpt.actions import Action +from metagpt.const import SERDESER_PATH +from metagpt.logs import logger +from metagpt.prompts.di.get_task_summary import TASK_CODE_DESCRIPTION_PROMPT +from metagpt.schema import Task +from metagpt.strategy.planner import Planner +from metagpt.rag.engines import SimpleEngine +from metagpt.rag.schema import ChromaRetrieverConfig +from examples.rag_pipeline import TRAVEL_DOC_PATH + + +class Trajectory(BaseModel): + user_requirement: str = "" + task_map: dict[str, Task] = {} + task: Task = None + is_used: bool = False + + def rag_key(self) -> str: + """For search""" + return self.task.instruction + +class Experience(BaseModel): + code_summary: str = "" + trajectory: Trajectory = None + + def rag_key(self) -> str: + """For search""" + return self.code_summary + +EXPERIENCE_COLLECTION_NAME = "di_experience_0" +TRAJECTORY_COLLECTION_NAME = "di_trajectory_0" +PERSIST_PATH = SERDESER_PATH / "data_interpreter/chroma" + + +class AddNewExperiences(Action): + name: str = "AddNewTaskExperiences" + + def _init_engine(self, collection_name: str): + """Initialize a collection for storing code experiences + """ + + engine = SimpleEngine.from_docs( + input_files=[TRAVEL_DOC_PATH], + retriever_configs=[ChromaRetrieverConfig( + persist_path=PERSIST_PATH, + collection_name=collection_name)], + ) + + db = chromadb.PersistentClient(path=str(PERSIST_PATH)) + chroma_collection = db.get_or_create_collection(collection_name) + chroma_collection.delete(where_document={"$contains": "Bob likes traveling"}) + + return engine + + async def _single_task_summary(self,trajectory_collection_name: str,experience_collection_name: str): + trajectory_engine = self._init_engine(collection_name=trajectory_collection_name) + experience_engine = self._init_engine(collection_name=experience_collection_name) + + db = chromadb.PersistentClient(path=str(PERSIST_PATH)) + collection = db.get_or_create_collection(trajectory_collection_name) + + unused_ids=[id for id in collection.get()["ids"] if json.loads(collection.get([id])["metadatas"][0]["obj_json"])["is_used"]==False] + trajectory_dicts = [json.loads(metadata["obj_json"]) for metadata in collection.get(unused_ids)["metadatas"]] + trajectories = [] + experiences = [] + for trajectory_dict in trajectory_dicts: + + trajectory_dict["is_used"] = True + trajectory = Trajectory(**trajectory_dict) + trajectories.append(trajectory) + + code_summary = await self.task_code_sumarization(trajectory) + experience = Experience(code_summary=code_summary, trajectory=trajectory) + experiences.append(experience) + + collection.delete(unused_ids) + trajectory_engine.add_objs(trajectories) + experience_engine.add_objs(experiences) + + + async def task_code_sumarization(self, trajectory: Trajectory): + """Summarize the task code + Args: + task: The task to be summarized. + Returns: + A summary of the task code. + """ + task = trajectory.task + prompt = TASK_CODE_DESCRIPTION_PROMPT.format(code_snippet=task.code, code_result=task.result, + code_success="Success" if task.is_success else "Failure") + resp = await self._aask(prompt=prompt) + return resp + + async def run(self, + trajectory_collection_name: str=TRAJECTORY_COLLECTION_NAME, + experience_collection_name: str=EXPERIENCE_COLLECTION_NAME, + mode :str = "single_task_summary" + ): + """Initiate a collection and Add a new task experience to the collection + + Args: + trajectory_collection_name(str): the trajectory collection_name to be used for geting experiences. + experience_collection_name(str): the experience collection_name to be used for saving experiences. + mode(str): how to generate experiences + + """ + if mode == "single_task_summary": + await self._single_task_summary(trajectory_collection_name=trajectory_collection_name,experience_collection_name=experience_collection_name) + else: + pass # TODO:add other methods to generate experiences from trajectories + +class AddNewTrajectories(Action): + name: str = "AddNewTrajectories" + + def _init_engine(self,collection_name: str): + """Initialize a collection for storing code experiences + """ + + engine = SimpleEngine.from_docs( + input_files=[TRAVEL_DOC_PATH], + retriever_configs=[ChromaRetrieverConfig( + persist_path=PERSIST_PATH, + collection_name=collection_name)], + ) + + db = chromadb.PersistentClient(path=str(PERSIST_PATH)) + chroma_collection = db.get_or_create_collection(collection_name) + chroma_collection.delete(where_document={"$contains": "Bob likes traveling"}) + + return engine + + async def run(self, planner: Planner, trajectory_collection_name: str=TRAJECTORY_COLLECTION_NAME): + """ + Initiate a collection and add new trajectories to the collection + """ + engine = self._init_engine(trajectory_collection_name) + + if not planner.plan.tasks: + return + + user_requirement = planner.plan.goal + task_map = planner.plan.task_map + trajectories = [Trajectory(user_requirement=user_requirement, task_map=task_map, task=task, is_used=False) for task in planner.plan.tasks] + + engine.add_objs(trajectories) + + + +class RetrieveExperiences(Action): + name: str = "RetrieveExperiences" + + def _init_engine(self,collection_name: str,top_k: int): + """Initialize a SimpleEngine for retrieving experiences + + Args: + query (str): The chromadb collectin_name + top_k (int): The number of eperiences to be retrieved. + """ + + engine = SimpleEngine.from_docs( + input_files=[TRAVEL_DOC_PATH], + retriever_configs=[ChromaRetrieverConfig( + persist_path=PERSIST_PATH, + collection_name=collection_name, + similarity_top_k=top_k)], + ) + + db = chromadb.PersistentClient(path=str(PERSIST_PATH)) + chroma_collection = db.get_or_create_collection(collection_name) + chroma_collection.delete(where_document={"$contains": "Bob likes traveling"}) + + return engine + + async def run(self, query: str,experience_collection_name: str=EXPERIENCE_COLLECTION_NAME, top_k: int = 5) -> str: + """Retrieve past attempted tasks + + Args: + query (str): The task instruction to be used for retrieval. + experience_collection_name(str): the collextion_name for retrieving experiences + top_k (int, optional): The number of experiences to be retrieved. Defaults to 5. + + Returns: + _type_: _description_ + """ + engine = self._init_engine(collection_name=experience_collection_name,top_k=top_k) + + if len(query) <= 2: # not "" or not '""' + return "" + + nodes = await engine.aretrieve(query) + new_experiences = [] + for i, node in enumerate(nodes): + try: + code_summary = node.node.metadata["obj"].code_summary + trajectory = node.node.metadata["obj"].trajectory + + except: + continue + + # Create the experience dictionary with placeholder keys + experience = { + "Reference __i__": trajectory.task.instruction, + "Task code": trajectory.task.code, + "Code summary": code_summary, + "Task result": trajectory.task.result, + "Task outcome": "Success" if trajectory.task.is_success else "Failure", + "Task ownership's requirement": "This task is part of " + trajectory.user_requirement + } + + # Replace the placeholder in the keys + experience = {k.replace("__i__", str(i)): v for k, v in experience.items()} + new_experiences.append(experience) + + logger.info("retrieval done") + return json.dumps(new_experiences, indent=4) diff --git a/metagpt/actions/di/write_analysis_code.py b/metagpt/actions/di/write_analysis_code.py index 711e56d39..f6d477059 100644 --- a/metagpt/actions/di/write_analysis_code.py +++ b/metagpt/actions/di/write_analysis_code.py @@ -41,11 +41,13 @@ class WriteAnalysisCode(Action): tool_info: str = "", working_memory: list[Message] = None, use_reflection: bool = False, + experiences: str = "", **kwargs, ) -> str: structual_prompt = STRUCTUAL_PROMPT.format( user_requirement=user_requirement, plan_status=plan_status, + experiences=experiences, tool_info=tool_info, ) diff --git a/metagpt/prompts/di/get_task_summary.py b/metagpt/prompts/di/get_task_summary.py new file mode 100644 index 000000000..fcf586912 --- /dev/null +++ b/metagpt/prompts/di/get_task_summary.py @@ -0,0 +1,10 @@ +TASK_CODE_DESCRIPTION_PROMPT = """ +Please explain in a paragraph what the following code snippet does. Only the function of the code snippet needs to be explained, no variable names need to be explained. + +Code snippet: +{code_snippet} +Code Execution Result: +{code_result} +Code Success or Failure: +{code_success} +""" \ No newline at end of file diff --git a/metagpt/prompts/di/write_analysis_code.py b/metagpt/prompts/di/write_analysis_code.py index e5663d498..8c7d1a343 100644 --- a/metagpt/prompts/di/write_analysis_code.py +++ b/metagpt/prompts/di/write_analysis_code.py @@ -7,6 +7,10 @@ STRUCTUAL_PROMPT = """ # Plan Status {plan_status} +# Reference experience (can be empty): +This is some previous coding experience that is similar to the current task. You can learn from the successful code and avoid the mistakes from the failed code. If there are other codes you don't know about in the experience, please don't refer to it. +{experiences} + # Tool Info {tool_info} diff --git a/metagpt/roles/di/data_interpreter.py b/metagpt/roles/di/data_interpreter.py index 547f4b90b..61e4e2563 100644 --- a/metagpt/roles/di/data_interpreter.py +++ b/metagpt/roles/di/data_interpreter.py @@ -38,6 +38,7 @@ class DataInterpreter(Role): auto_run: bool = True use_plan: bool = True use_reflection: bool = False + use_experience: bool = False execute_code: ExecuteNbCode = Field(default_factory=ExecuteNbCode, exclude=True) tools: list[str] = [] # Use special symbol [""] to indicate use of all registered tools tool_recommender: ToolRecommender = None @@ -94,13 +95,13 @@ class DataInterpreter(Role): await self.execute_code.terminate() raise e - async def _act_on_task(self, current_task: Task) -> TaskResult: + async def _act_on_task(self, current_task: Task, experiences: str) -> TaskResult: """Useful in 'plan_and_act' mode. Wrap the output in a TaskResult for review and confirmation.""" - code, result, is_success = await self._write_and_exec_code() + code, result, is_success = await self._write_and_exec_code(experiences=experiences) task_result = TaskResult(code=code, result=result, is_success=is_success) return task_result - async def _write_and_exec_code(self, max_retry: int = 3): + async def _write_and_exec_code(self, max_retry: int = 3, experiences: str = ""): counter = 0 success = False @@ -122,7 +123,7 @@ class DataInterpreter(Role): while not success and counter < max_retry: ### write code ### - code, cause_by = await self._write_code(counter, plan_status, tool_info) + code, cause_by = await self._write_code(counter, plan_status, tool_info, experiences = experiences if counter == 0 else "") self.working_memory.add(Message(content=code, role="assistant", cause_by=cause_by)) @@ -148,6 +149,7 @@ class DataInterpreter(Role): counter: int, plan_status: str = "", tool_info: str = "", + experiences: str = "" ): todo = self.rc.todo # todo is WriteAnalysisCode logger.info(f"ready to {todo.name}") @@ -161,6 +163,7 @@ class DataInterpreter(Role): tool_info=tool_info, working_memory=self.working_memory.get(), use_reflection=use_reflection, + experiences = experiences ) return code, todo diff --git a/metagpt/roles/role.py b/metagpt/roles/role.py index 57dd51139..aa8d833a5 100644 --- a/metagpt/roles/role.py +++ b/metagpt/roles/role.py @@ -30,11 +30,12 @@ from pydantic import BaseModel, ConfigDict, Field, SerializeAsAny, model_validat from metagpt.actions import Action, ActionOutput from metagpt.actions.action_node import ActionNode from metagpt.actions.add_requirement import UserRequirement +from metagpt.actions.di.use_experience import RetrieveExperiences, AddNewTrajectories from metagpt.context_mixin import ContextMixin from metagpt.logs import logger from metagpt.memory import Memory from metagpt.provider import HumanProvider -from metagpt.schema import Message, MessageQueue, SerializationMixin +from metagpt.schema import Message, MessageQueue, SerializationMixin, TaskResult, Task from metagpt.strategy.planner import Planner from metagpt.utils.common import any_to_name, any_to_str, role_raise_decorator from metagpt.utils.project_repo import ProjectRepo @@ -490,11 +491,16 @@ class Role(SerializationMixin, ContextMixin, BaseModel): task = self.planner.current_task logger.info(f"ready to take on task {task}") + # retrieve past tasks for this task + experiences = await RetrieveExperiences().run(query=task.instruction) if self.use_experience else "" + # take on current task - task_result = await self._act_on_task(task) + task_result = await self._act_on_task(task, experiences) # process the result, such as reviewing, confirming, plan updating await self.planner.process_task_result(task_result) + + await AddNewTrajectories().run(self.planner) rsp = self.planner.get_useful_memories()[0] # return the completed plan as a response From fd6eb8882ed708c86c4abe3a4da1c4dc23a738b0 Mon Sep 17 00:00:00 2001 From: luxiangtao Date: Mon, 15 Apr 2024 17:50:11 +0800 Subject: [PATCH 2/4] pre-commit --- metagpt/actions/di/use_experience.py | 104 ++++++++++++++----------- metagpt/prompts/di/get_task_summary.py | 2 +- metagpt/roles/di/data_interpreter.py | 14 ++-- metagpt/roles/role.py | 8 +- 4 files changed, 67 insertions(+), 61 deletions(-) diff --git a/metagpt/actions/di/use_experience.py b/metagpt/actions/di/use_experience.py index 429d898b2..2b2601e0c 100644 --- a/metagpt/actions/di/use_experience.py +++ b/metagpt/actions/di/use_experience.py @@ -1,16 +1,17 @@ import json -import chromadb +import chromadb from pydantic import BaseModel + +from examples.rag_pipeline import TRAVEL_DOC_PATH from metagpt.actions import Action from metagpt.const import SERDESER_PATH from metagpt.logs import logger from metagpt.prompts.di.get_task_summary import TASK_CODE_DESCRIPTION_PROMPT -from metagpt.schema import Task -from metagpt.strategy.planner import Planner from metagpt.rag.engines import SimpleEngine from metagpt.rag.schema import ChromaRetrieverConfig -from examples.rag_pipeline import TRAVEL_DOC_PATH +from metagpt.schema import Task +from metagpt.strategy.planner import Planner class Trajectory(BaseModel): @@ -22,7 +23,8 @@ class Trajectory(BaseModel): def rag_key(self) -> str: """For search""" return self.task.instruction - + + class Experience(BaseModel): code_summary: str = "" trajectory: Trajectory = None @@ -31,6 +33,7 @@ class Experience(BaseModel): """For search""" return self.code_summary + EXPERIENCE_COLLECTION_NAME = "di_experience_0" TRAJECTORY_COLLECTION_NAME = "di_trajectory_0" PERSIST_PATH = SERDESER_PATH / "data_interpreter/chroma" @@ -40,14 +43,11 @@ class AddNewExperiences(Action): name: str = "AddNewTaskExperiences" def _init_engine(self, collection_name: str): - """Initialize a collection for storing code experiences - """ + """Initialize a collection for storing code experiences""" engine = SimpleEngine.from_docs( input_files=[TRAVEL_DOC_PATH], - retriever_configs=[ChromaRetrieverConfig( - persist_path=PERSIST_PATH, - collection_name=collection_name)], + retriever_configs=[ChromaRetrieverConfig(persist_path=PERSIST_PATH, collection_name=collection_name)], ) db = chromadb.PersistentClient(path=str(PERSIST_PATH)) @@ -55,20 +55,23 @@ class AddNewExperiences(Action): chroma_collection.delete(where_document={"$contains": "Bob likes traveling"}) return engine - - async def _single_task_summary(self,trajectory_collection_name: str,experience_collection_name: str): + + async def _single_task_summary(self, trajectory_collection_name: str, experience_collection_name: str): trajectory_engine = self._init_engine(collection_name=trajectory_collection_name) experience_engine = self._init_engine(collection_name=experience_collection_name) db = chromadb.PersistentClient(path=str(PERSIST_PATH)) collection = db.get_or_create_collection(trajectory_collection_name) - - unused_ids=[id for id in collection.get()["ids"] if json.loads(collection.get([id])["metadatas"][0]["obj_json"])["is_used"]==False] + + unused_ids = [ + id + for id in collection.get()["ids"] + if json.loads(collection.get([id])["metadatas"][0]["obj_json"])["is_used"] == False + ] trajectory_dicts = [json.loads(metadata["obj_json"]) for metadata in collection.get(unused_ids)["metadatas"]] trajectories = [] experiences = [] for trajectory_dict in trajectory_dicts: - trajectory_dict["is_used"] = True trajectory = Trajectory(**trajectory_dict) trajectories.append(trajectory) @@ -76,12 +79,11 @@ class AddNewExperiences(Action): code_summary = await self.task_code_sumarization(trajectory) experience = Experience(code_summary=code_summary, trajectory=trajectory) experiences.append(experience) - + collection.delete(unused_ids) trajectory_engine.add_objs(trajectories) experience_engine.add_objs(experiences) - async def task_code_sumarization(self, trajectory: Trajectory): """Summarize the task code Args: @@ -90,16 +92,18 @@ class AddNewExperiences(Action): A summary of the task code. """ task = trajectory.task - prompt = TASK_CODE_DESCRIPTION_PROMPT.format(code_snippet=task.code, code_result=task.result, - code_success="Success" if task.is_success else "Failure") + prompt = TASK_CODE_DESCRIPTION_PROMPT.format( + code_snippet=task.code, code_result=task.result, code_success="Success" if task.is_success else "Failure" + ) resp = await self._aask(prompt=prompt) return resp - async def run(self, - trajectory_collection_name: str=TRAJECTORY_COLLECTION_NAME, - experience_collection_name: str=EXPERIENCE_COLLECTION_NAME, - mode :str = "single_task_summary" - ): + async def run( + self, + trajectory_collection_name: str = TRAJECTORY_COLLECTION_NAME, + experience_collection_name: str = EXPERIENCE_COLLECTION_NAME, + mode: str = "single_task_summary", + ): """Initiate a collection and Add a new task experience to the collection Args: @@ -109,22 +113,23 @@ class AddNewExperiences(Action): """ if mode == "single_task_summary": - await self._single_task_summary(trajectory_collection_name=trajectory_collection_name,experience_collection_name=experience_collection_name) + await self._single_task_summary( + trajectory_collection_name=trajectory_collection_name, + experience_collection_name=experience_collection_name, + ) else: - pass # TODO:add other methods to generate experiences from trajectories - + pass # TODO:add other methods to generate experiences from trajectories + + class AddNewTrajectories(Action): name: str = "AddNewTrajectories" - def _init_engine(self,collection_name: str): - """Initialize a collection for storing code experiences - """ + def _init_engine(self, collection_name: str): + """Initialize a collection for storing code experiences""" engine = SimpleEngine.from_docs( input_files=[TRAVEL_DOC_PATH], - retriever_configs=[ChromaRetrieverConfig( - persist_path=PERSIST_PATH, - collection_name=collection_name)], + retriever_configs=[ChromaRetrieverConfig(persist_path=PERSIST_PATH, collection_name=collection_name)], ) db = chromadb.PersistentClient(path=str(PERSIST_PATH)) @@ -132,8 +137,8 @@ class AddNewTrajectories(Action): chroma_collection.delete(where_document={"$contains": "Bob likes traveling"}) return engine - - async def run(self, planner: Planner, trajectory_collection_name: str=TRAJECTORY_COLLECTION_NAME): + + async def run(self, planner: Planner, trajectory_collection_name: str = TRAJECTORY_COLLECTION_NAME): """ Initiate a collection and add new trajectories to the collection """ @@ -141,19 +146,21 @@ class AddNewTrajectories(Action): if not planner.plan.tasks: return - + user_requirement = planner.plan.goal task_map = planner.plan.task_map - trajectories = [Trajectory(user_requirement=user_requirement, task_map=task_map, task=task, is_used=False) for task in planner.plan.tasks] - - engine.add_objs(trajectories) + trajectories = [ + Trajectory(user_requirement=user_requirement, task_map=task_map, task=task, is_used=False) + for task in planner.plan.tasks + ] + engine.add_objs(trajectories) class RetrieveExperiences(Action): name: str = "RetrieveExperiences" - def _init_engine(self,collection_name: str,top_k: int): + def _init_engine(self, collection_name: str, top_k: int): """Initialize a SimpleEngine for retrieving experiences Args: @@ -163,10 +170,11 @@ class RetrieveExperiences(Action): engine = SimpleEngine.from_docs( input_files=[TRAVEL_DOC_PATH], - retriever_configs=[ChromaRetrieverConfig( - persist_path=PERSIST_PATH, - collection_name=collection_name, - similarity_top_k=top_k)], + retriever_configs=[ + ChromaRetrieverConfig( + persist_path=PERSIST_PATH, collection_name=collection_name, similarity_top_k=top_k + ) + ], ) db = chromadb.PersistentClient(path=str(PERSIST_PATH)) @@ -175,7 +183,9 @@ class RetrieveExperiences(Action): return engine - async def run(self, query: str,experience_collection_name: str=EXPERIENCE_COLLECTION_NAME, top_k: int = 5) -> str: + async def run( + self, query: str, experience_collection_name: str = EXPERIENCE_COLLECTION_NAME, top_k: int = 5 + ) -> str: """Retrieve past attempted tasks Args: @@ -186,7 +196,7 @@ class RetrieveExperiences(Action): Returns: _type_: _description_ """ - engine = self._init_engine(collection_name=experience_collection_name,top_k=top_k) + engine = self._init_engine(collection_name=experience_collection_name, top_k=top_k) if len(query) <= 2: # not "" or not '""' return "" @@ -208,7 +218,7 @@ class RetrieveExperiences(Action): "Code summary": code_summary, "Task result": trajectory.task.result, "Task outcome": "Success" if trajectory.task.is_success else "Failure", - "Task ownership's requirement": "This task is part of " + trajectory.user_requirement + "Task ownership's requirement": "This task is part of " + trajectory.user_requirement, } # Replace the placeholder in the keys diff --git a/metagpt/prompts/di/get_task_summary.py b/metagpt/prompts/di/get_task_summary.py index fcf586912..e27e889a4 100644 --- a/metagpt/prompts/di/get_task_summary.py +++ b/metagpt/prompts/di/get_task_summary.py @@ -7,4 +7,4 @@ Code Execution Result: {code_result} Code Success or Failure: {code_success} -""" \ No newline at end of file +""" diff --git a/metagpt/roles/di/data_interpreter.py b/metagpt/roles/di/data_interpreter.py index 61e4e2563..13e0c2df2 100644 --- a/metagpt/roles/di/data_interpreter.py +++ b/metagpt/roles/di/data_interpreter.py @@ -123,7 +123,9 @@ class DataInterpreter(Role): while not success and counter < max_retry: ### write code ### - code, cause_by = await self._write_code(counter, plan_status, tool_info, experiences = experiences if counter == 0 else "") + code, cause_by = await self._write_code( + counter, plan_status, tool_info, experiences=experiences if counter == 0 else "" + ) self.working_memory.add(Message(content=code, role="assistant", cause_by=cause_by)) @@ -144,13 +146,7 @@ class DataInterpreter(Role): return code, result, success - async def _write_code( - self, - counter: int, - plan_status: str = "", - tool_info: str = "", - experiences: str = "" - ): + async def _write_code(self, counter: int, plan_status: str = "", tool_info: str = "", experiences: str = ""): todo = self.rc.todo # todo is WriteAnalysisCode logger.info(f"ready to {todo.name}") use_reflection = counter > 0 and self.use_reflection # only use reflection after the first trial @@ -163,7 +159,7 @@ class DataInterpreter(Role): tool_info=tool_info, working_memory=self.working_memory.get(), use_reflection=use_reflection, - experiences = experiences + experiences=experiences, ) return code, todo diff --git a/metagpt/roles/role.py b/metagpt/roles/role.py index aa8d833a5..2c088eb48 100644 --- a/metagpt/roles/role.py +++ b/metagpt/roles/role.py @@ -30,12 +30,12 @@ from pydantic import BaseModel, ConfigDict, Field, SerializeAsAny, model_validat from metagpt.actions import Action, ActionOutput from metagpt.actions.action_node import ActionNode from metagpt.actions.add_requirement import UserRequirement -from metagpt.actions.di.use_experience import RetrieveExperiences, AddNewTrajectories +from metagpt.actions.di.use_experience import AddNewTrajectories, RetrieveExperiences from metagpt.context_mixin import ContextMixin from metagpt.logs import logger from metagpt.memory import Memory from metagpt.provider import HumanProvider -from metagpt.schema import Message, MessageQueue, SerializationMixin, TaskResult, Task +from metagpt.schema import Message, MessageQueue, SerializationMixin, Task, TaskResult from metagpt.strategy.planner import Planner from metagpt.utils.common import any_to_name, any_to_str, role_raise_decorator from metagpt.utils.project_repo import ProjectRepo @@ -493,13 +493,13 @@ class Role(SerializationMixin, ContextMixin, BaseModel): # retrieve past tasks for this task experiences = await RetrieveExperiences().run(query=task.instruction) if self.use_experience else "" - + # take on current task task_result = await self._act_on_task(task, experiences) # process the result, such as reviewing, confirming, plan updating await self.planner.process_task_result(task_result) - + await AddNewTrajectories().run(self.planner) rsp = self.planner.get_useful_memories()[0] # return the completed plan as a response From 258a0894b8064824eeb315e978e6442929566984 Mon Sep 17 00:00:00 2001 From: luxiangtao Date: Tue, 16 Apr 2024 17:30:11 +0800 Subject: [PATCH 3/4] add docstring and others --- metagpt/actions/di/use_experience.py | 201 ++++++++++++++------------- metagpt/roles/di/data_interpreter.py | 8 +- metagpt/roles/role.py | 8 +- 3 files changed, 116 insertions(+), 101 deletions(-) diff --git a/metagpt/actions/di/use_experience.py b/metagpt/actions/di/use_experience.py index 2b2601e0c..7d84c6424 100644 --- a/metagpt/actions/di/use_experience.py +++ b/metagpt/actions/di/use_experience.py @@ -39,109 +39,28 @@ TRAJECTORY_COLLECTION_NAME = "di_trajectory_0" PERSIST_PATH = SERDESER_PATH / "data_interpreter/chroma" -class AddNewExperiences(Action): - name: str = "AddNewTaskExperiences" - - def _init_engine(self, collection_name: str): - """Initialize a collection for storing code experiences""" - - engine = SimpleEngine.from_docs( - input_files=[TRAVEL_DOC_PATH], - retriever_configs=[ChromaRetrieverConfig(persist_path=PERSIST_PATH, collection_name=collection_name)], - ) - - db = chromadb.PersistentClient(path=str(PERSIST_PATH)) - chroma_collection = db.get_or_create_collection(collection_name) - chroma_collection.delete(where_document={"$contains": "Bob likes traveling"}) - - return engine - - async def _single_task_summary(self, trajectory_collection_name: str, experience_collection_name: str): - trajectory_engine = self._init_engine(collection_name=trajectory_collection_name) - experience_engine = self._init_engine(collection_name=experience_collection_name) - - db = chromadb.PersistentClient(path=str(PERSIST_PATH)) - collection = db.get_or_create_collection(trajectory_collection_name) - - unused_ids = [ - id - for id in collection.get()["ids"] - if json.loads(collection.get([id])["metadatas"][0]["obj_json"])["is_used"] == False - ] - trajectory_dicts = [json.loads(metadata["obj_json"]) for metadata in collection.get(unused_ids)["metadatas"]] - trajectories = [] - experiences = [] - for trajectory_dict in trajectory_dicts: - trajectory_dict["is_used"] = True - trajectory = Trajectory(**trajectory_dict) - trajectories.append(trajectory) - - code_summary = await self.task_code_sumarization(trajectory) - experience = Experience(code_summary=code_summary, trajectory=trajectory) - experiences.append(experience) - - collection.delete(unused_ids) - trajectory_engine.add_objs(trajectories) - experience_engine.add_objs(experiences) - - async def task_code_sumarization(self, trajectory: Trajectory): - """Summarize the task code - Args: - task: The task to be summarized. - Returns: - A summary of the task code. - """ - task = trajectory.task - prompt = TASK_CODE_DESCRIPTION_PROMPT.format( - code_snippet=task.code, code_result=task.result, code_success="Success" if task.is_success else "Failure" - ) - resp = await self._aask(prompt=prompt) - return resp - - async def run( - self, - trajectory_collection_name: str = TRAJECTORY_COLLECTION_NAME, - experience_collection_name: str = EXPERIENCE_COLLECTION_NAME, - mode: str = "single_task_summary", - ): - """Initiate a collection and Add a new task experience to the collection - - Args: - trajectory_collection_name(str): the trajectory collection_name to be used for geting experiences. - experience_collection_name(str): the experience collection_name to be used for saving experiences. - mode(str): how to generate experiences - - """ - if mode == "single_task_summary": - await self._single_task_summary( - trajectory_collection_name=trajectory_collection_name, - experience_collection_name=experience_collection_name, - ) - else: - pass # TODO:add other methods to generate experiences from trajectories - - class AddNewTrajectories(Action): + """Record the execution status of each task as a trajectory and store it.""" + name: str = "AddNewTrajectories" def _init_engine(self, collection_name: str): - """Initialize a collection for storing code experiences""" + """Initialize a collection for storing code experiences.""" engine = SimpleEngine.from_docs( input_files=[TRAVEL_DOC_PATH], retriever_configs=[ChromaRetrieverConfig(persist_path=PERSIST_PATH, collection_name=collection_name)], ) - + # due to an irrelevant record being added to the vector database when loading from SimpleEngine.from_docs(), it is necessary to remove it. db = chromadb.PersistentClient(path=str(PERSIST_PATH)) - chroma_collection = db.get_or_create_collection(collection_name) - chroma_collection.delete(where_document={"$contains": "Bob likes traveling"}) + chroma_collection = db.get_or_create_collection(collection_name) # get chromadb collection + chroma_collection.delete(where_document={"$contains": "Bob likes traveling"}) # delete the irrelevant record return engine async def run(self, planner: Planner, trajectory_collection_name: str = TRAJECTORY_COLLECTION_NAME): - """ - Initiate a collection and add new trajectories to the collection - """ + """Initiate a collection and add new trajectories to the collection.""" + engine = self._init_engine(trajectory_collection_name) if not planner.plan.tasks: @@ -157,14 +76,110 @@ class AddNewTrajectories(Action): engine.add_objs(trajectories) +class AddNewExperiences(Action): + """Retrieve the trajectories from the vector database where trajectories are stored, + compare and summarize them to form experiences, and then store these experiences in the vector database. + """ + + name: str = "AddNewTaskExperiences" + + def _init_engine(self, collection_name: str): + """Initialize a collection for storing code experiences.""" + + engine = SimpleEngine.from_docs( + input_files=[TRAVEL_DOC_PATH], + retriever_configs=[ChromaRetrieverConfig(persist_path=PERSIST_PATH, collection_name=collection_name)], + ) + # due to an irrelevant record being added to the vector database when loading from SimpleEngine.from_docs(), it is necessary to remove it. + db = chromadb.PersistentClient(path=str(PERSIST_PATH)) + chroma_collection = db.get_or_create_collection(collection_name) + chroma_collection.delete(where_document={"$contains": "Bob likes traveling"}) + + return engine + + async def _single_task_summary(self, trajectory_collection_name: str, experience_collection_name: str): + trajectory_engine = self._init_engine(collection_name=trajectory_collection_name) + experience_engine = self._init_engine(collection_name=experience_collection_name) + + db = chromadb.PersistentClient(path=str(PERSIST_PATH)) + collection = db.get_or_create_collection(trajectory_collection_name) + + # get the ids of all trajectories where the is_used attribute is false. + unused_ids = [ + id + for id in collection.get()["ids"] # collection.get()["ids"] will get all the ids in the collection + if json.loads(collection.get([id])["metadatas"][0]["obj_json"])["is_used"] + == False # Check if the is_used attribute of the trajectory corresponding to the given id is false. + ] + + trajectory_dicts = [ + json.loads(metadata["obj_json"]) for metadata in collection.get(unused_ids)["metadatas"] + ] # get the trajectory in dictionary format + trajectories = [] + experiences = [] + + for trajectory_dict in trajectory_dicts: + # set the is_used attribute of the trajectory to true and create a new trajectory (the old trajectory will be deleted below). + trajectory_dict["is_used"] = True + trajectory = Trajectory(**trajectory_dict) + trajectories.append(trajectory) + + # summarize the trajectory using LLM and assemble it into a single experience + code_summary = await self.task_code_sumarization(trajectory) + experience = Experience(code_summary=code_summary, trajectory=trajectory) + experiences.append(experience) + + collection.delete(unused_ids) # delete the old trajectories + trajectory_engine.add_objs(trajectories) + experience_engine.add_objs(experiences) + + async def task_code_sumarization(self, trajectory: Trajectory): + """use LLM to summarize the task code. + Args: + trajectory: The trajectory to be summarized. + Returns: + A summary of the trajectory's code. + """ + task = trajectory.task + prompt = TASK_CODE_DESCRIPTION_PROMPT.format( + code_snippet=task.code, code_result=task.result, code_success="Success" if task.is_success else "Failure" + ) + resp = await self._aask(prompt=prompt) + return resp + + async def run( + self, + trajectory_collection_name: str = TRAJECTORY_COLLECTION_NAME, + experience_collection_name: str = EXPERIENCE_COLLECTION_NAME, + mode: str = "single_task_summary", + ): + """Initiate a collection and Add a new task experience to the collection. + + Args: + trajectory_collection_name(str): the trajectory collection_name to be used for geting experiences. + experience_collection_name(str): the experience collection_name to be used for saving experiences. + mode(str): how to generate experiences. + + """ + if mode == "single_task_summary": + await self._single_task_summary( + trajectory_collection_name=trajectory_collection_name, + experience_collection_name=experience_collection_name, + ) + else: + pass # TODO:add other methods to generate experiences from trajectories. + + class RetrieveExperiences(Action): + """Retrieve the most relevant experience from the vector database based on the input task.""" + name: str = "RetrieveExperiences" def _init_engine(self, collection_name: str, top_k: int): - """Initialize a SimpleEngine for retrieving experiences + """Initialize a SimpleEngine for retrieving experiences. Args: - query (str): The chromadb collectin_name + query (str): The chromadb collectin_name. top_k (int): The number of eperiences to be retrieved. """ @@ -176,7 +191,7 @@ class RetrieveExperiences(Action): ) ], ) - + # due to an irrelevant record being added to the vector database when loading from SimpleEngine.from_docs(), it is necessary to remove it. db = chromadb.PersistentClient(path=str(PERSIST_PATH)) chroma_collection = db.get_or_create_collection(collection_name) chroma_collection.delete(where_document={"$contains": "Bob likes traveling"}) @@ -190,7 +205,7 @@ class RetrieveExperiences(Action): Args: query (str): The task instruction to be used for retrieval. - experience_collection_name(str): the collextion_name for retrieving experiences + experience_collection_name(str): the collextion_name for retrieving experiences. top_k (int, optional): The number of experiences to be retrieved. Defaults to 5. Returns: diff --git a/metagpt/roles/di/data_interpreter.py b/metagpt/roles/di/data_interpreter.py index 13e0c2df2..4326c60db 100644 --- a/metagpt/roles/di/data_interpreter.py +++ b/metagpt/roles/di/data_interpreter.py @@ -7,6 +7,7 @@ from pydantic import Field, model_validator from metagpt.actions.di.ask_review import ReviewConst from metagpt.actions.di.execute_nb_code import ExecuteNbCode +from metagpt.actions.di.use_experience import AddNewTrajectories, RetrieveExperiences from metagpt.actions.di.write_analysis_code import CheckData, WriteAnalysisCode from metagpt.logs import logger from metagpt.prompts.di.write_analysis_code import DATA_INFO @@ -89,14 +90,19 @@ class DataInterpreter(Role): async def _plan_and_act(self) -> Message: try: rsp = await super()._plan_and_act() + await AddNewTrajectories().run( + self.planner + ) # extract trajectories based on the execution status of each task in the planner await self.execute_code.terminate() return rsp except Exception as e: await self.execute_code.terminate() raise e - async def _act_on_task(self, current_task: Task, experiences: str) -> TaskResult: + async def _act_on_task(self, current_task: Task) -> TaskResult: """Useful in 'plan_and_act' mode. Wrap the output in a TaskResult for review and confirmation.""" + # retrieve past tasks for this task + experiences = await RetrieveExperiences().run(query=current_task.instruction) if self.use_experience else "" code, result, is_success = await self._write_and_exec_code(experiences=experiences) task_result = TaskResult(code=code, result=result, is_success=is_success) return task_result diff --git a/metagpt/roles/role.py b/metagpt/roles/role.py index 2c088eb48..80c267315 100644 --- a/metagpt/roles/role.py +++ b/metagpt/roles/role.py @@ -30,7 +30,6 @@ from pydantic import BaseModel, ConfigDict, Field, SerializeAsAny, model_validat from metagpt.actions import Action, ActionOutput from metagpt.actions.action_node import ActionNode from metagpt.actions.add_requirement import UserRequirement -from metagpt.actions.di.use_experience import AddNewTrajectories, RetrieveExperiences from metagpt.context_mixin import ContextMixin from metagpt.logs import logger from metagpt.memory import Memory @@ -491,17 +490,12 @@ class Role(SerializationMixin, ContextMixin, BaseModel): task = self.planner.current_task logger.info(f"ready to take on task {task}") - # retrieve past tasks for this task - experiences = await RetrieveExperiences().run(query=task.instruction) if self.use_experience else "" - # take on current task - task_result = await self._act_on_task(task, experiences) + task_result = await self._act_on_task(task) # process the result, such as reviewing, confirming, plan updating await self.planner.process_task_result(task_result) - await AddNewTrajectories().run(self.planner) - rsp = self.planner.get_useful_memories()[0] # return the completed plan as a response self.rc.memory.add(rsp) # add to persistent memory From 200d47a5c0fd570710af5312f695da52cef2a2f1 Mon Sep 17 00:00:00 2001 From: luxiangtao Date: Tue, 16 Apr 2024 20:09:11 +0800 Subject: [PATCH 4/4] modify --- metagpt/actions/di/use_experience.py | 25 +++---------------------- 1 file changed, 3 insertions(+), 22 deletions(-) diff --git a/metagpt/actions/di/use_experience.py b/metagpt/actions/di/use_experience.py index 7d84c6424..ddc7f6547 100644 --- a/metagpt/actions/di/use_experience.py +++ b/metagpt/actions/di/use_experience.py @@ -3,7 +3,6 @@ import json import chromadb from pydantic import BaseModel -from examples.rag_pipeline import TRAVEL_DOC_PATH from metagpt.actions import Action from metagpt.const import SERDESER_PATH from metagpt.logs import logger @@ -47,15 +46,9 @@ class AddNewTrajectories(Action): def _init_engine(self, collection_name: str): """Initialize a collection for storing code experiences.""" - engine = SimpleEngine.from_docs( - input_files=[TRAVEL_DOC_PATH], + engine = SimpleEngine.from_objs( retriever_configs=[ChromaRetrieverConfig(persist_path=PERSIST_PATH, collection_name=collection_name)], ) - # due to an irrelevant record being added to the vector database when loading from SimpleEngine.from_docs(), it is necessary to remove it. - db = chromadb.PersistentClient(path=str(PERSIST_PATH)) - chroma_collection = db.get_or_create_collection(collection_name) # get chromadb collection - chroma_collection.delete(where_document={"$contains": "Bob likes traveling"}) # delete the irrelevant record - return engine async def run(self, planner: Planner, trajectory_collection_name: str = TRAJECTORY_COLLECTION_NAME): @@ -86,15 +79,9 @@ class AddNewExperiences(Action): def _init_engine(self, collection_name: str): """Initialize a collection for storing code experiences.""" - engine = SimpleEngine.from_docs( - input_files=[TRAVEL_DOC_PATH], + engine = SimpleEngine.from_objs( retriever_configs=[ChromaRetrieverConfig(persist_path=PERSIST_PATH, collection_name=collection_name)], ) - # due to an irrelevant record being added to the vector database when loading from SimpleEngine.from_docs(), it is necessary to remove it. - db = chromadb.PersistentClient(path=str(PERSIST_PATH)) - chroma_collection = db.get_or_create_collection(collection_name) - chroma_collection.delete(where_document={"$contains": "Bob likes traveling"}) - return engine async def _single_task_summary(self, trajectory_collection_name: str, experience_collection_name: str): @@ -183,19 +170,13 @@ class RetrieveExperiences(Action): top_k (int): The number of eperiences to be retrieved. """ - engine = SimpleEngine.from_docs( - input_files=[TRAVEL_DOC_PATH], + engine = SimpleEngine.from_objs( retriever_configs=[ ChromaRetrieverConfig( persist_path=PERSIST_PATH, collection_name=collection_name, similarity_top_k=top_k ) ], ) - # due to an irrelevant record being added to the vector database when loading from SimpleEngine.from_docs(), it is necessary to remove it. - db = chromadb.PersistentClient(path=str(PERSIST_PATH)) - chroma_collection = db.get_or_create_collection(collection_name) - chroma_collection.delete(where_document={"$contains": "Bob likes traveling"}) - return engine async def run(