diff --git a/metagpt/roles/di/data_interpreter.py b/metagpt/roles/di/data_interpreter.py index a8534b710..16875a42e 100644 --- a/metagpt/roles/di/data_interpreter.py +++ b/metagpt/roles/di/data_interpreter.py @@ -43,66 +43,66 @@ class DataInterpreter(Role): tool_recommender: ToolRecommender = None react_mode: Literal["plan_and_act", "react"] = "plan_and_act" max_react_loop: int = 10 # used for react mode - + @model_validator(mode="after") def set_plan_and_tool(self) -> "Interpreter": self._set_react_mode(react_mode=self.react_mode, max_react_loop=self.max_react_loop, auto_run=self.auto_run) self.use_plan = ( - self.react_mode == "plan_and_act" + self.react_mode == "plan_and_act" ) # create a flag for convenience, overwrite any passed-in value if self.tools: self.tool_recommender = BM25ToolRecommender(tools=self.tools) self.set_actions([WriteAnalysisCode]) self._set_state(0) return self - + @property def working_memory(self): return self.rc.working_memory - + async def _think(self) -> bool: """Useful in 'react' mode. Use LLM to decide whether and what to do next.""" user_requirement = self.get_memories()[0].content context = self.working_memory.get() - + if not context: # just started the run, we need action certainly self.working_memory.add(self.get_memories()[0]) # add user requirement to working memory self._set_state(0) return True - + prompt = REACT_THINK_PROMPT.format(user_requirement=user_requirement, context=context) rsp = await self.llm.aask(prompt) rsp_dict = json.loads(CodeParser.parse_code(block=None, text=rsp)) self.working_memory.add(Message(content=rsp_dict["thoughts"], role="assistant")) need_action = rsp_dict["state"] self._set_state(0) if need_action else self._set_state(-1) - + return need_action - + async def _act(self) -> Message: """Useful in 'react' mode. Return a Message conforming to Role._act interface.""" code, _, _ = await self._write_and_exec_code() return Message(content=code, role="assistant", cause_by=WriteAnalysisCode) - + async def _plan_and_act(self) -> Message: rsp = await super()._plan_and_act() await self.execute_code.terminate() return rsp - + async def _act_on_task(self, current_task: Task) -> TaskResult: """Useful in 'plan_and_act' mode. Wrap the output in a TaskResult for review and confirmation.""" code, result, is_success = await self._write_and_exec_code() task_result = TaskResult(code=code, result=result, is_success=is_success) return task_result - + async def _write_and_exec_code(self, max_retry: int = 3): counter = 0 success = False - + # plan info plan_status = self.planner.get_plan_status() if self.use_plan else "" - + # tool info if self.tools: context = ( @@ -112,45 +112,46 @@ class DataInterpreter(Role): tool_info = await self.tool_recommender.get_recommended_tool_info(context=context, plan=plan) else: tool_info = "" - + # data info await self._check_data() - + while not success and counter < max_retry: ### write code ### code, cause_by = await self._write_code(counter, plan_status, tool_info) - + self.working_memory.add(Message(content=code, role="assistant", cause_by=cause_by)) - + ### execute code ### + import pdb;pdb.set_trace() result, success = await self.execute_code.run(code) print(result) - + self.working_memory.add(Message(content=result, role="user", cause_by=ExecuteNbCode)) - + ### process execution result ### counter += 1 - + if not success and counter >= max_retry: logger.info("coding failed!") review, _ = await self.planner.ask_review(auto_run=False, trigger=ReviewConst.CODE_REVIEW_TRIGGER) if ReviewConst.CHANGE_WORDS[0] in review: counter = 0 # redo the task again with help of human suggestions - + return code, result, success - + async def _write_code( - self, - counter: int, - plan_status: str = "", - tool_info: str = "", + self, + counter: int, + plan_status: str = "", + tool_info: str = "", ): todo = self.rc.todo # todo is WriteAnalysisCode logger.info(f"ready to {todo.name}") use_reflection = counter > 0 and self.use_reflection # only use reflection after the first trial - + user_requirement = self.get_memories()[0].content - + code = await todo.run( user_requirement=user_requirement, plan_status=plan_status, @@ -158,19 +159,19 @@ class DataInterpreter(Role): working_memory=self.working_memory.get(), use_reflection=use_reflection, ) - + return code, todo - + async def _check_data(self): if ( - not self.use_plan - or not self.planner.plan.get_finished_tasks() - or self.planner.plan.current_task.task_type - not in [ - TaskType.DATA_PREPROCESS.type_name, - TaskType.FEATURE_ENGINEERING.type_name, - TaskType.MODEL_TRAIN.type_name, - ] + not self.use_plan + or not self.planner.plan.get_finished_tasks() + or self.planner.plan.current_task.task_type + not in [ + TaskType.DATA_PREPROCESS.type_name, + TaskType.FEATURE_ENGINEERING.type_name, + TaskType.MODEL_TRAIN.type_name, + ] ): return logger.info("Check updated data") @@ -182,3 +183,6 @@ class DataInterpreter(Role): print(result) data_info = DATA_INFO.format(info=result) self.working_memory.add(Message(content=data_info, role="user", cause_by=CheckData)) + + def get_last_cell_source(self): + return self.execute_code.nb.cells[-1].source