diff --git a/examples/andriod_assistant/actions/screenshot_parse.py b/examples/andriod_assistant/actions/screenshot_parse.py index ef9454979..800cc74b9 100644 --- a/examples/andriod_assistant/actions/screenshot_parse.py +++ b/examples/andriod_assistant/actions/screenshot_parse.py @@ -8,7 +8,7 @@ from examples.andriod_assistant.prompts.assistant_prompt import ( screenshot_parse_template, screenshot_parse_with_grid_template, ) -from examples.andriod_assistant.utils.schema import OpLogItem, ActionOp, ParamExtState, GridOp, ActionOp, TapOp, TapGridOp, \ +from examples.andriod_assistant.utils.schema import OpLogItem, ParamExtState, GridOp, TapOp, TapGridOp, \ LongPressOp, LongPressGridOp, SwipeOp, SwipeGridOp, TextOp, AndroidElement from examples.andriod_assistant.actions.screenshot_parse_an import SCREENSHOT_PARSE_NODE from examples.andriod_assistant.utils.utils import draw_bbox_multi, traverse_xml_tree, area_to_xy, screenshot_parse_extract, elem_bbox_to_xy diff --git a/examples/andriod_assistant/actions/self_learn.py b/examples/andriod_assistant/actions/self_learn.py deleted file mode 100644 index 8ffc14c42..000000000 --- a/examples/andriod_assistant/actions/self_learn.py +++ /dev/null @@ -1,67 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# @Desc : LIKE scripts/self_explorer.py in stage=learn & mode=auto self_explore_task stage - -from pathlib import Path - -from examples.andriod_assistant.actions.screenshot_parse_an import SCREENSHOT_PARSE_NODE -from examples.andriod_assistant.prompts.assistant_prompt import ( - screenshot_parse_self_explore_template, -) -from examples.andriod_assistant.utils.utils import draw_bbox_multi, traverse_xml_tree -from metagpt.actions.action import Action -from metagpt.config2 import config -from metagpt.environment.android_env.android_env import AndroidEnv -from metagpt.environment.api.env_api import EnvAPIAbstract -from metagpt.utils.common import encode_image - - -class SelfLearn(Action): - name: str = "SelfLearn" - - 
useless_list: list[str] = [] # store useless elements uid - - async def run(self, round_count: int, task_desc: str, last_act: str, task_dir: Path, env: AndroidEnv): - screenshot_path: Path = env.step( - EnvAPIAbstract( - api_name="get_screenshot", kwargs={"ss_name": f"{round_count}_before", "local_save_dir": task_dir} - ) - ) - xml_path: Path = env.step( - EnvAPIAbstract(api_name="get_xml", kwargs={"xml_name": f"{round_count}", "local_save_dir": task_dir}) - ) - if not screenshot_path.exists() or not xml_path.exists(): - # TODO exit - return - - clickable_list = [] - focusable_list = [] - traverse_xml_tree(xml_path, clickable_list, "clickable", True) - traverse_xml_tree(xml_path, focusable_list, "focusable", True) - elem_list = [] - for elem in clickable_list: - if elem.uid in self.useless_list: - continue - elem_list.append(elem) - for elem in focusable_list: - if elem.uid in self.useless_list: - continue - bbox = elem.bbox - center = (bbox[0][0] + bbox[1][0]) // 2, (bbox[0][1] + bbox[1][1]) // 2 - close = False - for e in clickable_list: - bbox = e.bbox - center_ = (bbox[0][0] + bbox[1][0]) // 2, (bbox[0][1] + bbox[1][1]) // 2 - dist = (abs(center[0] - center_[0]) ** 2 + abs(center[1] - center_[1]) ** 2) ** 0.5 - if dist <= config.get_other("min_dist"): - close = True - break - if not close: - elem_list.append(elem) - draw_bbox_multi(screenshot_path, task_dir.joinpath(f"{round_count}_before_labeled.png"), elem_list) - img_base64 = encode_image(task_dir.joinpath(f"{round_count}_before_labeled.png")) - - self_explore_template = screenshot_parse_self_explore_template - context = self_explore_template.format(task_description=task_desc, last_act=last_act) - - node = await SCREENSHOT_PARSE_NODE.fill(context=context, llm=self.llm, images=[img_base64]) diff --git a/examples/andriod_assistant/actions/self_learn_and_reflect.py b/examples/andriod_assistant/actions/self_learn_and_reflect.py new file mode 100644 index 000000000..ef9fcf067 --- /dev/null +++ 
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc   : LIKE scripts/self_explorer.py in stage=learn & mode=auto self_explore_task stage

import ast
from pathlib import Path

from examples.andriod_assistant.actions.screenshot_parse_an import SCREENSHOT_PARSE_NODE
from examples.andriod_assistant.actions.self_learn_reflect_an import SELF_LEARN_REFLECT_NODE
from examples.andriod_assistant.prompts.assistant_prompt import (
    screenshot_parse_self_explore_template,
    screenshot_parse_self_explore_reflect_template as reflect_template,
)
from examples.andriod_assistant.utils.schema import (
    AndroidElement,
    OpLogItem,
    ReflectLogItem,
    ParamExtState,
    TapOp,
    TextOp,
    SwipeOp,
    LongPressOp,
    ActionOp,
    Decision,
    DocContent,
)
from examples.andriod_assistant.utils.utils import (
    draw_bbox_multi,
    traverse_xml_tree,
    screenshot_parse_extract,
    elem_bbox_to_xy,
    reflect_parse_extarct,
)
from metagpt.actions.action import Action
from metagpt.config2 import config
from metagpt.environment.android_env.android_env import AndroidEnv
from metagpt.environment.api.env_api import EnvAPIAbstract
from metagpt.utils.common import encode_image
from metagpt.const import ADB_EXEC_FAIL
from metagpt.logs import logger


def _bbox_center(bbox) -> tuple[int, int]:
    """Return the integer centre of a ``((x1, y1), (x2, y2))`` bounding box."""
    return (bbox[0][0] + bbox[1][0]) // 2, (bbox[0][1] + bbox[1][1]) // 2


def _area_to_xy(elem_list: list, area):
    """Return the screen coordinates of the element labelled ``area`` (1-based).

    Returns ``None`` when ``area`` is not an int inside ``1..len(elem_list)``.
    The label comes from the LLM answer, so 0 or an oversized value must not be
    allowed to index the wrong element silently or raise ``IndexError``.
    """
    if not isinstance(area, int) or not 1 <= area <= len(elem_list):
        return None
    return elem_bbox_to_xy(elem_list[area - 1].bbox)


class SelfLearnAndReflect(Action):
    """Run one autonomous exploration step and then reflect on its effect.

    ``run_self_learn`` labels the current screen, asks the LLM for the next
    operation and executes it through ``env``. ``run_reflect`` then compares
    the screen before and after, records useless elements and writes the
    element documentation into ``docs_dir``.
    """

    name: str = "SelfLearnAndReflect"

    useless_list: list[str] = []  # uids of elements judged useless by earlier reflections

    # State handed over from run_self_learn() to run_reflect().
    screenshot_before_path: str = ""
    screenshot_before_base64: str = ""
    elem_list: list[AndroidElement] = []
    swipe_orient: str = "up"
    act_name: str = ""
    ui_area: int = -1

    async def run(
        self, round_count: int, task_desc: str, last_act: str, task_dir: Path, docs_dir: Path, env: AndroidEnv
    ):
        """Execute the self-learn step followed by the reflect step.

        Args:
            round_count: Index of the current exploration round; used in file names.
            task_desc: Task description inserted into the prompts.
            last_act: Summary of the previous action inserted into the prompts.
            task_dir: Directory receiving screenshots and XML dumps.
            docs_dir: Directory receiving per-element documentation files.
            env: Android environment used to take screenshots and perform actions.
        """
        # Both steps are coroutines: without ``await`` they are only created, never executed.
        await self.run_self_learn(round_count, task_desc, last_act, task_dir, env)
        await self.run_reflect(round_count, task_desc, last_act, task_dir, docs_dir, env)

    async def run_self_learn(self, round_count: int, task_desc: str, last_act: str, task_dir: Path, env: AndroidEnv):
        """Label the current screen, ask the LLM for an operation and execute it.

        On success the labelled elements, the action name and (for element
        actions) the chosen label are stored on ``self`` for ``run_reflect``.
        On any early return ``self.act_name`` stays empty so that
        ``run_reflect`` skips the round.
        """
        # Forget the previous round, otherwise an early return below would make
        # run_reflect() reflect on a stale action.
        self.act_name = ""
        self.ui_area = -1

        screenshot_path: Path = env.step(
            EnvAPIAbstract(
                api_name="get_screenshot", kwargs={"ss_name": f"{round_count}_before", "local_save_dir": task_dir}
            )
        )
        xml_path: Path = env.step(
            EnvAPIAbstract(api_name="get_xml", kwargs={"xml_name": f"{round_count}", "local_save_dir": task_dir})
        )
        if not screenshot_path.exists() or not xml_path.exists():
            # TODO exit
            return

        clickable_list = []
        focusable_list = []
        traverse_xml_tree(xml_path, clickable_list, "clickable", True)
        traverse_xml_tree(xml_path, focusable_list, "focusable", True)

        # Every non-useless clickable element is kept; a focusable element is
        # added only when its centre is farther than min_dist from every clickable one.
        elem_list = [elem for elem in clickable_list if elem.uid not in self.useless_list]
        min_dist = config.get_other("min_dist")  # loop invariant, read once
        for elem in focusable_list:
            if elem.uid in self.useless_list:
                continue
            center = _bbox_center(elem.bbox)
            close = False
            for other in clickable_list:
                center_ = _bbox_center(other.bbox)
                dist = (abs(center[0] - center_[0]) ** 2 + abs(center[1] - center_[1]) ** 2) ** 0.5
                if dist <= min_dist:
                    close = True
                    break
            if not close:
                elem_list.append(elem)

        screenshot_before_labeled_path = task_dir.joinpath(f"{round_count}_before_labeled.png")
        draw_bbox_multi(screenshot_path, screenshot_before_labeled_path, elem_list)
        img_base64 = encode_image(screenshot_before_labeled_path)
        self.screenshot_before_base64 = img_base64
        self.screenshot_before_path = screenshot_before_labeled_path

        self_explore_template = screenshot_parse_self_explore_template
        context = self_explore_template.format(task_description=task_desc, last_act=last_act)

        node = await SCREENSHOT_PARSE_NODE.fill(context=context, llm=self.llm, images=[img_base64])
        if "error" in node.content:
            # TODO
            return
        prompt = node.compile(context=context, schema="json", mode="auto")
        # NOTE(review): log_item is built but not stored anywhere in this method.
        log_item = OpLogItem(  # noqa: F841
            step=round_count, prompt=prompt, image=screenshot_before_labeled_path, response=node.content
        )
        op_param = screenshot_parse_extract(node.instruct_content.model_dump(), grid_on=False)
        if op_param.param_state == ParamExtState.FINISH:
            # TODO
            return
        if op_param.param_state == ParamExtState.FAIL:
            # TODO
            return

        # Element-based operations share the label -> coordinate lookup.
        x = y = None
        if isinstance(op_param, (TapOp, LongPressOp, SwipeOp)):
            xy = _area_to_xy(elem_list, op_param.area)
            if xy is None:
                logger.warning(f"UI element label {op_param.area} is out of range 1..{len(elem_list)}")
                return
            x, y = xy
            self.ui_area = op_param.area

        res = None
        if isinstance(op_param, TapOp):
            res = env.step(EnvAPIAbstract("system_tap", kwargs={"x": x, "y": y}))
        elif isinstance(op_param, TextOp):
            res = env.step(EnvAPIAbstract("user_input", kwargs={"input_txt": op_param.input_str}))
        elif isinstance(op_param, LongPressOp):
            res = env.step(EnvAPIAbstract("user_longpress", kwargs={"x": x, "y": y}))
        elif isinstance(op_param, SwipeOp):
            self.swipe_orient = op_param.swipe_orient
            res = env.step(
                EnvAPIAbstract(
                    "user_swipe", kwargs={"x": x, "y": y, "orient": op_param.swipe_orient, "dist": op_param.dist}
                )
            )
        if res == ADB_EXEC_FAIL:
            # TODO
            return

        self.elem_list = elem_list
        self.act_name = op_param.act_name

    async def run_reflect(
        self, round_count: int, task_desc: str, last_act: str, task_dir: Path, docs_dir: Path, env: AndroidEnv
    ):
        """Compare the screen before/after the last action and document the element.

        Does nothing unless the last executed action was a tap, long press or
        swipe: text input targets no labelled element, and an empty
        ``act_name`` means ``run_self_learn`` executed no action.
        """
        if self.act_name == ActionOp.TAP.value:
            action = "tapping"
        elif self.act_name == ActionOp.LONG_PRESS.value:
            action = "long pressing"
        elif self.act_name == ActionOp.SWIPE.value:
            action = "swiping"
            if self.swipe_orient in (SwipeOp.UP.value, SwipeOp.DOWN.value):
                action = "v_swipe"
            elif self.swipe_orient in (SwipeOp.LEFT.value, SwipeOp.RIGHT.value):
                action = "h_swipe"
        else:
            # Previously `action` stayed unbound here and format() raised UnboundLocalError.
            return

        if not 1 <= int(self.ui_area) <= len(self.elem_list):
            # Without a valid label there is no element to reflect on.
            return

        screenshot_path: Path = env.step(
            EnvAPIAbstract(
                api_name="get_screenshot", kwargs={"ss_name": f"{round_count}_after", "local_save_dir": task_dir}
            )
        )
        if not screenshot_path.exists():
            # TODO
            return

        screenshot_after_labeled_path = task_dir.joinpath(f"{round_count}_after_labeled.png")
        draw_bbox_multi(screenshot_path, screenshot_after_labeled_path, elem_list=self.elem_list)
        img_base64 = encode_image(screenshot_after_labeled_path)

        context = reflect_template.format(
            action=action, ui_element=str(self.ui_area), task_desc=task_desc, last_act=last_act
        )
        node = await SELF_LEARN_REFLECT_NODE.fill(
            context=context, llm=self.llm, images=[self.screenshot_before_base64, img_base64]
        )
        if "error" in node.content:
            # TODO
            return

        prompt = node.compile(context=context, schema="json", mode="auto")
        # NOTE(review): log_item is built but not stored anywhere in this method.
        log_item = ReflectLogItem(  # noqa: F841
            step=round_count,
            prompt=prompt,
            image_before=self.screenshot_before_path,
            image_after=screenshot_after_labeled_path,
            response=node.content,
        )

        op_param = reflect_parse_extarct(node.instruct_content.model_dump())
        if op_param.param_state == ParamExtState.FINISH:
            # TODO
            return
        if op_param.param_state == ParamExtState.FAIL:
            # TODO
            return

        resource_id = self.elem_list[int(self.ui_area) - 1].uid
        decision = op_param.decision
        if decision == Decision.INEFFECTIVE.value:
            self.useless_list.append(resource_id)
            # TODO global: the caller's last_act should be reset to "NONE"
            return
        if decision not in (Decision.BACK.value, Decision.CONTINUE.value, Decision.SUCCESS.value):
            return

        if decision in (Decision.BACK.value, Decision.CONTINUE.value):
            self.useless_list.append(resource_id)
            # TODO global: the caller's last_act should be reset to "NONE"
            if decision == Decision.BACK.value:
                res = env.step(EnvAPIAbstract("system_back"))
                if res == ADB_EXEC_FAIL:
                    # TODO
                    return

        doc = op_param.documentation
        doc_path = docs_dir.joinpath(f"{resource_id}.txt")
        if doc_path.exists():
            # read_text() closes the file; the bare open().read() leaked the handle.
            doc_content = ast.literal_eval(doc_path.read_text())
            if doc_content.get(self.act_name):
                logger.info(f"Documentation for the element {resource_id} already exists.")
                # TODO
                return
            # literal_eval returned a plain dict: it needs item assignment,
            # setattr() on a dict raises AttributeError.
            doc_content[self.act_name] = doc
        else:
            doc_content = DocContent()
            setattr(doc_content, self.act_name, doc)
        # NOTE(review): the read path above expects a Python dict literal; confirm
        # that str(DocContent()) produces one, otherwise serialise it as a dict.
        doc_path.write_text(str(doc_content))
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc   : the ActionNode to parse Reflection

from metagpt.actions.action_node import ActionNode


# The decision itself. The instruction used to be a copy of the THOUGHT
# instruction, so the model was never told which values are accepted. The
# listed values are the members of the `Decision` enum in utils/schema.py.
DECISION = ActionNode(
    key="Decision",
    expected_type=str,
    instruction="the decision about the last action, one of BACK, INEFFECTIVE, CONTINUE, SUCCESS",
    example="BACK",
)


# The reasoning behind the decision.
THOUGHT = ActionNode(
    key="Thought",
    expected_type=str,
    instruction="explain why you made this decision",
    example="",
)


# The description of the UI element that was acted on.
DOCUMENTATION = ActionNode(
    key="Documentation",
    expected_type=str,
    instruction="describe the function of the UI element",
    example="",
)


NODES = [DECISION, THOUGHT, DOCUMENTATION]
# Parent node filled by the reflect step; its children define the expected answer fields.
SELF_LEARN_REFLECT_NODE = ActionNode.from_children("SelfLearnReflect", NODES)
CONTINUE If you find the action changed something on the screen but does not reflect the action description above and did not move the given task forward, you should continue to interact with other elements on the screen. At the same time, diff --git a/examples/andriod_assistant/roles/android_assistant.py b/examples/andriod_assistant/roles/android_assistant.py index 9e9a22b0d..c67905418 100644 --- a/examples/andriod_assistant/roles/android_assistant.py +++ b/examples/andriod_assistant/roles/android_assistant.py @@ -2,43 +2,62 @@ # -*- coding: utf-8 -*- # @Desc : android assistant to learn from app operations and operate apps +from typing import Optional +from pathlib import Path +from pydantic import Field + from examples.andriod_assistant.actions.manual_record import ManualRecord from examples.andriod_assistant.actions.parse_record import ParseRecord from examples.andriod_assistant.actions.screenshot_parse import ScreenshotParse -from examples.andriod_assistant.actions.self_learn import SelfLearn +from examples.andriod_assistant.actions.self_learn_and_reflect import SelfLearnAndReflect from examples.andriod_assistant.actions.self_learn_reflect import SelfLearnReflect from metagpt.actions.add_requirement import UserRequirement from metagpt.config2 import config from metagpt.logs import logger -from metagpt.roles.role import Role +from metagpt.roles.role import Role, RoleReactMode from metagpt.schema import Message class AndroidAssistant(Role): name: str = "Nick" profile: str = "AndroidAssistant" - goal: str = "operate the phone apps with self-learn" + goal: str = "operate the mobile phone's apps with self-learn" + + task_desc: str = "" + round_count: int = 0 + last_act: str = "" + task_dir: Optional[Path] = Field(default=None) def __init__(self, **data): super().__init__(**data) self._watch([UserRequirement]) - self.set_actions([ManualRecord, ParseRecord, SelfLearn, SelfLearnReflect, ScreenshotParse]) - async def _think(self) -> bool: """Firstly, we decide the 
state with user config, further, we can do it automatically, like if it's new app, run the learn first and then do the act stage or learn it during the action. """ if config.get_other("stage") == "learn" and config.get_other("mode") == "manual": # choose ManualRecord and then run ParseRecord # Remember, only run each action only one time, no need to run n_round. - pass + self.set_actions([ManualRecord, ParseRecord]) elif config.get_other("stage") == "learn" and config.get_other("mode") == "auto": - # choose SelfLearn / SelfLearnReflect to run - pass + # choose SelfLearnAndReflect / SelfLearnReflect to run + self.set_actions([SelfLearnAndReflect, SelfLearnReflect]) elif config.get_other("stage") == "act": # choose ScreenshotParse to run - pass + self.set_actions([ScreenshotParse]) + self._set_react_mode(RoleReactMode.BY_ORDER) + + async def react(self) -> Message: + self.round_count += 1 + super().react() + + async def _think(self) -> bool: + """Firstly, we decide the state with user config, further, we can do it automatically, like if it's new app, + run the learn first and then do the act stage or learn it during the action. 
+ """ + pass async def _act(self) -> Message: logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})") + todo = self.rc.todo diff --git a/examples/andriod_assistant/utils/schema.py b/examples/andriod_assistant/utils/schema.py index fdd456ff6..b045aa555 100644 --- a/examples/andriod_assistant/utils/schema.py +++ b/examples/andriod_assistant/utils/schema.py @@ -22,6 +22,17 @@ class SwipeOp(Enum): RIGHT = "right" +class Decision(Enum): + BACK = "BACK" + INEFFECTIVE = "INEFFECTIVE" + CONTINUE = "CONTINUE" + SUCCESS = "SUCCESS" + + @classmethod + def values(cls): + return [item.value for item in cls] + + class AndroidElement(BaseModel): """UI Element""" uid: str = Field(default="") @@ -115,3 +126,10 @@ class SwipeGridOp(BaseGridOpParam): end_subarea: str = Field(default="") # end =================== define different Action Op and its params ============= + + +class ReflectOp(BaseModel): + decision: str = "" + thought: str = "" + documentation: str = "" + param_state: ParamExtState = ParamExtState.SUCCESS diff --git a/examples/andriod_assistant/utils/utils.py b/examples/andriod_assistant/utils/utils.py index 53973cfdd..85f52d0db 100644 --- a/examples/andriod_assistant/utils/utils.py +++ b/examples/andriod_assistant/utils/utils.py @@ -14,7 +14,7 @@ from metagpt.logs import logger from examples.andriod_assistant.utils.schema import AndroidElement from examples.andriod_assistant.utils.schema import BaseOpParam, BaseGridOpParam, GridOp, ActionOp, TapOp, TapGridOp, \ - LongPressOp, LongPressGridOp, SwipeOp, SwipeGridOp, TextOp, ParamExtState + LongPressOp, LongPressGridOp, SwipeOp, SwipeGridOp, TextOp, ParamExtState, ReflectOp, Decision def get_id_from_element(elem: Element) -> str: @@ -167,6 +167,17 @@ def elem_bbox_to_xy(bbox: tuple[tuple[int, int]]) -> tuple[int, int]: return x, y +def reflect_parse_extarct(parsed_json: dict) -> ReflectOp: + decision = parsed_json.get("Decision") + if decision not in Decision.values(): + op = 
ReflectOp(param_state=ParamExtState.FAIL) + else: + op = ReflectOp(decision=parsed_json.get("Decision"), + thought=parsed_json.get("Thought"), + documentation=parsed_json.get("Documentation")) + return op + + def screenshot_parse_extract(parsed_json: dict, grid_on: bool = False) -> Union[BaseOpParam, BaseGridOpParam, GridOp]: act = parsed_json.get("Action") last_act = parsed_json.get("Summary")