From a15e7af5a8de71b050ea249927e2a34fd4c066db Mon Sep 17 00:00:00 2001 From: Jiayi Zhang Date: Fri, 23 Feb 2024 21:19:49 +0800 Subject: [PATCH] Update mannual record action node Modify Schema SwipeOp to SwipeOp_3, this variable need a better name. --- .../actions/manual_record.py | 111 +++++++++++------- .../actions/self_learn_and_reflect.py | 5 - examples/andriod_assistant/test_for_an.py | 20 +++- examples/andriod_assistant/utils/schema.py | 4 +- examples/andriod_assistant/utils/utils.py | 4 +- 5 files changed, 92 insertions(+), 52 deletions(-) diff --git a/examples/andriod_assistant/actions/manual_record.py b/examples/andriod_assistant/actions/manual_record.py index 654ac6ea7..c281968f2 100644 --- a/examples/andriod_assistant/actions/manual_record.py +++ b/examples/andriod_assistant/actions/manual_record.py @@ -8,8 +8,10 @@ import cv2 from examples.andriod_assistant.utils.schema import ( ActionOp, + AndroidActionOutput, AndroidElement, - SwipeOp, + RunState, + SwipeOp ) from examples.andriod_assistant.utils.utils import draw_bbox_multi, traverse_xml_tree from metagpt.actions.action import Action @@ -22,33 +24,54 @@ from metagpt.logs import logger class ManualRecord(Action): """do a human operation on the screen with human input""" - name: str = "ManualRecord" - async def run(self, demo_name: str, task_dir: Path, env: AndroidEnv): - # Question 这里是将通过ADB获取的东西存到本地的路径的吧 - screenshot_path: Path = env.step( - EnvAPIAbstract(api_name="get_screenshot", kwargs={"ss_name": f"{demo_name}", "local_save_dir": task_dir}) - ) - xml_path: Path = env.step( - EnvAPIAbstract(api_name="get_xml", kwargs={"xml_name": f"{demo_name}", "local_save_dir": task_dir}) - ) - if not screenshot_path.exists() or not xml_path.exists(): - # TODO exit - return + useless_list: list[str] = [] # store useless elements uid + record_path: str = "" + task_desc_path: str = "" + screenshot_before_path: str = "" + screenshot_after_path: str = "" + xml_path: str = "" + + async def run(self, demo_name: str, task_desc: str,task_dir: Path, env: AndroidEnv): + + self.record_path = Path(task_dir) / "record.txt" + record_file = open(self.record_path, "w") + self.task_desc_path = Path(task_dir) / "task_desc.txt" + with open(self.task_desc_path, "w") as f: + f.write(task_desc) + self.screenshot_before_path = Path(task_dir)/"raw_screenshots" + self.screenshot_after_path = Path(task_dir)/"labeled_screenshots" + self.xml_path = Path(task_dir)/"xml" step = 0 - record_path = Path(task_dir) / "record.txt" - record_file = open(record_path, "w") while True: - # TODO Parse Record Step 是否可以从这个函数中获取,进行参数的传递 ? step += 1 + screenshot_path: Path = env.observe( + EnvAPIAbstract( + api_name="get_screenshot", + kwargs={"ss_name": f"{demo_name}_{step}", "local_save_dir": self.screenshot_before_path} + ) + ) + xml_path: Path = env.observe( + EnvAPIAbstract( + api_name="get_xml", + kwargs={"xml_name": f"{demo_name}_{step}", "local_save_dir": self.xml_path} + ) + ) + if not screenshot_path.exists() or not xml_path.exists(): + return AndroidActionOutput(action_state=RunState.FAIL) clickable_list = [] focusable_list = [] traverse_xml_tree(xml_path, clickable_list, "clickable", True) traverse_xml_tree(xml_path, focusable_list, "focusable", True) - elem_list: list[AndroidElement] = clickable_list.copy() - + elem_list = [] + for elem in clickable_list: + if elem.uid in self.useless_list: + continue + elem_list.append(elem) for elem in focusable_list: + if elem.uid in self.useless_list: + continue bbox = elem.bbox center = (bbox[0][0] + bbox[1][0]) // 2, (bbox[0][1] + bbox[1][1]) // 2 close = False @@ -56,12 +79,14 @@ class ManualRecord(Action): bbox = e.bbox center_ = (bbox[0][0] + bbox[1][0]) // 2, (bbox[0][1] + bbox[1][1]) // 2 dist = (abs(center[0] - center_[0]) ** 2 + abs(center[1] - center_[1]) ** 2) ** 0.5 - if dist <= config.get_other("min_dist"): + # TODO Modify config to default 30. It should be modified back config after single action test + # if dist <= config.get_other("min_dist"): + if dist <= 30: close = True break if not close: elem_list.append(elem) - screenshot_labeled_path = task_dir.joinpath(f"{task_dir}_{step}_labeled.png") + screenshot_labeled_path = Path(self.screenshot_after_path).joinpath(f"{demo_name}_{step}_labeled.png") labeled_img = draw_bbox_multi(screenshot_path, screenshot_labeled_path, elem_list) cv2.imshow("image", labeled_img) @@ -70,17 +95,17 @@ class ManualRecord(Action): user_input = "xxx" logger.info( - "Choose one of the following actions you want to perform on the current screen:\ntap, text, long " - "press, swipe, stop", + "Choose one of the following actions you want to perform on the current screen:\ntap, text, long_press," + "swipe, stop", "blue", ) while ( - user_input.lower() != ActionOp.TAP.value - and user_input.lower() != ActionOp.TEXT.value - and user_input.lower() != ActionOp.LONG_PRESS.value - and user_input.lower() != ActionOp.SWIPE.value - and user_input.lower() != ActionOp.STOP.value + user_input.lower() != ActionOp.TAP.value + and user_input.lower() != ActionOp.TEXT.value + and user_input.lower() != ActionOp.LONG_PRESS.value + and user_input.lower() != ActionOp.SWIPE.value + and user_input.lower() != ActionOp.STOP.value ): user_input = input() @@ -93,11 +118,9 @@ class ManualRecord(Action): user_input = input() tl, br = elem_list[int(user_input) - 1].bbox x, y = (tl[0] + br[0]) // 2, (tl[1] + br[1]) // 2 - ret = env.step(EnvAPIAbstract(api_name="user_tap", kwargs={"x": x, "y": y})) - # Question 将 ERROR 替换为 ADB_EXEC_FAIL(FAILED) + ret = env.step(EnvAPIAbstract(api_name="system_tap", kwargs={"x": x, "y": y})) if ret == ADB_EXEC_FAIL: - logger.info("ERROR: tap execution failed", "red") - break + return AndroidActionOutput(action_state=RunState.FAIL) record_file.write(f"tap({int(user_input)}):::{elem_list[int(user_input) - 1].uid}\n") elif user_input.lower() == ActionOp.TEXT.value: logger.info( @@ -123,10 +146,9 @@ class ManualRecord(Action): user_input = input() tl, br = elem_list[int(user_input) - 1].bbox x, y = (tl[0] + br[0]) // 2, (tl[1] + br[1]) // 2 - env.step(EnvAPIAbstract(api_name="user_longpress", kwargs={"x": x, "y": y})) + ret = env.step(EnvAPIAbstract(api_name="user_longpress", kwargs={"x": x, "y": y})) if ret == ADB_EXEC_FAIL: - logger.info("ERROR: long press execution failed", "red") - break + return AndroidActionOutput(action_state=RunState.FAIL) record_file.write(f"long_press({int(user_input)}):::{elem_list[int(user_input) - 1].uid}\n") elif user_input.lower() == ActionOp.SWIPE.value: logger.info( @@ -136,10 +158,10 @@ class ManualRecord(Action): ) user_input = "" while ( - user_input != SwipeOp.UP.value - and user_input != SwipeOp.DOWN.value - and user_input != SwipeOp.LEFT.value - and user_input != SwipeOp.RIGHT.value + user_input != SwipeOp.UP.value + and user_input != SwipeOp.DOWN.value + and user_input != SwipeOp.LEFT.value + and user_input != SwipeOp.RIGHT.value ): user_input = input() swipe_dir = user_input @@ -148,10 +170,9 @@ class ManualRecord(Action): user_input = input() tl, br = elem_list[int(user_input) - 1].bbox x, y = (tl[0] + br[0]) // 2, (tl[1] + br[1]) // 2 - ret = env.step(EnvAPIAbstract("user_swipe", kwargs={"x": x, "y": y, "orient": swipe_dir})) + ret = env.step(EnvAPIAbstract(api_name="user_swipe", kwargs={"x": x, "y": y, "orient": swipe_dir})) if ret == ADB_EXEC_FAIL: - logger.info("ERROR: swipe execution failed", "red") - break + return AndroidActionOutput(action_state=RunState.FAIL) record_file.write(f"swipe({int(user_input)}:sep:{swipe_dir}):::{elem_list[int(user_input) - 1].uid}\n") elif user_input.lower() == ActionOp.STOP.value: record_file.write("stop\n") @@ -160,3 +181,11 @@ class ManualRecord(Action): else: break time.sleep(3) + +# TODO +# 1. 截圖信息显示 KO +# 2. 不同功能测试 OK +# 3. demo 生成路径错误, 这个地方的结合需要考虑 + # 1. Documentation Generate + # 2. Role Test + diff --git a/examples/andriod_assistant/actions/self_learn_and_reflect.py b/examples/andriod_assistant/actions/self_learn_and_reflect.py index cf3ed91ae..701959d88 100644 --- a/examples/andriod_assistant/actions/self_learn_and_reflect.py +++ b/examples/andriod_assistant/actions/self_learn_and_reflect.py @@ -61,15 +61,12 @@ class SelfLearnAndReflect(Action): self, round_count: int, task_desc: str, last_act: str, task_dir: Path, docs_dir: Path, env: AndroidEnv ) -> AndroidActionOutput: resp = await self.run_self_learn(round_count, task_desc, last_act, task_dir, env) - print(resp) resp = await self.run_reflect(round_count, task_desc, last_act, task_dir, docs_dir, env) - print(resp) return resp async def run_self_learn( self, round_count: int, task_desc: str, last_act: str, task_dir: Path, env: AndroidEnv ) -> AndroidActionOutput: - logger.info('run_self_learn') screenshot_path: Path = env.observe( EnvAPIAbstract( api_name="get_screenshot", kwargs={"ss_name": f"{round_count}_before", "local_save_dir": task_dir} @@ -83,8 +80,6 @@ class SelfLearnAndReflect(Action): clickable_list = [] focusable_list = [] - # TODO Tuple Bug 从这里开始 Debug - # TODO Tuple Bug traverse_xml_tree(xml_path, clickable_list, "clickable", True) traverse_xml_tree(xml_path, focusable_list, "focusable", True) elem_list = [] diff --git a/examples/andriod_assistant/test_for_an.py b/examples/andriod_assistant/test_for_an.py index 9ab0d4bc0..f60e103b5 100644 --- a/examples/andriod_assistant/test_for_an.py +++ b/examples/andriod_assistant/test_for_an.py @@ -34,7 +34,17 @@ test_manual_parse = ParseRecord() if __name__ == "__main__": loop = asyncio.get_event_loop() - loop.run_until_complete( + + # loop.run_until_complete( + # test_manual_record.run( + # demo_name=DEMO_NAME, + # task_desc="Create a contact in Contacts App named zjy with a phone number +86 18831933368 ", + # task_dir=TASK_PATH, + # env=test_env_manual_learn_android + # ) + # ) + + test_action_list = [ test_self_learning.run( round_count=20, task_desc="Create a contact in Contacts App named zjy with a phone number +86 18831933368 ", @@ -42,8 +52,14 @@ if __name__ == "__main__": task_dir=TASK_PATH, docs_dir=DOC_PATH, env=test_env_self_learn_android + ), + test_manual_record.run( + demo_name=DEMO_NAME, + task_dir=TASK_PATH, + task_desc="Create a contact in Contacts App named zjy with a phone number +86 18831933368 ", + env=test_env_manual_learn_android ) - ) + ] # test_action_list = [ # test_self_learning.run( diff --git a/examples/andriod_assistant/utils/schema.py b/examples/andriod_assistant/utils/schema.py index 75396ac6a..18e637a0d 100644 --- a/examples/andriod_assistant/utils/schema.py +++ b/examples/andriod_assistant/utils/schema.py @@ -101,8 +101,8 @@ class TextOp(BaseOpParam): class LongPressOp(BaseOpParam): area: int = Field(default=-1) - -class SwipeOp(BaseOpParam): +# Modify This SwipeOp to SwipeOp_3, Need better name +class SwipeOp_3(BaseOpParam): area: int = Field(default=-1) swipe_orient: str = Field(default="up") dist: str = Field(default="") diff --git a/examples/andriod_assistant/utils/utils.py b/examples/andriod_assistant/utils/utils.py index bddb75f99..d696ac4f0 100644 --- a/examples/andriod_assistant/utils/utils.py +++ b/examples/andriod_assistant/utils/utils.py @@ -14,7 +14,7 @@ from metagpt.logs import logger from examples.andriod_assistant.utils.schema import AndroidElement from examples.andriod_assistant.utils.schema import BaseOpParam, BaseGridOpParam, GridOp, ActionOp, TapOp, TapGridOp, \ - LongPressOp, LongPressGridOp, SwipeOp, SwipeGridOp, TextOp, RunState, ReflectOp, Decision + LongPressOp, LongPressGridOp, SwipeOp_3, SwipeGridOp, TextOp, RunState, ReflectOp, Decision def get_id_from_element(elem: Element) -> str: @@ -217,7 +217,7 @@ def screenshot_parse_extract_without_grid(act_name: str, act: str, last_act: str elif act_name == ActionOp.SWIPE.value: params = re.findall(r"swipe\((.*?)\)", act)[0].split(",") params = op_params_clean(params) # area, swipe_orient, dist - op = SwipeOp(act_name=act_name, area=params[0], swipe_orient=params[1], dist=params[2], last_act=last_act) + op = SwipeOp_3(act_name=act_name, area=params[0], swipe_orient=params[1], dist=params[2], last_act=last_act) elif act_name == ActionOp.GRID.value: op = GridOp(act_name=act_name) else: