diff --git a/examples/andriod_assistant/README.md b/examples/andriod_assistant/README.md new file mode 100644 index 000000000..60649a573 --- /dev/null +++ b/examples/andriod_assistant/README.md @@ -0,0 +1,71 @@ +# The Android Assistant +The Android Assistant can learn from your daily operations, or explore apps automatically on its own, and then perform app operations according to your instructions, fulfilling your needs on the phone and freeing up your hands. + +## Install + +### Device Simulator +1. First, install ADB on your PC, which enables the PC to interact with Android devices +2. Connect the Android device to the computer's USB port +3. If you do not have an Android device, you can download Android Studio and use its Android emulator to carry out the subsequent operations. The steps to install the Android emulator can be found here: [Quickly install Android Studio & Emulator](https://dev.weixin.qq.com/docs/framework/dev/framework/env/android-simulator.html) + +### Install Requirements +You can run the following command line: +```bash +pip install -r requirements.txt +``` +## Experiential Learning +By designating the app to explore and the method of learning (automatic exploration or manual demonstration), you can help the Android Assistant master the functions of various apps and generate the corresponding documentation for later use in the "Free Your Hands" stage below. For any given task objective, roughly 20 rounds of exploration can considerably enhance performance. You can experiment with both the automatic learning and manual demonstration modes for the "Contacts" app with the following command: + +```bash +python run_assistant.py "your task description" --stage "learn" --mode "auto/manual" --app-name "Contacts" +``` +## Free Your Hands +Once the Android Assistant has completed ample exploration, you are all set to automate your tasks! Using either a text description or voice input, you can instruct the Android Assistant to perform the desired tasks across various applications. The specific commands are as follows: +### By Text +```bash +python run_assistant.py "your task description" --stage "act" --mode "auto/manual" --app-name "app names" +``` +### By Voice +coming soon + +## Run It +You can run the Android Assistant with the following command line: +```bash +python run_assistant.py "your task description" --stage "your choice(learn/act)" --mode "your choice(auto/manual)" --app-name "app name" +``` +The specific parameters are as follows: +```text +Usage: run_assistant.py [OPTIONS] TASK_DESC + + Run an Android Assistant + +Arguments: + TASK_DESC the task description you want the android assistant to learn or + act on [required] + +Options: + --n-round INTEGER The max round to do an app operation task. + [default: 20] + --stage TEXT stage: learn / act [default: learn] + --mode TEXT mode: auto / manual, when stage=learn + [default: auto] + --app-name TEXT the name of the app you want to run [default: + demo] + --investment FLOAT Dollar amount to invest in the AI company. + [default: 5.0] + --refine-doc / --no-refine-doc Refine existing operation docs based on the + latest observation if True. [default: no- + refine-doc] + --min-dist INTEGER The minimum distance between elements to + prevent overlapping during the labeling + process. [default: 30] + --android-screenshot-dir TEXT The path to store screenshots on the android + device. Make sure it exists. [default: + /sdcard/Pictures/Screenshots] + --android-xml-dir TEXT The path to store xml files for determining + UI elements location. Make sure it exists. + [default: /sdcard] + --device-id TEXT The Android device_id [default: + emulator-5554] + --help Show this message and exit. +```
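+### Example Workflow +As an end-to-end illustration (the task descriptions below are only examples), you could first let the assistant learn the "Contacts" app from a manual demonstration, then let it act using the learned documentation: +```bash +# learn: record your own demonstration of the task +python run_assistant.py "Create a contact named Alice" --stage "learn" --mode "manual" --app-name "Contacts" +# act: let the assistant perform a similar task with the docs learned above +python run_assistant.py "Create a contact named Bob" --stage "act" --mode "manual" --app-name "Contacts" +```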
diff --git a/examples/andriod_assistant/actions/__init__.py b/examples/andriod_assistant/actions/__init__.py new file mode 100644 index 000000000..2bcf8efd0 --- /dev/null +++ b/examples/andriod_assistant/actions/__init__.py @@ -0,0 +1,3 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Desc : diff --git a/examples/andriod_assistant/actions/manual_record.py b/examples/andriod_assistant/actions/manual_record.py new file mode 100644 index 000000000..5deafa680 --- /dev/null +++ b/examples/andriod_assistant/actions/manual_record.py @@ -0,0 +1,193 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Desc : manually record user interactions in stage=learn & mode=manual, LIKE scripts/step_recorder.py +import time +from pathlib import Path + +import cv2 + +from examples.andriod_assistant.utils.schema import ( + ActionOp, + AndroidActionOutput, + RunState, + SwipeOp, +) +from examples.andriod_assistant.utils.utils import draw_bbox_multi, traverse_xml_tree +from metagpt.actions.action import Action +from metagpt.config2 import config +from metagpt.const import ADB_EXEC_FAIL +from metagpt.environment.android_env.android_env import AndroidEnv +from metagpt.environment.api.env_api import EnvAPIAbstract +from metagpt.logs import logger + + +class ManualRecord(Action): + """record human operations on the screen, driven by user input""" + + name: str = "ManualRecord" + + useless_list: list[str] = [] # store the uids of useless elements + record_path: Path = "" + task_desc_path: Path = "" + screenshot_before_path: Path = "" + screenshot_after_path: Path = "" + xml_path: Path = "" + + # async def run(self, demo_name: str, task_desc: str,task_dir: Path, env: AndroidEnv): + async def run(self, task_desc: str, task_dir: Path, env: AndroidEnv): + self.record_path = Path(task_dir) / "record.txt" + self.task_desc_path = Path(task_dir) / "task_desc.txt" + self.screenshot_before_path = Path(task_dir) / "raw_screenshots" + self.screenshot_after_path = Path(task_dir) / "labeled_screenshots" + self.xml_path = Path(task_dir) / "xml" + + for path in [self.screenshot_before_path, self.screenshot_after_path, self.xml_path]: + if not path.exists(): + path.mkdir(parents=True, exist_ok=True) + + record_file = open(self.record_path, "w") + with open(self.task_desc_path, "w") as f: + f.write(task_desc) + step = 0 + while True: + step += 1 + screenshot_path: Path = await env.observe( + EnvAPIAbstract( + api_name="get_screenshot", + # kwargs={"ss_name": f"{demo_name}_{step}", "local_save_dir": self.screenshot_before_path} + kwargs={"ss_name": f"{step}", "local_save_dir": self.screenshot_before_path}, + ) + ) + xml_path: Path = await env.observe( + EnvAPIAbstract( + api_name="get_xml", + # kwargs={"xml_name": f"{demo_name}_{step}", "local_save_dir": self.xml_path} + kwargs={"xml_name": f"{step}", "local_save_dir": self.xml_path}, + ) + ) + if not screenshot_path.exists() or not xml_path.exists(): + return AndroidActionOutput(action_state=RunState.FAIL) + clickable_list = [] + focusable_list = [] + traverse_xml_tree(xml_path, clickable_list, "clickable", True) + traverse_xml_tree(xml_path, focusable_list, "focusable", True) + elem_list = [] + for elem in clickable_list: + if elem.uid in 
self.useless_list: + continue + elem_list.append(elem) + for elem in focusable_list: + if elem.uid in self.useless_list: + continue + bbox = elem.bbox + center = (bbox[0][0] + bbox[1][0]) // 2, (bbox[0][1] + bbox[1][1]) // 2 + close = False + for e in clickable_list: + bbox = e.bbox + center_ = (bbox[0][0] + bbox[1][0]) // 2, (bbox[0][1] + bbox[1][1]) // 2 + dist = (abs(center[0] - center_[0]) ** 2 + abs(center[1] - center_[1]) ** 2) ** 0.5 + if dist <= config.get_other("min_dist"): + close = True + break + if not close: + elem_list.append(elem) + screenshot_labeled_path = Path(self.screenshot_after_path).joinpath(f"{step}_labeled.png") + # screenshot_labeled_path = Path(self.screenshot_after_path).joinpath(f"{demo_name}_{step}_labeled.png") + labeled_img = draw_bbox_multi(screenshot_path, screenshot_labeled_path, elem_list) + + cv2.imshow("image", labeled_img) + cv2.waitKey(0) + cv2.destroyAllWindows() + + user_input = "xxx" + logger.info( + "Choose one of the following actions you want to perform on the current screen:\ntap, text, long_press," + "swipe, stop", + "blue", + ) + + while ( + user_input.lower() != ActionOp.TAP.value + and user_input.lower() != ActionOp.TEXT.value + and user_input.lower() != ActionOp.LONG_PRESS.value + and user_input.lower() != ActionOp.SWIPE.value + and user_input.lower() != ActionOp.STOP.value + ): + user_input = input() + + if user_input.lower() == ActionOp.TAP.value: + logger.info( + f"Which element do you want to tap? Choose a numeric tag from 1 to {len(elem_list)}:", "blue" + ) + user_input = "xxx" + while not user_input.isnumeric() or int(user_input) > len(elem_list) or int(user_input) < 1: + user_input = input() + tl, br = elem_list[int(user_input) - 1].bbox + x, y = (tl[0] + br[0]) // 2, (tl[1] + br[1]) // 2 + ret = await env.step(EnvAPIAbstract(api_name="system_tap", kwargs={"x": x, "y": y})) + if ret == ADB_EXEC_FAIL: + return AndroidActionOutput(action_state=RunState.FAIL) + record_file.write(f"tap({int(user_input)}):::{elem_list[int(user_input) - 1].uid}\n") + elif user_input.lower() == ActionOp.TEXT.value: + logger.info( + f"Which element do you want to input the text string? Choose a numeric tag from 1 to " + f"{len(elem_list)}:", + "blue", + ) + input_area = "xxx" + while not input_area.isnumeric() or int(input_area) > len(elem_list) or int(input_area) < 1: + input_area = input() + logger.info("Enter your input text below:", "blue") + user_input = "" + while not user_input: + user_input = input() + await env.step(EnvAPIAbstract(api_name="user_input", kwargs={"input_txt": user_input})) + record_file.write(f'text({input_area}:sep:"{user_input}"):::{elem_list[int(input_area) - 1].uid}\n') + elif user_input.lower() == ActionOp.LONG_PRESS.value: + logger.info( + f"Which element do you want to long press? Choose a numeric tag from 1 to {len(elem_list)}:", "blue" + ) + user_input = "xxx" + while not user_input.isnumeric() or int(user_input) > len(elem_list) or int(user_input) < 1: + user_input = input() + tl, br = elem_list[int(user_input) - 1].bbox + x, y = (tl[0] + br[0]) // 2, (tl[1] + br[1]) // 2 + ret = await env.step(EnvAPIAbstract(api_name="user_longpress", kwargs={"x": x, "y": y})) + if ret == ADB_EXEC_FAIL: + return AndroidActionOutput(action_state=RunState.FAIL) + record_file.write(f"long_press({int(user_input)}):::{elem_list[int(user_input) - 1].uid}\n") + elif user_input.lower() == ActionOp.SWIPE.value: + logger.info( + "What is the direction of your swipe? 
Choose one from the following options:\nup, down, left," + " right", + "blue", + ) + user_input = "" + while ( + user_input != SwipeOp.UP.value + and user_input != SwipeOp.DOWN.value + and user_input != SwipeOp.LEFT.value + and user_input != SwipeOp.RIGHT.value + ): + user_input = input() + swipe_dir = user_input + logger.info(f"Which element do you want to swipe? Choose a numeric tag from 1 to {len(elem_list)}:") + while not user_input.isnumeric() or int(user_input) > len(elem_list) or int(user_input) < 1: + user_input = input() + tl, br = elem_list[int(user_input) - 1].bbox + x, y = (tl[0] + br[0]) // 2, (tl[1] + br[1]) // 2 + ret = await env.step( + EnvAPIAbstract(api_name="user_swipe", kwargs={"x": x, "y": y, "orient": swipe_dir}) + ) + if ret == ADB_EXEC_FAIL: + return AndroidActionOutput(action_state=RunState.FAIL) + record_file.write(f"swipe({int(user_input)}:sep:{swipe_dir}):::{elem_list[int(user_input) - 1].uid}\n") + elif user_input.lower() == ActionOp.STOP.value: + record_file.write("stop\n") + record_file.close() + break + else: + break + time.sleep(3) diff --git a/examples/andriod_assistant/actions/parse_record.py b/examples/andriod_assistant/actions/parse_record.py new file mode 100644 index 000000000..51759d9cd --- /dev/null +++ b/examples/andriod_assistant/actions/parse_record.py @@ -0,0 +1,141 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Desc : parse record to generate learned standard operations in stage=learn & mode=manual, +# LIKE scripts/document_generation.py + +import ast +import json +import re +from pathlib import Path + +from examples.andriod_assistant.actions.parse_record_an import RECORD_PARSE_NODE +from examples.andriod_assistant.prompts.operation_prompt import ( + long_press_doc_template, + refine_doc_suffix, + swipe_doc_template, + tap_doc_template, + text_doc_template, +) +from examples.andriod_assistant.utils.schema import ( + ActionOp, + AndroidActionOutput, + RecordLogItem, + RunState, + SwipeOp, +) +from metagpt.actions.action import Action +from metagpt.config2 import config +from metagpt.environment.android_env.android_env import AndroidEnv +from metagpt.logs import logger +from metagpt.utils.common import encode_image + + +class ParseRecord(Action): + name: str = "ParseRecord" + record_path: Path = "" + task_desc_path: Path = "" + screenshot_before_path: Path = "" + screenshot_after_path: Path = "" + + # async def run(self, app_name: str, demo_name: str, task_dir: Path, docs_dir: Path, env: AndroidEnv): + async def run(self, app_name: str, task_dir: Path, docs_dir: Path, env: AndroidEnv): + docs_dir.mkdir(parents=True, exist_ok=True) + doc_count = 0 + self.record_path = Path(task_dir) / "record.txt" + self.task_desc_path = Path(task_dir) / "task_desc.txt" + self.screenshot_before_path = Path(task_dir) / "raw_screenshots" + self.screenshot_after_path = Path(task_dir) / "labeled_screenshots" + + with open(self.record_path, "r") as record_file: + record_step_count = len(record_file.readlines()) - 1 + record_file.seek(0) + for step in range(1, record_step_count + 1): + # img_before_base64 = encode_image(self.screenshot_after_path.joinpath(f"{demo_name}_{step}_labeled.png")) + # img_after_base64 = encode_image(self.screenshot_after_path.joinpath(f"{demo_name}_{step + 1}_labeled.png")) + img_before_base64 = encode_image(self.screenshot_after_path.joinpath(f"{step}_labeled.png")) + img_after_base64 = encode_image(self.screenshot_after_path.joinpath(f"{step + 1}_labeled.png")) + rec = record_file.readline().strip() + action, resource_id = 
rec.split(":::") + action_type = action.split("(")[0] + # build the prompt context for this action + task_desc = open(self.task_desc_path, "r").read() + action_param = re.findall(r"\((.*?)\)", action)[0] + if action_type == ActionOp.TAP.value: + prompt_template = tap_doc_template + context = prompt_template.format(ui_element=action_param, task_desc=task_desc) + elif action_type == ActionOp.TEXT.value: + input_area, input_text = action_param.split(":sep:") + prompt_template = text_doc_template + context = prompt_template.format(ui_element=input_area, task_desc=task_desc) + elif action_type == ActionOp.LONG_PRESS.value: + prompt_template = long_press_doc_template + context = prompt_template.format(ui_element=action_param, task_desc=task_desc) + elif action_type == ActionOp.SWIPE.value: + swipe_area, swipe_dir = action_param.split(":sep:") + if swipe_dir == SwipeOp.UP.value or swipe_dir == SwipeOp.DOWN.value: + action_type = ActionOp.VERTICAL_SWIPE.value + elif swipe_dir == SwipeOp.LEFT.value or swipe_dir == SwipeOp.RIGHT.value: + action_type = ActionOp.HORIZONTAL_SWIPE.value + prompt_template = swipe_doc_template + context = prompt_template.format(swipe_dir=swipe_dir, ui_element=swipe_area, task_desc=task_desc) + else: + break + + doc_name = resource_id + ".txt" + doc_path = docs_dir.joinpath(doc_name) + + if doc_path.exists(): + doc_content = ast.literal_eval(open(doc_path).read()) + if doc_content[action_type]: + if config.get_other("refine_doc"): + refine_context = refine_doc_suffix.format(old_doc=doc_content[action_type]) + context += refine_context + logger.info( + f"Documentation for the element {resource_id} already exists. The doc will be " + f"refined based on the latest demo." + ) + else: + logger.info( + f"Documentation for the element {resource_id} already exists. Turn on refine_doc " + f"in the config if needed." + ) + continue + else: + doc_content = {"tap": "", "text": "", "v_swipe": "", "h_swipe": "", "long_press": ""} + + logger.info(f"Waiting for GPT-4V to generate documentation for the element {resource_id}") + node = await RECORD_PARSE_NODE.fill( + context=context, llm=self.llm, images=[img_before_base64, img_after_base64] + ) + if "error" in node.content: + return AndroidActionOutput(action_state=RunState.FAIL) + # log_path = task_dir.joinpath(f"log_{app_name}_{demo_name}.txt") + log_path = task_dir.joinpath(f"log_{app_name}.txt") + prompt = node.compile(context=context, schema="json", mode="auto") + msg = node.content + doc_content[action_type] = msg + + with open(log_path, "a") as logfile: + log_item = RecordLogItem( + step=step, + prompt=prompt, + image_before=img_before_base64, + image_after=img_after_base64, + response=node.content, + ) + logfile.write(json.dumps(log_item.model_dump()) + "\n") + with open(doc_path, "w") as outfile: + outfile.write(str(doc_content)) + doc_count += 1 + logger.info(f"Documentation generated and saved to {doc_path}") + + # TODO: revisit how the Config in MetaGPT should be used here + # time.sleep(config.get_other("request_interval")) + + logger.info(f"Documentation generation phase completed. {doc_count} docs generated.") + + +# TODO +# 1. The log entries currently embed the full base64 images; the image payloads should be stripped out of the log.
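+# For reference, a line in record.txt written by ManualRecord looks like +# 'tap(3):::com.example:id/search_btn' or 'text(2:sep:"hello"):::com.example:id/input' +# (the identifiers here are hypothetical). A minimal sketch of the parsing performed above: +def _parse_record_line(rec: str) -> tuple[str, str, str]: + """Split one record line into (action_type, action_param, resource_id).""" + action, resource_id = rec.split(":::") + action_type = action.split("(")[0] + action_param = re.findall(r"\((.*?)\)", action)[0] + return action_type, action_param, resource_id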
diff --git a/examples/andriod_assistant/actions/parse_record_an.py b/examples/andriod_assistant/actions/parse_record_an.py new file mode 100644 index 000000000..210c93e23 --- /dev/null +++ b/examples/andriod_assistant/actions/parse_record_an.py @@ -0,0 +1,32 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Desc : the ActionNode to parse record + +from metagpt.actions.action_node import ActionNode + +OBSERVATION = ActionNode( + key="Observation", + expected_type=str, + instruction="Provide a description of your observations of the two images. " + "Subsequently, delineate the distinctions between the first image and the second one.", + example="", +) + +THOUGHT = ActionNode( + key="Thought", + expected_type=str, + instruction="Consider the impact of Action acting on UI elements.", + example="", +) + +DESCRIPTION = ActionNode( + key="Description", + expected_type=str, + instruction="Describe the functionality of the UI element concisely in one or two sentences. Do not include " + "the numeric tag in your description", + example="", +) + +NODES = [OBSERVATION, THOUGHT, DESCRIPTION] + +RECORD_PARSE_NODE = ActionNode.from_children("RecordParse", NODES) diff --git a/examples/andriod_assistant/actions/screenshot_parse.py b/examples/andriod_assistant/actions/screenshot_parse.py new file mode 100644 index 000000000..f3dd7da6c --- /dev/null +++ b/examples/andriod_assistant/actions/screenshot_parse.py @@ -0,0 +1,204 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Desc : LIKE scripts/task_executor.py in stage=act + +import ast +from pathlib import Path + +from examples.andriod_assistant.actions.screenshot_parse_an import SCREENSHOT_PARSE_NODE +from examples.andriod_assistant.prompts.assistant_prompt import ( + screenshot_parse_template, + screenshot_parse_with_grid_template, +) +from examples.andriod_assistant.utils.schema import ( + AndroidActionOutput, + AndroidElement, + GridOp, + LongPressGridOp, + LongPressOp, + OpLogItem, + RunState, + SwipeGridOp, + SwipeOp_3, + TapGridOp, + TapOp, + TextOp, +) +from examples.andriod_assistant.utils.utils import ( + area_to_xy, + draw_bbox_multi, + draw_grid, + elem_bbox_to_xy, + screenshot_parse_extract, + traverse_xml_tree, +) +from metagpt.actions.action import Action +from metagpt.config2 import config +from metagpt.const import ADB_EXEC_FAIL +from metagpt.environment.android_env.android_env import AndroidEnv +from metagpt.environment.api.env_api import EnvAPIAbstract +from metagpt.utils.common import encode_image + + +class ScreenshotParse(Action): + name: str = "ScreenshotParse" + + def _makeup_ui_document(self, elem_list: list[AndroidElement], docs_dir: Path, use_exist_doc: bool = True) -> str: + if not use_exist_doc: + return "" + + ui_doc = """ + You also have access to the following documentation that describes the functionalities of UI + elements you can interact with on the screen. These docs are crucial for you to determine the target of your + next action. You should always prioritize these documented elements for interaction:""" + for i, elem in enumerate(elem_list): + doc_path = docs_dir.joinpath(f"{elem.uid}.txt") + if not doc_path.exists(): + continue + ui_doc += f"Documentation of UI element labeled with the numeric tag '{i + 1}':\n" + doc_content = ast.literal_eval(open(doc_path, "r").read()) + if doc_content["tap"]: + ui_doc += f"This UI element is clickable. {doc_content['tap']}\n\n" + if doc_content["text"]: + ui_doc += ( + f"This UI element can receive text input. The text input is used for the following 
The text input is used for the following " + f"purposes: {doc_content['text']}\n\n" + ) + if doc_content["long_press"]: + ui_doc += f"This UI element is long clickable. {doc_content['long_press']}\n\n" + if doc_content["v_swipe"]: + ui_doc += ( + f"This element can be swiped directly without tapping. You can swipe vertically on " + f"this UI element. {doc_content['v_swipe']}\n\n" + ) + if doc_content["h_swipe"]: + ui_doc += ( + f"This element can be swiped directly without tapping. You can swipe horizontally on " + f"this UI element. {doc_content['h_swipe']}\n\n" + ) + return ui_doc + + async def run( + self, + round_count: int, + task_desc: str, + last_act: str, + task_dir: Path, + docs_dir: Path, + grid_on: bool, + env: AndroidEnv, + ): + for path in [task_dir, docs_dir]: + if not path.exists(): + path.mkdir(parents=True, exist_ok=True) + + screenshot_path: Path = await env.observe( + EnvAPIAbstract( + api_name="get_screenshot", kwargs={"ss_name": f"{round_count}_before", "local_save_dir": task_dir} + ) + ) + xml_path: Path = await env.observe( + EnvAPIAbstract(api_name="get_xml", kwargs={"xml_name": f"{round_count}", "local_save_dir": task_dir}) + ) + width, height = env.device_shape + if not screenshot_path.exists() or not xml_path.exists(): + return AndroidActionOutput(action_state=RunState.FAIL) + + clickable_list = [] + focusable_list = [] + traverse_xml_tree(xml_path, clickable_list, "clickable", True) + traverse_xml_tree(xml_path, focusable_list, "focusable", True) + elem_list: list[AndroidElement] = clickable_list.copy() + for elem in focusable_list: + bbox = elem.bbox + center = (bbox[0][0] + bbox[1][0]) // 2, (bbox[0][1] + bbox[1][1]) // 2 + close = False + for e in clickable_list: + bbox = e.bbox + center_ = (bbox[0][0] + bbox[1][0]) // 2, (bbox[0][1] + bbox[1][1]) // 2 + dist = (abs(center[0] - center_[0]) ** 2 + abs(center[1] - center_[1]) ** 2) ** 0.5 + if dist <= config.get_other("min_dist"): + close = True + break + if not close: + elem_list.append(elem) + + screenshot_labeled_path = task_dir.joinpath(f"{round_count}_labeled.png") + draw_bbox_multi(screenshot_path, screenshot_labeled_path, elem_list) + img_base64 = encode_image(screenshot_labeled_path) + + parse_template = screenshot_parse_with_grid_template if grid_on else screenshot_parse_template + + if grid_on: + env.rows, env.cols = draw_grid(screenshot_path, task_dir / f"{round_count}_grid.png") + + ui_doc = self._makeup_ui_document(elem_list, docs_dir) + context = parse_template.format(ui_document=ui_doc, task_description=task_desc, last_act=last_act) + node = await SCREENSHOT_PARSE_NODE.fill(context=context, llm=self.llm, images=[img_base64]) + + if "error" in node.content: + return AndroidActionOutput(action_state=RunState.FAIL) + + prompt = node.compile(context=context, schema="json", mode="auto") + OpLogItem(step=round_count, prompt=prompt, image=str(screenshot_labeled_path), response=node.content) + + op_param = screenshot_parse_extract(node.instruct_content.model_dump(), grid_on) + if op_param.param_state == RunState.FINISH: + return AndroidActionOutput(action_state=RunState.FINISH) + if op_param.param_state == RunState.FAIL: + return AndroidActionOutput(action_state=RunState.FAIL) + + if isinstance(op_param, TapOp): + x, y = elem_bbox_to_xy(elem_list[op_param.area - 1].bbox) + res = await env.step(EnvAPIAbstract(api_name="system_tap", kwargs={"x": x, "y": y})) + if res == ADB_EXEC_FAIL: + return AndroidActionOutput(action_state=RunState.FAIL) + elif isinstance(op_param, TextOp): + res = await 
env.step(EnvAPIAbstract(api_name="user_input", kwargs={"input_txt": op_param.input_str})) + if res == ADB_EXEC_FAIL: + return AndroidActionOutput(action_state=RunState.FAIL) + elif isinstance(op_param, LongPressOp): + x, y = elem_bbox_to_xy(elem_list[op_param.area - 1].bbox) + res = await env.step(EnvAPIAbstract(api_name="user_longpress", kwargs={"x": x, "y": y})) + if res == ADB_EXEC_FAIL: + return AndroidActionOutput(action_state=RunState.FAIL) + elif isinstance(op_param, SwipeOp_3): + x, y = elem_bbox_to_xy(elem_list[op_param.area - 1].bbox) + res = await env.step( + EnvAPIAbstract( + api_name="user_swipe", + kwargs={"x": x, "y": y, "orient": op_param.swipe_orient, "dist": op_param.dist}, + ) + ) + if res == ADB_EXEC_FAIL: + return AndroidActionOutput(action_state=RunState.FAIL) + elif isinstance(op_param, GridOp): + grid_on = True + elif isinstance(op_param, TapGridOp) or isinstance(op_param, LongPressGridOp): + x, y = area_to_xy(op_param.area, op_param.subarea, width, height, env.rows, env.cols) + if isinstance(op_param, TapGridOp): + res = await env.step(EnvAPIAbstract(api_name="system_tap", kwargs={"x": x, "y": y})) + if res == ADB_EXEC_FAIL: + return AndroidActionOutput(action_state=RunState.FAIL) + else: + # LongPressGridOp + res = await env.step(EnvAPIAbstract(api_name="user_longpress", kwargs={"x": x, "y": y})) + if res == ADB_EXEC_FAIL: + return AndroidActionOutput(action_state=RunState.FAIL) + elif isinstance(op_param, SwipeGridOp): + start_x, start_y = area_to_xy( + op_param.start_area, op_param.start_subarea, width, height, env.rows, env.cols + ) + end_x, end_y = area_to_xy( + op_param.end_area, op_param.end_subarea, width, height, env.rows, env.cols + ) + res = await env.step( + EnvAPIAbstract(api_name="user_swipe_to", kwargs={"start": (start_x, start_y), "end": (end_x, end_y)}) + ) + if res == ADB_EXEC_FAIL: + return AndroidActionOutput(action_state=RunState.FAIL) + + # the grid overlay should stay on only for the round that follows a grid() call + if op_param.act_name != "grid": + grid_on = False + + return AndroidActionOutput(data={"grid_on": grid_on})
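+# A rough sketch (an assumption for illustration, NOT the actual utils.area_to_xy implementation) +# of how an (area, subarea) pair could map to pixel coordinates on a row-major numbered grid: +def _area_to_xy_sketch(area: int, subarea: str, width: int, height: int, rows: int, cols: int) -> tuple[int, int]: + cell_w, cell_h = width // cols, height // rows + row, col = (area - 1) // cols, (area - 1) % cols + x_frac = 0.25 if "left" in subarea else 0.75 if "right" in subarea else 0.5 + y_frac = 0.25 if "top" in subarea else 0.75 if "bottom" in subarea else 0.5 + return int((col + x_frac) * cell_w), int((row + y_frac) * cell_h)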
Do not include " + "the grid area number in your summary", + example="", +) + +NODES = [OBSERVATION, THOUGHT, ACTION, SUMMARY] + +NODES_GRID = [OBSERVATION, THOUGHT, ACTION, SUMMARY_GRID] + +SCREENSHOT_PARSE_NODE = ActionNode.from_children("ScreenshotParse", NODES) +SCREENSHOT_PARSE_GRID_NODE = ActionNode.from_children("ScreenshotParseGrid", NODES_GRID) diff --git a/examples/andriod_assistant/actions/self_learn_and_reflect.py b/examples/andriod_assistant/actions/self_learn_and_reflect.py new file mode 100644 index 000000000..780985947 --- /dev/null +++ b/examples/andriod_assistant/actions/self_learn_and_reflect.py @@ -0,0 +1,246 @@ +# !/usr/bin/env python +# -*- coding: utf-8 -*- +# @Desc : LIKE scripts/self_explorer.py in stage=learn & mode=auto self_explore_task stage + +import ast +from pathlib import Path + +from examples.andriod_assistant.actions.screenshot_parse_an import SCREENSHOT_PARSE_NODE +from examples.andriod_assistant.actions.self_learn_reflect_an import ( + SELF_LEARN_REFLECT_NODE, +) +from examples.andriod_assistant.prompts.assistant_prompt import ( + screenshot_parse_self_explore_reflect_template as reflect_template, +) +from examples.andriod_assistant.prompts.assistant_prompt import ( + screenshot_parse_self_explore_template, +) +from examples.andriod_assistant.utils.schema import ( + ActionOp, + AndroidActionOutput, + AndroidElement, + Decision, + DocContent, + LongPressOp, + OpLogItem, + ReflectLogItem, + RunState, + SwipeOp, + SwipeOp_3, + TapOp, + TextOp, +) +from examples.andriod_assistant.utils.utils import ( + draw_bbox_multi, + elem_bbox_to_xy, + reflect_parse_extarct, + screenshot_parse_extract, + traverse_xml_tree, +) +from metagpt.actions.action import Action +from metagpt.config2 import config +from metagpt.const import ADB_EXEC_FAIL +from metagpt.environment.android_env.android_env import AndroidEnv +from metagpt.environment.api.env_api import EnvAPIAbstract +from metagpt.logs import logger +from metagpt.utils.common import encode_image + + +class SelfLearnAndReflect(Action): + name: str = "SelfLearnAndReflect" + + useless_list: list[str] = [] # store useless elements uid + + screenshot_before_path: str = "" + screenshot_before_base64: str = "" + elem_list: list[AndroidElement] = [] + swipe_orient: str = "up" + act_name: str = "" + ui_area: int = -1 + + async def run( + self, round_count: int, task_desc: str, last_act: str, task_dir: Path, docs_dir: Path, env: AndroidEnv + ) -> AndroidActionOutput: + for path in [task_dir, docs_dir]: + if not path.exists(): + path.mkdir(parents=True, exist_ok=True) + resp = await self.run_self_learn(round_count, task_desc, last_act, task_dir, env) + resp = await self.run_reflect(round_count, task_desc, last_act, task_dir, docs_dir, env) + return resp + + async def run_self_learn( + self, round_count: int, task_desc: str, last_act: str, task_dir: Path, env: AndroidEnv + ) -> AndroidActionOutput: + screenshot_path: Path = await env.observe( + EnvAPIAbstract( + api_name="get_screenshot", kwargs={"ss_name": f"{round_count}_before", "local_save_dir": task_dir} + ) + ) + xml_path: Path = await env.observe( + EnvAPIAbstract(api_name="get_xml", kwargs={"xml_name": f"{round_count}", "local_save_dir": task_dir}) + ) + if not screenshot_path.exists() or not xml_path.exists(): + return AndroidActionOutput(action_state=RunState.FAIL) + + clickable_list = [] + focusable_list = [] + traverse_xml_tree(xml_path, clickable_list, "clickable", True) + traverse_xml_tree(xml_path, focusable_list, "focusable", True) + elem_list = [] + for elem in 
diff --git a/examples/andriod_assistant/actions/self_learn_and_reflect.py b/examples/andriod_assistant/actions/self_learn_and_reflect.py new file mode 100644 index 000000000..780985947 --- /dev/null +++ b/examples/andriod_assistant/actions/self_learn_and_reflect.py @@ -0,0 +1,246 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Desc : LIKE scripts/self_explorer.py in stage=learn & mode=auto self_explore_task stage + +import ast +from pathlib import Path + +from examples.andriod_assistant.actions.screenshot_parse_an import SCREENSHOT_PARSE_NODE +from examples.andriod_assistant.actions.self_learn_reflect_an import ( + SELF_LEARN_REFLECT_NODE, +) +from examples.andriod_assistant.prompts.assistant_prompt import ( + screenshot_parse_self_explore_reflect_template as reflect_template, +) +from examples.andriod_assistant.prompts.assistant_prompt import ( + screenshot_parse_self_explore_template, +) +from examples.andriod_assistant.utils.schema import ( + ActionOp, + AndroidActionOutput, + AndroidElement, + Decision, + DocContent, + LongPressOp, + OpLogItem, + ReflectLogItem, + RunState, + SwipeOp, + SwipeOp_3, + TapOp, + TextOp, +) +from examples.andriod_assistant.utils.utils import ( + draw_bbox_multi, + elem_bbox_to_xy, + reflect_parse_extarct, + screenshot_parse_extract, + traverse_xml_tree, +) +from metagpt.actions.action import Action +from metagpt.config2 import config +from metagpt.const import ADB_EXEC_FAIL +from metagpt.environment.android_env.android_env import AndroidEnv +from metagpt.environment.api.env_api import EnvAPIAbstract +from metagpt.logs import logger +from metagpt.utils.common import encode_image + + +class SelfLearnAndReflect(Action): + name: str = "SelfLearnAndReflect" + + useless_list: list[str] = [] # store the uids of useless elements + + screenshot_before_path: str = "" + screenshot_before_base64: str = "" + elem_list: list[AndroidElement] = [] + swipe_orient: str = "up" + act_name: str = "" + ui_area: int = -1 + + async def run( + self, round_count: int, task_desc: str, last_act: str, task_dir: Path, docs_dir: Path, env: AndroidEnv + ) -> AndroidActionOutput: + for path in [task_dir, docs_dir]: + if not path.exists(): + path.mkdir(parents=True, exist_ok=True) + resp = await self.run_self_learn(round_count, task_desc, last_act, task_dir, env) + if resp.action_state in [RunState.FAIL, RunState.FINISH]: + return resp + resp = await self.run_reflect(round_count, task_desc, last_act, task_dir, docs_dir, env) + return resp + + async def run_self_learn( + self, round_count: int, task_desc: str, last_act: str, task_dir: Path, env: AndroidEnv + ) -> AndroidActionOutput: + screenshot_path: Path = await env.observe( + EnvAPIAbstract( + api_name="get_screenshot", kwargs={"ss_name": f"{round_count}_before", "local_save_dir": task_dir} + ) + ) + xml_path: Path = await env.observe( + EnvAPIAbstract(api_name="get_xml", kwargs={"xml_name": f"{round_count}", "local_save_dir": task_dir}) + ) + if not screenshot_path.exists() or not xml_path.exists(): + return AndroidActionOutput(action_state=RunState.FAIL) + + clickable_list = [] + focusable_list = [] + traverse_xml_tree(xml_path, clickable_list, "clickable", True) + traverse_xml_tree(xml_path, focusable_list, "focusable", True) + elem_list = [] + for elem in clickable_list: + if elem.uid in self.useless_list: + continue + elem_list.append(elem) + for elem in focusable_list: + if elem.uid in self.useless_list: + continue + bbox = elem.bbox + center = (bbox[0][0] + bbox[1][0]) // 2, (bbox[0][1] + bbox[1][1]) // 2 + close = False + for e in clickable_list: + bbox = e.bbox + center_ = (bbox[0][0] + bbox[1][0]) // 2, (bbox[0][1] + bbox[1][1]) // 2 + dist = (abs(center[0] - center_[0]) ** 2 + abs(center[1] - center_[1]) ** 2) ** 0.5 + if dist <= config.get_other("min_dist"): + close = True + break + if not close: + elem_list.append(elem) + screenshot_before_labeled_path = task_dir.joinpath(f"{round_count}_before_labeled.png") + draw_bbox_multi(screenshot_path, screenshot_before_labeled_path, elem_list) + img_base64 = encode_image(screenshot_before_labeled_path) + self.screenshot_before_base64 = img_base64 + self.screenshot_before_path = screenshot_before_labeled_path + + self_explore_template = screenshot_parse_self_explore_template + context = self_explore_template.format(task_description=task_desc, last_act=last_act) + + node = await SCREENSHOT_PARSE_NODE.fill(context=context, llm=self.llm, images=[img_base64]) + logger.debug(f"fill result: {node}") + if "error" in node.content: + return AndroidActionOutput(action_state=RunState.FAIL) + prompt = node.compile(context=context, schema="json", mode="auto") + # convert WindowsPath to str for the log item + OpLogItem(step=round_count, prompt=prompt, image=str(screenshot_before_labeled_path), response=node.content) + op_param = screenshot_parse_extract(node.instruct_content.model_dump(), grid_on=False) + # TODO: refine op_param handling, including how a FINISH action should be treated here + if op_param.param_state == RunState.FINISH: + return AndroidActionOutput(action_state=RunState.FINISH) + if op_param.param_state == RunState.FAIL: + return AndroidActionOutput(action_state=RunState.FAIL) + + if isinstance(op_param, TapOp): + self.ui_area = op_param.area + x, y = elem_bbox_to_xy(elem_list[op_param.area - 1].bbox) + res = await env.step(EnvAPIAbstract(api_name="system_tap", kwargs={"x": x, "y": y})) + if res == ADB_EXEC_FAIL: + return AndroidActionOutput(action_state=RunState.FAIL) + elif isinstance(op_param, TextOp): + res = await env.step(EnvAPIAbstract(api_name="user_input", kwargs={"input_txt": op_param.input_str})) + if res == ADB_EXEC_FAIL: + return AndroidActionOutput(action_state=RunState.FAIL) + elif isinstance(op_param, LongPressOp): + self.ui_area = op_param.area + x, y = elem_bbox_to_xy(elem_list[op_param.area - 1].bbox) + res = await env.step(EnvAPIAbstract(api_name="user_longpress", kwargs={"x": x, "y": y})) + if res == ADB_EXEC_FAIL: + return AndroidActionOutput(action_state=RunState.FAIL) + elif isinstance(op_param, SwipeOp_3): + self.ui_area = op_param.area + self.swipe_orient = op_param.swipe_orient + x, y = elem_bbox_to_xy(elem_list[op_param.area - 1].bbox) + res = await env.step( + EnvAPIAbstract( + api_name="user_swipe", + kwargs={"x": x, "y": y, "orient": op_param.swipe_orient, "dist": op_param.dist}, + ) + ) + if res == ADB_EXEC_FAIL: + return AndroidActionOutput(action_state=RunState.FAIL) + + self.elem_list = elem_list + self.act_name = op_param.act_name + return AndroidActionOutput()
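+ # For illustration, the duplicate-filter loop above reduces to this predicate + # (helper added for clarity; nothing else in this file calls it): + @staticmethod + def _centers_too_close(c1: tuple[int, int], c2: tuple[int, int], min_dist: int) -> bool: + """True if the Euclidean distance between two element centers is within min_dist pixels.""" + return ((c1[0] - c2[0]) ** 2 + (c1[1] - c2[1]) ** 2) ** 0.5 <= min_dist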
+ async def run_reflect( + self, round_count: int, task_desc: str, last_act: str, task_dir: Path, docs_dir: Path, env: AndroidEnv + ) -> AndroidActionOutput: + screenshot_path: Path = await env.observe( + EnvAPIAbstract( + api_name="get_screenshot", kwargs={"ss_name": f"{round_count}_after", "local_save_dir": task_dir} + ) + ) + if not screenshot_path.exists(): + return AndroidActionOutput(action_state=RunState.FAIL) + + screenshot_after_labeled_path = task_dir.joinpath(f"{round_count}_after_labeled.png") + draw_bbox_multi(screenshot_path, screenshot_after_labeled_path, elem_list=self.elem_list) + img_base64 = encode_image(screenshot_after_labeled_path) + if self.act_name == ActionOp.TAP.value: + action = "tapping" + elif self.act_name == ActionOp.LONG_PRESS.value: + action = "long pressing" + elif self.act_name == ActionOp.SWIPE.value: + action = "swiping" + if self.swipe_orient == SwipeOp.UP.value or self.swipe_orient == SwipeOp.DOWN.value: + action = "v_swipe" + elif self.swipe_orient == SwipeOp.LEFT.value or self.swipe_orient == SwipeOp.RIGHT.value: + action = "h_swipe" + else: + # TODO: this branch needs testing; this parse failure is coupled with the doc-writing error below + logger.info(f"Warning: current action name: {self.act_name}") + logger.info("Warning: act_name parsed wrong!") + action = None + context = reflect_template.format( + action=action, ui_element=str(self.ui_area), task_desc=task_desc, last_act=last_act + ) + node = await SELF_LEARN_REFLECT_NODE.fill( + context=context, llm=self.llm, images=[self.screenshot_before_base64, img_base64] + ) + + if "error" in node.content: + return AndroidActionOutput(action_state=RunState.FAIL) + + prompt = node.compile(context=context, schema="json", mode="auto") + ReflectLogItem( + step=round_count, + prompt=prompt, + image_before=str(self.screenshot_before_path), + image_after=str(screenshot_after_labeled_path), + response=node.content, + ) + + op_param = reflect_parse_extarct(node.instruct_content.model_dump()) + if op_param.param_state == RunState.FINISH: + return AndroidActionOutput(action_state=RunState.FINISH) + if op_param.param_state == RunState.FAIL: + return AndroidActionOutput(action_state=RunState.FAIL) + # TODO: errors occur frequently here + logger.info(f"error-prone spot: elem_list length is {len(self.elem_list)}, ui_area is {self.ui_area}") + resource_id = self.elem_list[int(self.ui_area) - 1].uid + if op_param.decision == Decision.INEFFECTIVE.value: + self.useless_list.append(resource_id) + last_act = "NONE" # TODO global + elif op_param.decision in [Decision.BACK.value, Decision.CONTINUE.value, Decision.SUCCESS.value]: + if op_param.decision in [Decision.BACK.value, Decision.CONTINUE.value]: + self.useless_list.append(resource_id) + last_act = "NONE" + if op_param.decision == Decision.BACK.value: + res = await env.step(EnvAPIAbstract(api_name="system_back")) + if res == ADB_EXEC_FAIL: + return AndroidActionOutput(action_state=RunState.FAIL) + doc = op_param.documentation + doc_path = docs_dir.joinpath(f"{resource_id}.txt") + if doc_path.exists(): + doc_content = ast.literal_eval(open(doc_path).read()) + if doc_content[self.act_name]: + logger.info(f"Documentation for the element {resource_id} already exists.") + return AndroidActionOutput(action_state=RunState.FAIL) + else: + doc_content = DocContent() + setattr(doc_content, self.act_name, doc) + doc_path.write_text(str(doc_content)) + return AndroidActionOutput(data={"last_act": last_act}) + + +# TODO: how to handle the FINISH state; this likely needs to be coordinated with the role
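+# For reference, the reflection decisions handled above map to (see utils.schema.Decision): +# INEFFECTIVE -> mark the element useless and reset last_act +# BACK -> mark useless, reset last_act, press system back, then record the doc +# CONTINUE -> mark useless, reset last_act, then record the doc +# SUCCESS -> record the doc for the element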
diff --git a/examples/andriod_assistant/actions/self_learn_reflect_an.py b/examples/andriod_assistant/actions/self_learn_reflect_an.py new file mode 100644 index 000000000..305b7376a --- /dev/null +++ b/examples/andriod_assistant/actions/self_learn_reflect_an.py @@ -0,0 +1,21 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Desc : the ActionNode to parse Reflection + +from metagpt.actions.action_node import ActionNode + +DECISION = ActionNode( + key="Decision", expected_type=str, instruction="the decision you made: one of BACK, INEFFECTIVE, CONTINUE, SUCCESS", example="BACK" +) + + +THOUGHT = ActionNode(key="Thought", expected_type=str, instruction="explain why you made this decision", example="") + + +DOCUMENTATION = ActionNode( + key="Documentation", expected_type=str, instruction="describe the function of the UI element", example="" +) + + +NODES = [DECISION, THOUGHT, DOCUMENTATION] +SELF_LEARN_REFLECT_NODE = ActionNode.from_children("SelfLearnReflect", NODES) diff --git a/examples/andriod_assistant/prompts/__init__.py b/examples/andriod_assistant/prompts/__init__.py new file mode 100644 index 000000000..2bcf8efd0 --- /dev/null +++ b/examples/andriod_assistant/prompts/__init__.py @@ -0,0 +1,3 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Desc : diff --git a/examples/andriod_assistant/prompts/assistant_prompt.py b/examples/andriod_assistant/prompts/assistant_prompt.py new file mode 100644 index 000000000..c0dc6f22f --- /dev/null +++ b/examples/andriod_assistant/prompts/assistant_prompt.py @@ -0,0 +1,168 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Desc : the prompt templates of assistant learning and acting + +screenshot_parse_template = """You are an agent that is trained to perform some basic tasks on a smartphone. You will be given a +smartphone screenshot. The interactive UI elements on the screenshot are labeled with numeric tags starting from 1. The +numeric tag of each interactive element is located in the center of the element. + +You can call the following functions to control the smartphone: + +1. tap(element: int) +This function is used to tap an UI element shown on the smartphone screen. +"element" is a numeric tag assigned to an UI element shown on the smartphone screen. +A simple use case can be tap(5), which taps the UI element labeled with the number 5. + +2. text(text_input: str) +This function is used to insert text input in an input field/box. text_input is the string you want to insert and must +be wrapped with double quotation marks. A simple use case can be text("Hello, world!"), which inserts the string +"Hello, world!" into the input area on the smartphone screen. This function is usually callable when you see a keyboard +showing in the lower half of the screen. + +3. long_press(element: int) +This function is used to long press an UI element shown on the smartphone screen. +"element" is a numeric tag assigned to an UI element shown on the smartphone screen. +A simple use case can be long_press(5), which long presses the UI element labeled with the number 5. + +4. swipe(element: int, direction: str, dist: str) +This function is used to swipe an UI element shown on the smartphone screen, usually a scroll view or a slide bar. +"element" is a numeric tag assigned to an UI element shown on the smartphone screen. "direction" is a string that +represents one of the four directions: up, down, left, right. "direction" must be wrapped with double quotation +marks. "dist" determines the distance of the swipe and can be one of the three options: short, medium, long. You should +choose the appropriate distance option according to your need. +A simple use case can be swipe(21, "up", "medium"), which swipes up the UI element labeled with the number 21 for a +medium distance. + +5. grid() +You should call this function when you find the element you want to interact with is not labeled with a numeric tag and +other elements with numeric tags cannot help with the task. 
The function will bring up a grid overlay to divide the +smartphone screen into small areas and this will give you more freedom to choose any part of the screen to tap, long +press, or swipe. +{ui_document} +The task you need to complete is to {task_description}. Your past actions to proceed with this task are summarized as +follows: {last_act} +Now, given the documentation and the following labeled screenshot, you need to think and call the function needed to +proceed with the task. Your output should include three parts in the given format: + +You can only take one action at a time, so please directly call the function.""" + +screenshot_parse_with_grid_template = """You are an agent that is trained to perform some basic tasks on a smartphone. You will be given +a smartphone screenshot overlaid by a grid. The grid divides the screenshot into small square areas. Each area is +labeled with an integer in the top-left corner. + +You can call the following functions to control the smartphone: + +1. tap(area: int, subarea: str) +This function is used to tap a grid area shown on the smartphone screen. "area" is the integer label assigned to a grid +area shown on the smartphone screen. "subarea" is a string representing the exact location to tap within the grid area. +It can take one of the nine values: center, top-left, top, top-right, left, right, bottom-left, bottom, and +bottom-right. +A simple use case can be tap(5, "center"), which taps the exact center of the grid area labeled with the number 5. + +2. long_press(area: int, subarea: str) +This function is used to long press a grid area shown on the smartphone screen. "area" is the integer label assigned to +a grid area shown on the smartphone screen. "subarea" is a string representing the exact location to long press within +the grid area. It can take one of the nine values: center, top-left, top, top-right, left, right, bottom-left, bottom, +and bottom-right. +A simple use case can be long_press(7, "top-left"), which long presses the top left part of the grid area labeled with +the number 7. + +3. swipe(start_area: int, start_subarea: str, end_area: int, end_subarea: str) +This function is used to perform a swipe action on the smartphone screen, especially when you want to interact with a +scroll view or a slide bar. "start_area" is the integer label assigned to the grid area which marks the starting +location of the swipe. "start_subarea" is a string representing the exact location to begin the swipe within the grid +area. "end_area" is the integer label assigned to the grid area which marks the ending location of the swipe. +"end_subarea" is a string representing the exact location to end the swipe within the grid area. +The two subarea parameters can take one of the nine values: center, top-left, top, top-right, left, right, bottom-left, +bottom, and bottom-right. +A simple use case can be swipe(21, "center", 25, "right"), which performs a swipe starting from the center of grid area +21 to the right part of grid area 25. + +The task you need to complete is to {task_description}. Your past actions to proceed with this task are summarized as +follows: {last_act} +Now, given the following labeled screenshot, you need to think and call the function needed to proceed with the task. 
+Your output should include three parts in the given format: + +You can only take one action at a time, so please directly call the function.""" + +screenshot_parse_self_explore_template = """You are an agent that is trained to complete certain tasks on a smartphone. You will be +given a screenshot of a smartphone app. The interactive UI elements on the screenshot are labeled with numeric tags +starting from 1. + +You can call the following functions to interact with those labeled elements to control the smartphone: + +1. tap(element: int) +This function is used to tap an UI element shown on the smartphone screen. +"element" is a numeric tag assigned to an UI element shown on the smartphone screen. +A simple use case can be tap(5), which taps the UI element labeled with the number 5. + +2. text(text_input: str) +This function is used to insert text input in an input field/box. text_input is the string you want to insert and must +be wrapped with double quotation marks. A simple use case can be text("Hello, world!"), which inserts the string +"Hello, world!" into the input area on the smartphone screen. This function is only callable when you see a keyboard +showing in the lower half of the screen. + +3. long_press(element: int) +This function is used to long press an UI element shown on the smartphone screen. +"element" is a numeric tag assigned to an UI element shown on the smartphone screen. +A simple use case can be long_press(5), which long presses the UI element labeled with the number 5. + +4. swipe(element: int, direction: str, dist: str) +This function is used to swipe an UI element shown on the smartphone screen, usually a scroll view or a slide bar. +"element" is a numeric tag assigned to an UI element shown on the smartphone screen. "direction" is a string that +represents one of the four directions: up, down, left, right. "direction" must be wrapped with double quotation +marks. "dist" determines the distance of the swipe and can be one of the three options: short, medium, long. You should +choose the appropriate distance option according to your need. +A simple use case can be swipe(21, "up", "medium"), which swipes up the UI element labeled with the number 21 for a +medium distance. + +The task you need to complete is to {task_description}. Your past actions to proceed with this task are summarized as +follows: {last_act} +Now, given the following labeled screenshot, you need to think and call the function needed to proceed with the task. +Your output should include three parts in the given format: + +You can only take one action at a time, so please directly call the function.""" + +screenshot_parse_self_explore_reflect_template = """I will give you screenshots of a mobile app before and after {action} the UI +element labeled with the number '{ui_element}' on the first screenshot. The numeric tag of each element is located at +the center of the element. The action of {action} this UI element was described as follows: +{last_act} +The action was also an attempt to proceed with a larger task, which is to {task_desc}. Your job is to carefully analyze +the difference between the two screenshots to determine if the action is in accord with the description above and at +the same time effectively moved the task forward. Your output should be determined based on the following situations: +1. BACK +If you think the action navigated you to a page where you cannot proceed with the given task, you should go back to the +previous interface. 
At the same time, describe the functionality of the UI element concisely in one or two sentences by +observing the difference between the two screenshots. Notice that your description of the UI element should focus on +the general function. Never include the numeric tag of the UI element in your description. You can use pronouns such as +"the UI element" to refer to the element. Your output should be in the following format: +Decision: BACK +Thought: +Documentation: +2. INEFFECTIVE +If you find the action changed nothing on the screen (screenshots before and after the action are identical), you +should continue to interact with other elements on the screen. Notice that if you find the location of the cursor +changed between the two screenshots, then they are not identical. Your output should be in the following format: +Decision: INEFFECTIVE +Thought: +Documentation: +3. CONTINUE +If you find the action changed something on the screen but did not reflect the action description above and did not +move the given task forward, you should continue to interact with other elements on the screen. At the same time, +describe the functionality of the UI element concisely in one or two sentences by observing the difference between the +two screenshots. Notice that your description of the UI element should focus on the general function. Never include the +numeric tag of the UI element in your description. You can use pronouns such as "the UI element" to refer to the +element. Your output should be in the following format: +Decision: CONTINUE +Thought: +Documentation: +4. SUCCESS +If you think the action successfully moved the task forward (even though it did not complete the task), you should +describe the functionality of the UI element concisely in one or two sentences. Notice that your description of the UI +element should focus on the general function. Never include the numeric tag of the UI element in your description. You +can use pronouns such as "the UI element" to refer to the element. Your output should be in the following format: +Decision: SUCCESS +Thought: +Documentation: +""" diff --git a/examples/andriod_assistant/prompts/operation_prompt.py b/examples/andriod_assistant/prompts/operation_prompt.py new file mode 100644 index 000000000..1bde53f04 --- /dev/null +++ b/examples/andriod_assistant/prompts/operation_prompt.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Desc : the prompt templates of phone operation + +tap_doc_template = """I will give you the screenshot of a mobile app before and after tapping the UI element labeled +with the number {ui_element} on the screen. The numeric tag of each element is located at the center of the element. +Tapping this UI element is a necessary part of proceeding with a larger task, which is to {task_desc}. Your task is to +describe the functionality of the UI element concisely in one or two sentences. Notice that your description of the UI +element should focus on the general function. For example, if the UI element is used to navigate to the chat window +with John, your description should not include the name of the specific person. Just say: "Tapping this area will +navigate the user to the chat window". Never include the numeric tag of the UI element in your description. You can use +pronouns such as "the UI element" to refer to the element.""" + +text_doc_template = """I will give you the screenshot of a mobile app before and after typing in the input area labeled +with the number {ui_element} on the screen. 
The numeric tag of each element is located at the center of the element. +Typing in this UI element is a necessary part of proceeding with a larger task, which is to {task_desc}. Your task is +to describe the functionality of the UI element concisely in one or two sentences. Notice that your description of the +UI element should focus on the general function. For example, if the change of the screenshot shows that the user typed +"How are you?" in the chat box, you do not need to mention the actual text. Just say: "This input area is used for the +user to type a message to send to the chat window.". Never include the numeric tag of the UI element in your +description. You can use pronouns such as "the UI element" to refer to the element.""" + +long_press_doc_template = """I will give you the screenshot of a mobile app before and after long pressing the UI +element labeled with the number {ui_element} on the screen. The numeric tag of each element is located at the center of +the element. Long pressing this UI element is a necessary part of proceeding with a larger task, which is to +{task_desc}. Your task is to describe the functionality of the UI element concisely in one or two sentences. Notice +that your description of the UI element should focus on the general function. For example, if long pressing the UI +element redirects the user to the chat window with John, your description should not include the name of the specific +person. Just say: "Long pressing this area will redirect the user to the chat window". Never include the numeric tag of +the UI element in your description. You can use pronouns such as "the UI element" to refer to the element.""" + +swipe_doc_template = """I will give you the screenshot of a mobile app before and after swiping the UI +element labeled with the number {ui_element} on the screen. The numeric tag of each element is located at the center of +the element. Swiping this UI element is a necessary part of proceeding with a larger task, which is to {task_desc}. +Your task is to describe the functionality of the UI element concisely in one or two sentences. Notice that your +description of the UI element should be as general as possible. For example, if swiping the UI element increases the +contrast ratio of an image of a building, your description should be just like this: "Swiping this area enables the +user to tune a specific parameter of the image". Never include the numeric tag of the UI element in your description. +You can use pronouns such as "the UI element" to refer to the element.""" + +refine_doc_suffix = """\nA documentation of this UI element generated from previous demos is shown below. Your +generated description should be based on this previous doc and optimize it. Notice that it is possible that your +understanding of the function of the UI element derived from the given screenshots conflicts with the previous doc, +because the function of a UI element can be flexible. In this case, your generated description should combine both. +Old documentation of this UI element: {old_doc}"""
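+# Illustrative use of the templates above (the element tag and task are hypothetical): +# context = tap_doc_template.format(ui_element=3, task_desc="create a new contact") +# When a doc already exists and refine_doc is enabled, the old doc is appended for refinement: +# context += refine_doc_suffix.format(old_doc="Tapping this area opens the new-contact form.")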
diff --git a/examples/andriod_assistant/requirements.txt b/examples/andriod_assistant/requirements.txt new file mode 100644 index 000000000..e879bece5 --- /dev/null +++ b/examples/andriod_assistant/requirements.txt @@ -0,0 +1 @@ +pyshine==0.0.9 \ No newline at end of file diff --git a/examples/andriod_assistant/roles/__init__.py b/examples/andriod_assistant/roles/__init__.py new file mode 100644 index 000000000..2bcf8efd0 --- /dev/null +++ b/examples/andriod_assistant/roles/__init__.py @@ -0,0 +1,3 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Desc : diff --git a/examples/andriod_assistant/roles/android_assistant.py b/examples/andriod_assistant/roles/android_assistant.py new file mode 100644 index 000000000..cf97b5fcd --- /dev/null +++ b/examples/andriod_assistant/roles/android_assistant.py @@ -0,0 +1,128 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Desc : android assistant to learn from app operations and operate apps +import time +from datetime import datetime +from pathlib import Path +from typing import Optional + +from pydantic import Field + +from examples.andriod_assistant.actions.manual_record import ManualRecord +from examples.andriod_assistant.actions.parse_record import ParseRecord +from examples.andriod_assistant.actions.screenshot_parse import ScreenshotParse +from examples.andriod_assistant.actions.self_learn_and_reflect import ( + SelfLearnAndReflect, +) +from examples.andriod_assistant.utils.schema import AndroidActionOutput, RunState +from metagpt.actions.add_requirement import UserRequirement +from metagpt.config2 import config +from metagpt.logs import logger +from metagpt.roles.role import Role, RoleReactMode +from metagpt.schema import Message + + +class AndroidAssistant(Role): + name: str = "Nick" + profile: str = "AndroidAssistant" + goal: str = "operate the mobile phone's apps with self-learning" + + task_desc: str = "" + round_count: int = 0 + last_act: str = "" + task_dir: Optional[Path] = Field(default=None) + docs_dir: Optional[Path] = Field(default=None) + grid_on: bool = Field(default=False) + + def __init__(self, **data): + super().__init__(**data) + + self._watch([UserRequirement, AndroidActionOutput]) + self.task_desc = config.get_other("task_desc", "Just explore any app in this phone!") + app_name = config.get_other("app_name", "demo") + curr_path = Path(__file__).parent + data_dir = curr_path.joinpath("..", "output") + cur_datetime = datetime.fromtimestamp(int(time.time())).strftime("%Y-%m-%d_%H-%M-%S") + + """First, we decide the stage from the user config. Later, this could be automated: for a new app, + run the learn stage first and then the act stage, or learn during acting. + """ + if config.get_other("stage") == "learn" and config.get_other("mode") == "manual": + # choose ManualRecord and then run ParseRecord + # note: each of these actions runs only once, so there is no need to loop for n_round. 
+            self.set_actions([ManualRecord, ParseRecord])
+            self.task_dir = data_dir.joinpath(app_name, f"manual_learn_{cur_datetime}")
+            self.docs_dir = data_dir.joinpath(app_name, "manual_docs")
+        elif config.get_other("stage") == "learn" and config.get_other("mode") == "auto":
+            # run SelfLearnAndReflect
+            self.set_actions([SelfLearnAndReflect])
+            self.task_dir = data_dir.joinpath(app_name, f"auto_learn_{cur_datetime}")
+            self.docs_dir = data_dir.joinpath(app_name, "auto_docs")
+        elif config.get_other("stage") == "act":
+            # run ScreenshotParse
+            self.set_actions([ScreenshotParse])
+            self.task_dir = data_dir.joinpath(app_name, f"act_{cur_datetime}")
+            if config.get_other("mode") == "manual":
+                self.docs_dir = data_dir.joinpath(app_name, "manual_docs")
+            else:
+                self.docs_dir = data_dir.joinpath(app_name, "auto_docs")
+        self._check_dir()
+
+        self._set_react_mode(RoleReactMode.BY_ORDER)
+
+    def _check_dir(self):
+        self.task_dir.mkdir(parents=True, exist_ok=True)
+        self.docs_dir.mkdir(parents=True, exist_ok=True)
+
+    async def react(self) -> Message:
+        self.round_count += 1
+        result = await super().react()
+        logger.info(f"react result: {result}")
+        return result
+
+    async def _act(self) -> Message:
+        logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})")
+        todo = self.rc.todo
+        if isinstance(todo, ManualRecord):
+            resp = await todo.run(task_dir=self.task_dir, task_desc=self.task_desc, env=self.rc.env)
+        elif isinstance(todo, ParseRecord):
+            resp = await todo.run(
+                app_name=config.get_other("app_name", "demo"),
+                task_dir=self.task_dir,
+                docs_dir=self.docs_dir,
+                env=self.rc.env,
+            )
+        elif isinstance(todo, SelfLearnAndReflect):
+            resp = await todo.run(
+                round_count=self.round_count,
+                task_desc=self.task_desc,
+                last_act=self.last_act,
+                task_dir=self.task_dir,
+                docs_dir=self.docs_dir,
+                env=self.rc.env,
+            )
+            if resp.action_state == RunState.SUCCESS:
+                self.last_act = resp.data.get("last_act")
+        elif isinstance(todo, ScreenshotParse):
+            resp = await todo.run(
+                round_count=self.round_count,
+                task_desc=self.task_desc,
+                last_act=self.last_act,
+                task_dir=self.task_dir,
+                docs_dir=self.docs_dir,
+                grid_on=self.grid_on,
+                env=self.rc.env,
+            )
+            if resp.action_state == RunState.SUCCESS:
+                logger.info(f"grid_on: {resp.data.get('grid_on')}")
+                self.grid_on = resp.data.get("grid_on")
+        msg = Message(
+            content=f"RoundCount: {self.round_count}",
+            role=self.profile,
+            cause_by=type(resp),
+            send_from=self.name,
+            send_to=self.name,
+        )
+        # self.publish_message(msg)
+        self.rc.memory.add(msg)
+        return msg
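To summarize the dispatch in `__init__` above, the stage/mode pair selects one of three action pipelines. A schematic restatement of the code, not additional behavior:

```python
# stage/mode -> actions chosen in AndroidAssistant.__init__ above
PIPELINES = {
    ("learn", "manual"): [ManualRecord, ParseRecord],  # record a demo by hand, then parse it into docs
    ("learn", "auto"): [SelfLearnAndReflect],  # explore the app and reflect automatically
    ("act", "manual"): [ScreenshotParse],  # act, reading from manual_docs
    ("act", "auto"): [ScreenshotParse],  # act, reading from auto_docs
}
```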
task."), + stage: str = typer.Option(default="learn", help="stage: learn / act"), + mode: str = typer.Option(default="auto", help="mode: auto / manual , when state=learn"), + app_name: str = typer.Option(default="demo", help="the name of app you want to run"), + investment: float = typer.Option(default=5.0, help="Dollar amount to invest in the AI company."), + refine_doc: bool = typer.Option( + default=False, help="Refine existing operation docs based on the latest observation if True." + ), + min_dist: int = typer.Option( + default=30, help="The minimum distance between elements to prevent overlapping during the labeling process." + ), + android_screenshot_dir: str = typer.Option( + default="/sdcard/Pictures/Screenshots", + help="The path to store screenshots on android device. Make sure it exists.", + ), + android_xml_dir: str = typer.Option( + default="/sdcard", + help="The path to store xml files for determining UI elements localtion. Make sure it exists.", + ), + device_id: str = typer.Option(default="emulator-5554", help="The Android device_id"), +): + config.set_other( + { + "stage": stage, + "mode": mode, + "app_name": app_name, + "task_desc": task_desc, + "refine_doc": refine_doc, + "min_dist": min_dist, + "android_screenshot_dir": android_screenshot_dir, + "android_xml_dir": android_xml_dir, + "device_id": device_id, + } + ) + + team = Team( + env=AndroidEnv( + device_id=device_id, + xml_dir=Path(android_xml_dir), + screenshot_dir=Path(android_screenshot_dir), + ) + ) + + team.hire([AndroidAssistant()]) + team.invest(investment) + team.run_project(idea=task_desc) + asyncio.run(team.run(n_round=n_round)) + + +if __name__ == "__main__": + app() diff --git a/examples/andriod_assistant/storage/android.view.ViewGroup_1067_236_android.widget.TextView_183_204_Apps_2.txt b/examples/andriod_assistant/storage/android.view.ViewGroup_1067_236_android.widget.TextView_183_204_Apps_2.txt new file mode 100644 index 000000000..19b670ea7 --- /dev/null +++ b/examples/andriod_assistant/storage/android.view.ViewGroup_1067_236_android.widget.TextView_183_204_Apps_2.txt @@ -0,0 +1 @@ +{'tap': '[CONTENT]\n{\n "Observation": "The first image shows a mobile device\'s home screen with various app icons and a Google search bar at the top. 
diff --git a/examples/andriod_assistant/storage/android.view.ViewGroup_1067_236_android.widget.TextView_183_204_Apps_2.txt b/examples/andriod_assistant/storage/android.view.ViewGroup_1067_236_android.widget.TextView_183_204_Apps_2.txt
new file mode 100644
index 000000000..19b670ea7
--- /dev/null
+++ b/examples/andriod_assistant/storage/android.view.ViewGroup_1067_236_android.widget.TextView_183_204_Apps_2.txt
@@ -0,0 +1 @@
+{'tap': '[CONTENT]\n{\n "Observation": "The first image shows a mobile device\'s home screen with various app icons and a Google search bar at the top. The second image displays an app drawer with a grid of apps and a search bar at the top, indicating that the UI element has been tapped.",\n "Thought": "Tapping the UI element opens the app drawer, which is a common function in mobile operating systems to access a list of all installed apps.",\n "Description": "Tapping this area will open the app drawer, displaying a list of all installed applications."\n}\n[/CONTENT]', 'text': '', 'v_swipe': '', 'h_swipe': '', 'long_press': ''}
\ No newline at end of file
diff --git a/examples/andriod_assistant/tests/test.py b/examples/andriod_assistant/tests/test.py
new file mode 100644
index 000000000..c223665c4
--- /dev/null
+++ b/examples/andriod_assistant/tests/test.py
@@ -0,0 +1,111 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# @Desc : test case (images come from AppAgent's demos)
+
+
+import ast
+import asyncio
+import re
+from pathlib import Path
+
+from actions.parse_record_an import RECORD_PARSE_NODE
+from prompts.operation_prompt import (
+    long_press_doc_template,
+    refine_doc_suffix,
+    swipe_doc_template,
+    tap_doc_template,
+    text_doc_template,
+)
+from utils.schema import ActionOp, SwipeOp
+
+from metagpt.actions.action import Action
+from metagpt.config2 import config
+from metagpt.logs import logger
+from metagpt.utils.common import encode_image
+
+TEST_BEFORE_PATH = Path("apps/demo_Contacts/labeled_screenshots/demo_Contacts_2024-01-30_21-50-19_1.png")
+TEST_AFTER_PATH = Path("apps/demo_Contacts/labeled_screenshots/demo_Contacts_2024-01-30_21-50-19_2.png")
+RECORD_PATH = Path("apps/demo_Contacts/record.txt")
+TASK_DESC_PATH = Path("apps/demo_Contacts/task_desc.txt")
+DOCS_DIR = Path("storage")
+
+testaction = Action(name="test")
+
+
+# TODO: add a test for record parsing
+# test with a single image pair only
+async def manual_test():
+    img_before_base64 = encode_image(TEST_BEFORE_PATH)
+    img_after_base64 = encode_image(TEST_AFTER_PATH)
+
+    with open(RECORD_PATH, "r") as record_file:
+        rec = record_file.readline().strip()
+    action, resource_id = rec.split(":::")
+    action_type = action.split("(")[0]
+    action_param = re.findall(r"\((.*?)\)", action)[0]
+    if action_type == ActionOp.TAP.value:
+        prompt_template = tap_doc_template
+        context = prompt_template.format(ui_element=action_param)
+    elif action_type == ActionOp.TEXT.value:
+        input_area, input_text = action_param.split(":sep:")
+        prompt_template = text_doc_template
+        context = prompt_template.format(ui_element=input_area)
+    elif action_type == ActionOp.LONG_PRESS.value:
+        prompt_template = long_press_doc_template
+        context = prompt_template.format(ui_element=action_param)
+    elif action_type == ActionOp.SWIPE.value:
+        swipe_area, swipe_dir = action_param.split(":sep:")
+        if swipe_dir == SwipeOp.UP.value or swipe_dir == SwipeOp.DOWN.value:
+            action_type = ActionOp.VERTICAL_SWIPE.value
+        elif swipe_dir == SwipeOp.LEFT.value or swipe_dir == SwipeOp.RIGHT.value:
+            action_type = ActionOp.HORIZONTAL_SWIPE.value
+        prompt_template = swipe_doc_template
+        context = prompt_template.format(swipe_dir=swipe_dir, ui_element=swipe_area)
+    else:
+        logger.error(f"Unknown action type: {action_type}")
+        return
+
+    task_desc = open(TASK_DESC_PATH, "r").read()
+    # fill the <task_desc> placeholder left in the template; str.format would not match it
+    context = context.replace("<task_desc>", task_desc)
+
+    doc_name = resource_id + ".txt"
+
+    doc_path = DOCS_DIR.joinpath(doc_name)
+    if doc_path.exists():
+        doc_content = ast.literal_eval(open(doc_path).read())
+        if doc_content[action_type]:
+            if config.get_other("refine_doc"):
+                refine_context = refine_doc_suffix.format(old_doc=doc_content[action_type])
+                context += refine_context
+                logger.info(
+                    f"Documentation for the element {resource_id} already exists. The doc will be "
+                    f"refined based on the latest demo."
+                )
f"Documentation for the element {resource_id} already exists. The doc will be " + f"refined based on the latest demo." + ) + else: + logger.info( + f"Documentation for the element {resource_id} already exists. Turn on DOC_REFINE " + f"in the config file if needed." + ) + else: + doc_content = {"tap": "", "text": "", "v_swipe": "", "h_swipe": "", "long_press": ""} + logger.info(f"Waiting for GPT-4V to generate documentation for the element {resource_id}") + + node = await RECORD_PARSE_NODE.fill( + context=context, llm=testaction.llm, images=[img_before_base64, img_after_base64] + ) + + # log_path = task_dir.joinpath(f"log_{app_name}_{demo_name}.txt") + node.compile(context=context, schema="json", mode="auto") + msg = node.content + doc_content[action_type] = msg + + with open(doc_path, "w") as outfile: + outfile.write(str(doc_content)) + logger.info(f"Documentation generated and saved to {doc_path}") + + +if __name__ == "__main__": + loop = asyncio.get_event_loop() + loop.run_until_complete(manual_test()) + loop.close() + print("OK") diff --git a/examples/andriod_assistant/tests/test_for_an.py b/examples/andriod_assistant/tests/test_for_an.py new file mode 100644 index 000000000..7dddaabf5 --- /dev/null +++ b/examples/andriod_assistant/tests/test_for_an.py @@ -0,0 +1,80 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# @Desc : test on android emulator action. After Modify Role Test, this script is discarded. +import asyncio +import time +from pathlib import Path + +from actions.manual_record import ManualRecord +from actions.parse_record import ParseRecord +from actions.screenshot_parse import ScreenshotParse +from actions.self_learn_and_reflect import SelfLearnAndReflect + +from metagpt.environment.android_env.android_env import AndroidEnv + +TASK_PATH = Path("apps/Contacts") +DEMO_NAME = str(time.time()) +SELF_EXPLORE_DOC_PATH = TASK_PATH.joinpath("autodocs") +PARSE_RECORD_DOC_PATH = TASK_PATH.joinpath("demodocs") + +test_env_self_learn_android = AndroidEnv( + device_id="emulator-5554", + xml_dir=Path("/sdcard"), + screenshot_dir=Path("/sdcard/Pictures/Screenshots"), +) +test_self_learning = SelfLearnAndReflect() + +test_env_manual_learn_android = AndroidEnv( + device_id="emulator-5554", + xml_dir=Path("/sdcard"), + screenshot_dir=Path("/sdcard/Pictures/Screenshots"), +) +test_manual_record = ManualRecord() +test_manual_parse = ParseRecord() + +test_env_screenshot_parse_android = AndroidEnv( + device_id="emulator-5554", + xml_dir=Path("/sdcard"), + screenshot_dir=Path("/sdcard/Pictures/Screenshots"), +) +test_screenshot_parse = ScreenshotParse() + +if __name__ == "__main__": + loop = asyncio.get_event_loop() + + test_action_list = [ + test_self_learning.run( + round_count=20, + task_desc="Create a contact in Contacts App named zjy with a phone number +86 18831933368 ", + last_act="", + task_dir=TASK_PATH / "demos" / f"self_learning_{DEMO_NAME}", + docs_dir=SELF_EXPLORE_DOC_PATH, + env=test_env_self_learn_android, + ), + test_manual_record.run( + # demo_name=DEMO_NAME, + task_dir=TASK_PATH / "demos" / f"manual_record_{DEMO_NAME}", + task_desc="Create a contact in Contacts App named zjy with a phone number +86 18831933368 ", + env=test_env_manual_learn_android, + ), + test_manual_parse.run( + app_name="Contacts", + # demo_name=DEMO_NAME, + task_dir=TASK_PATH / "demos" / f"manual_record_{DEMO_NAME}", # 修要修改 + docs_dir=PARSE_RECORD_DOC_PATH, # 需要修改 + env=test_env_manual_learn_android, + ), + test_screenshot_parse.run( + round_count=20, + task_desc="Create a contact in Contacts App named 
+            last_act="",
+            task_dir=TASK_PATH / f"act_{DEMO_NAME}",
+            docs_dir=PARSE_RECORD_DOC_PATH,
+            env=test_env_screenshot_parse_android,
+            grid_on=False,
+        ),
+    ]
+
+    loop.run_until_complete(asyncio.gather(*test_action_list))
+    loop.close()
+    print("Finish")
diff --git a/examples/andriod_assistant/utils/schema.py b/examples/andriod_assistant/utils/schema.py
new file mode 100644
index 000000000..d7990de40
--- /dev/null
+++ b/examples/andriod_assistant/utils/schema.py
@@ -0,0 +1,158 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# @Desc :
+
+from enum import Enum
+
+from pydantic import BaseModel, Field, field_validator
+
+
+class ActionOp(Enum):
+    TAP = "tap"
+    LONG_PRESS = "long_press"
+    TEXT = "text"
+    SWIPE = "swipe"
+    VERTICAL_SWIPE = "v_swipe"
+    HORIZONTAL_SWIPE = "h_swipe"
+    GRID = "grid"
+    STOP = "stop"
+
+
+class SwipeOp(Enum):
+    UP = "up"
+    DOWN = "down"
+    LEFT = "left"
+    RIGHT = "right"
+
+
+class Decision(Enum):
+    BACK = "BACK"
+    INEFFECTIVE = "INEFFECTIVE"
+    CONTINUE = "CONTINUE"
+    SUCCESS = "SUCCESS"
+
+    @classmethod
+    def values(cls):
+        return [item.value for item in cls]
+
+
+class AndroidElement(BaseModel):
+    """UI Element"""
+
+    uid: str = Field(default="")
+    bbox: tuple[tuple[int, int], tuple[int, int]] = Field(default=((0, 0), (0, 0)))  # ((x1, y1), (x2, y2))
+    attrib: str = Field(default="")
+
+
+class OpLogItem(BaseModel):
+    """log content for self-learn or task act"""
+
+    step: int = Field(default=0)
+    prompt: str = Field(default="")
+    image: str = Field(default="")
+    response: str = Field(default="")
+
+
+class ReflectLogItem(BaseModel):
+    """log content for self-learn-reflect"""
+
+    step: int = Field(default=0)
+    prompt: str = Field(default="")
+    image_before: str = Field(default="")
+    image_after: str = Field(default="")
+    response: str = Field(default="")
+
+
+class RecordLogItem(BaseModel):
+    """log content for record parse, same as ReflectLogItem"""
+
+    step: int = Field(default=0)
+    prompt: str = Field(default="")
+    image_before: str = Field(default="")
+    image_after: str = Field(default="")
+    response: str = Field(default="")
+
+
+class DocContent(BaseModel):
+    tap: str = Field(default="")
+    text: str = Field(default="")
+    v_swipe: str = Field(default="")
+    h_swipe: str = Field(default="")
+    long_press: str = Field(default="")
+
+
+# start =================== define different Action Op and its params =============
+class RunState(Enum):
+    """run state"""
+
+    SUCCESS = "success"
+    FINISH = "finish"
+    FAIL = "fail"
+
+
+class BaseOpParam(BaseModel):
+    act_name: str = Field(default="", validate_default=True)
+    last_act: str = Field(default="")
+    param_state: RunState = Field(default=RunState.SUCCESS, description="return state when extracting params")
+
+
+class TapOp(BaseOpParam):
+    area: int = Field(default=-1)
+
+
+class TextOp(BaseOpParam):
+    input_str: str = Field(default="")
+
+
+class LongPressOp(BaseOpParam):
+    area: int = Field(default=-1)
+
+
+# Named SwipeOp_3 to avoid clashing with the SwipeOp enum above; TODO: find a better name
+class SwipeOp_3(BaseOpParam):
+    area: int = Field(default=-1)
+    swipe_orient: str = Field(default="up")
+    dist: str = Field(default="")
+
+
+class GridOp(BaseModel):
+    act_name: str = Field(default="")
+
+
+class BaseGridOpParam(BaseOpParam):
+    @field_validator("act_name", mode="before")
+    @classmethod
+    def check_act_name(cls, act_name: str) -> str:
+        return f"{act_name}_grid"
+
+
+class TapGridOp(BaseGridOpParam):
+    area: int = Field(default=-1)
+    subarea: str = Field(default="")
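The `check_act_name` validator above rewrites `act_name` whenever a grid op is constructed; a quick illustration with made-up values:

```python
op = TapGridOp(act_name="tap", area=2, subarea="top-left")
assert op.act_name == "tap_grid"  # the validator appends the "_grid" suffix
```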
+
+
+class LongPressGridOp(BaseGridOpParam):
+    area: int = Field(default=-1)
+    subarea: str = Field(default="")
+
+
+class SwipeGridOp(BaseGridOpParam):
+    start_area: int = Field(default=-1)
+    start_subarea: str = Field(default="")
+    end_area: int = Field(default=-1)
+    end_subarea: str = Field(default="")
+
+
+# end =================== define different Action Op and its params =============
+
+
+class ReflectOp(BaseModel):
+    decision: str = ""
+    thought: str = ""
+    documentation: str = ""
+    param_state: RunState = RunState.SUCCESS
+
+
+class AndroidActionOutput(BaseModel):
+    data: dict = Field(default_factory=dict)
+    action_state: RunState = Field(default=RunState.SUCCESS)
diff --git a/examples/andriod_assistant/utils/utils.py b/examples/andriod_assistant/utils/utils.py
new file mode 100644
index 000000000..b53df55be
--- /dev/null
+++ b/examples/andriod_assistant/utils/utils.py
@@ -0,0 +1,300 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# @Desc :
+
+import re
+from pathlib import Path
+from typing import Union
+from xml.etree.ElementTree import Element, iterparse
+
+import cv2
+import pyshine as ps
+
+from examples.andriod_assistant.utils.schema import (
+    ActionOp,
+    AndroidElement,
+    BaseGridOpParam,
+    BaseOpParam,
+    Decision,
+    GridOp,
+    LongPressGridOp,
+    LongPressOp,
+    ReflectOp,
+    RunState,
+    SwipeGridOp,
+    SwipeOp_3,
+    TapGridOp,
+    TapOp,
+    TextOp,
+)
+from metagpt.logs import logger
+
+
+def get_id_from_element(elem: Element) -> str:
+    bounds = elem.attrib["bounds"][1:-1].split("][")
+    x1, y1 = map(int, bounds[0].split(","))
+    x2, y2 = map(int, bounds[1].split(","))
+    elem_w, elem_h = x2 - x1, y2 - y1
+    if "resource-id" in elem.attrib and elem.attrib["resource-id"]:
+        elem_id = elem.attrib["resource-id"].replace(":", ".").replace("/", "_")
+    else:
+        elem_id = f"{elem.attrib['class']}_{elem_w}_{elem_h}"
+    if "content-desc" in elem.attrib and elem.attrib["content-desc"] and len(elem.attrib["content-desc"]) < 20:
+        content_desc = elem.attrib["content-desc"].replace("/", "_").replace(" ", "").replace(":", "_")
+        elem_id += f"_{content_desc}"
+    return elem_id
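To make the uid scheme concrete: the storage filename earlier in this diff (`android.view.ViewGroup_1067_236_android.widget.TextView_183_204_Apps_2.txt`) is a parent prefix plus an element id plus an index, assembled by this function together with `traverse_xml_tree` below. A standalone check on a synthetic node:

```python
from xml.etree.ElementTree import fromstring

node = fromstring('<node class="android.widget.TextView" bounds="[0,0][183,204]" content-desc="Apps"/>')
print(get_id_from_element(node))  # -> android.widget.TextView_183_204_Apps
```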
+
+
+def traverse_xml_tree(xml_path: Path, elem_list: list[AndroidElement], attrib: str, add_index=False):
+    path = []
+    for event, elem in iterparse(str(xml_path), ["start", "end"]):
+        if event == "start":
+            path.append(elem)
+            if attrib in elem.attrib and elem.attrib[attrib] == "true":
+                parent_prefix = ""
+                if len(path) > 1:
+                    parent_prefix = get_id_from_element(path[-2])
+                bounds = elem.attrib["bounds"][1:-1].split("][")
+                x1, y1 = map(int, bounds[0].split(","))
+                x2, y2 = map(int, bounds[1].split(","))
+                center = (x1 + x2) // 2, (y1 + y2) // 2
+                elem_id = get_id_from_element(elem)
+                if parent_prefix:
+                    elem_id = parent_prefix + "_" + elem_id
+                if add_index:
+                    elem_id += f"_{elem.attrib['index']}"
+                close = False
+                for e in elem_list:
+                    bbox = e.bbox
+                    center_ = (bbox[0][0] + bbox[1][0]) // 2, (bbox[0][1] + bbox[1][1]) // 2
+                    dist = (abs(center[0] - center_[0]) ** 2 + abs(center[1] - center_[1]) ** 2) ** 0.5
+                    # TODO: restore reading min_dist from config (default 30) after the single-action test
+                    # if dist <= config.get_other("min_dist"):
+                    if dist <= 30:
+                        close = True
+                        break
+                if not close:
+                    elem_list.append(AndroidElement(uid=elem_id, bbox=((x1, y1), (x2, y2)), attrib=attrib))
+
+        if event == "end":
+            path.pop()
+
+
+def draw_bbox_multi(
+    img_path: Path,
+    output_path: Path,
+    elem_list: list[AndroidElement],
+    record_mode: bool = False,
+    dark_mode: bool = False,
+):
+    imgcv = cv2.imread(str(img_path))
+    count = 1
+    for elem in elem_list:
+        try:
+            top_left = elem.bbox[0]
+            bottom_right = elem.bbox[1]
+            left, top = top_left[0], top_left[1]
+            right, bottom = bottom_right[0], bottom_right[1]
+            label = str(count)
+            if record_mode:
+                if elem.attrib == "clickable":
+                    color = (250, 0, 0)
+                elif elem.attrib == "focusable":
+                    color = (0, 0, 250)
+                else:
+                    color = (0, 250, 0)
+                imgcv = ps.putBText(
+                    imgcv,
+                    label,
+                    text_offset_x=(left + right) // 2 + 10,
+                    text_offset_y=(top + bottom) // 2 + 10,
+                    vspace=10,
+                    hspace=10,
+                    font_scale=1,
+                    thickness=2,
+                    background_RGB=color,
+                    text_RGB=(255, 250, 250),
+                    alpha=0.5,
+                )
+            else:
+                text_color = (10, 10, 10) if dark_mode else (255, 250, 250)
+                bg_color = (255, 250, 250) if dark_mode else (10, 10, 10)
+                imgcv = ps.putBText(
+                    imgcv,
+                    label,
+                    text_offset_x=(left + right) // 2 + 10,
+                    text_offset_y=(top + bottom) // 2 + 10,
+                    vspace=10,
+                    hspace=10,
+                    font_scale=1,
+                    thickness=2,
+                    background_RGB=bg_color,
+                    text_RGB=text_color,
+                    alpha=0.5,
+                )
+        except Exception as e:
+            logger.error(f"ERROR: An exception occurred while labeling the image\n{e}")
+        count += 1
+    cv2.imwrite(str(output_path), imgcv)
+    return imgcv
+
+
+def draw_grid(img_path: Path, output_path: Path) -> tuple[int, int]:
+    def get_unit_len(n):
+        for i in range(1, n + 1):
+            if n % i == 0 and 120 <= i <= 180:
+                return i
+        return -1
+
+    image = cv2.imread(str(img_path))
+    height, width, _ = image.shape
+    color = (255, 116, 113)
+    unit_height = get_unit_len(height)
+    if unit_height < 0:
+        unit_height = 120
+    unit_width = get_unit_len(width)
+    if unit_width < 0:
+        unit_width = 120
+    thick = int(unit_width // 50)
+    rows = height // unit_height
+    cols = width // unit_width
+    for i in range(rows):
+        for j in range(cols):
+            label = i * cols + j + 1
+            left = int(j * unit_width)
+            top = int(i * unit_height)
+            right = int((j + 1) * unit_width)
+            bottom = int((i + 1) * unit_height)
+            cv2.rectangle(image, (left, top), (right, bottom), color, thick // 2)
+            cv2.putText(
+                image,
+                str(label),
+                (left + int(unit_width * 0.05) + 3, top + int(unit_height * 0.3) + 3),
+                0,
+                int(0.01 * unit_width),
+                (0, 0, 0),
+                thick,
+            )
+            cv2.putText(
+                image,
+                str(label),
+                (left + int(unit_width * 0.05), top + int(unit_height * 0.3)),
+                0,
+                int(0.01 * unit_width),
+                color,
+                thick,
+            )
+    cv2.imwrite(str(output_path), image)
+    return rows, cols
+
+
+def area_to_xy(area: int, subarea: str, width: int, height: int, rows: int, cols: int) -> tuple[int, int]:
+    area -= 1
+    row, col = area // cols, area % cols
+    x_0, y_0 = col * (width // cols), row * (height // rows)
+    if subarea == "top-left":
+        x, y = x_0 + (width // cols) // 4, y_0 + (height // rows) // 4
+    elif subarea == "top":
+        x, y = x_0 + (width // cols) // 2, y_0 + (height // rows) // 4
+    elif subarea == "top-right":
+        x, y = x_0 + (width // cols) * 3 // 4, y_0 + (height // rows) // 4
+    elif subarea == "left":
+        x, y = x_0 + (width // cols) // 4, y_0 + (height // rows) // 2
+    elif subarea == "right":
+        x, y = x_0 + (width // cols) * 3 // 4, y_0 + (height // rows) // 2
+    elif subarea == "bottom-left":
+        x, y = x_0 + (width // cols) // 4, y_0 + (height // rows) * 3 // 4
+    elif subarea == "bottom":
+        x, y = x_0 + (width // cols) // 2, y_0 + (height // rows) * 3 // 4
+    elif subarea == "bottom-right":
+        x, y = x_0 + (width // cols) * 3 // 4, y_0 + (height // rows) * 3 // 4
+    else:
+        x, y = x_0 + (width // cols) // 2, y_0 + (height // rows) // 2
+    return x, y
+
+
+def elem_bbox_to_xy(bbox: tuple[tuple[int, int], tuple[int, int]]) -> tuple[int, int]:
+    tl, br = bbox
+    x, y = (tl[0] + br[0]) // 2, (tl[1] + br[1]) // 2
+    return x, y
+
+
+def reflect_parse_extarct(parsed_json: dict) -> ReflectOp:
+    decision = parsed_json.get("Decision")
+    if decision not in Decision.values():
+        op = ReflectOp(param_state=RunState.FAIL)
+    else:
+        op = ReflectOp(
+            decision=parsed_json.get("Decision"),
+            thought=parsed_json.get("Thought"),
+            documentation=parsed_json.get("Documentation"),
+        )
+    return op
+
+
+def screenshot_parse_extract(parsed_json: dict, grid_on: bool = False) -> Union[BaseOpParam, BaseGridOpParam, GridOp]:
+    act = parsed_json.get("Action")
+    last_act = parsed_json.get("Summary")
+    act_name = act.split("(")[0]
+
+    if RunState.FINISH.value.upper() in act:
+        return BaseOpParam(param_state=RunState.FINISH)
+
+    if grid_on:
+        return screenshot_parse_extract_with_grid(act_name, act, last_act)
+    else:
+        return screenshot_parse_extract_without_grid(act_name, act, last_act)
+
+
+def op_params_clean(params: list[str]) -> list[Union[int, str]]:
+    param_values = []
+    for param_value in params:
+        if '"' in param_value or "'" in param_value:  # quoted -> strip the surrounding quotes, keep as str
+            param_values.append(param_value.strip()[1:-1])
+        else:
+            param_values.append(int(param_value))
+    return param_values
+
+
+def screenshot_parse_extract_without_grid(act_name: str, act: str, last_act: str) -> Union[BaseOpParam, GridOp]:
+    if act_name == ActionOp.TAP.value:
+        area = int(re.findall(r"tap\((.*?)\)", act)[0])
+        op = TapOp(act_name=act_name, area=area, last_act=last_act)
+    elif act_name == ActionOp.TEXT.value:
+        input_str = re.findall(r"text\((.*?)\)", act)[0][1:-1]
+        op = TextOp(act_name=act_name, input_str=input_str, last_act=last_act)
+    elif act_name == ActionOp.LONG_PRESS.value:
+        area = int(re.findall(r"long_press\((.*?)\)", act)[0])
+        op = LongPressOp(act_name=act_name, area=area, last_act=last_act)
+    elif act_name == ActionOp.SWIPE.value:
+        params = re.findall(r"swipe\((.*?)\)", act)[0].split(",")
+        params = op_params_clean(params)  # area, swipe_orient, dist
+        op = SwipeOp_3(act_name=act_name, area=params[0], swipe_orient=params[1], dist=params[2], last_act=last_act)
+    elif act_name == ActionOp.GRID.value:
+        op = GridOp(act_name=act_name)
+    else:
+        op = BaseOpParam(param_state=RunState.FAIL)
+    return op
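A quick sanity check of the extractor on the kind of model reply it expects (the action string and summary are made up):

```python
op = screenshot_parse_extract({"Action": 'swipe(3, "up", "medium")', "Summary": "scrolled the list"})
assert isinstance(op, SwipeOp_3)
assert (op.area, op.swipe_orient, op.dist) == (3, "up", "medium")
```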
+
+
+def screenshot_parse_extract_with_grid(act_name: str, act: str, last_act: str) -> Union[BaseGridOpParam, GridOp]:
+    if act_name == ActionOp.TAP.value:
+        params = re.findall(r"tap\((.*?)\)", act)[0].split(",")
+        params = op_params_clean(params)
+        op = TapGridOp(act_name=act_name, area=params[0], subarea=params[1], last_act=last_act)
+    elif act_name == ActionOp.LONG_PRESS.value:
+        params = re.findall(r"long_press\((.*?)\)", act)[0].split(",")
+        params = op_params_clean(params)
+        op = LongPressGridOp(act_name=act_name, area=params[0], subarea=params[1], last_act=last_act)
+    elif act_name == ActionOp.SWIPE.value:
+        params = re.findall(r"swipe\((.*?)\)", act)[0].split(",")
+        params = op_params_clean(params)
+        op = SwipeGridOp(
+            act_name=act_name, start_area=params[0], start_subarea=params[1], end_area=params[2], end_subarea=params[3]
+        )
+    elif act_name == ActionOp.GRID.value:
+        op = GridOp(act_name=act_name)
+    else:
+        op = BaseGridOpParam(param_state=RunState.FAIL)
+    return op
diff --git a/examples/llm_hello_world.py b/examples/llm_hello_world.py
index 62fc2ed68..8321b64ca 100644
--- a/examples/llm_hello_world.py
+++ b/examples/llm_hello_world.py
@@ -6,9 +6,11 @@
 @File   : llm_hello_world.py
 """
 import asyncio
+from pathlib import Path
 
 from metagpt.llm import LLM
 from metagpt.logs import logger
+from metagpt.utils.common import encode_image
 
 
 async def main():
@@ -38,6 +40,15 @@ async def main():
     if hasattr(llm, "completion"):
         logger.info(llm.completion(hello_msg))
 
+    # check the LLM's vision capability, if it supports one
+    invoice_path = Path(__file__).parent.joinpath("..", "tests", "data", "invoices", "invoice-2.png")
+    img_base64 = encode_image(invoice_path)
+    try:
+        res = await llm.aask(msg="if this is an invoice, just return True else return False", images=[img_base64])
+        assert "true" in res.lower()
+    except Exception:
+        pass
+
 
 if __name__ == "__main__":
     asyncio.run(main())
diff --git a/metagpt/actions/action_node.py b/metagpt/actions/action_node.py
index 27dde5a8c..4563ec150 100644
--- a/metagpt/actions/action_node.py
+++ b/metagpt/actions/action_node.py
@@ -453,7 +453,6 @@ class ActionNode:
         self, schema, mode, images: Optional[Union[str, list[str]]] = None, timeout=USE_CONFIG_TIMEOUT, exclude=None
     ):
         prompt = self.compile(context=self.context, schema=schema, mode=mode, exclude=exclude)
-
         if schema != "raw":
             mapping = self.get_mapping(mode, exclude=exclude)
             class_name = f"{self.key}_AN"
@@ -464,6 +463,7 @@ class ActionNode:
             self.instruct_content = scontent
         else:
             self.content = await self.llm.aask(prompt)
+            logger.info(self.content)
             self.instruct_content = None
         return self
diff --git a/metagpt/config2.py b/metagpt/config2.py
index f3273419f..eb4c38368 100644
--- a/metagpt/config2.py
+++ b/metagpt/config2.py
@@ -75,6 +75,7 @@ class Config(CLIParams, YamlModel):
     iflytek_api_key: str = ""
     azure_tts_subscription_key: str = ""
     azure_tts_region: str = ""
+    other: dict = dict()  # extra runtime options, set via set_other()
 
     @classmethod
     def from_home(cls, path):
@@ -127,6 +128,13 @@ class Config(CLIParams, YamlModel):
         self.reqa_file = reqa_file
         self.max_auto_summarize_code = max_auto_summarize_code
 
+    def set_other(self, other: dict):
+        self.other = other
+
+    def get_other(self, key: str, default_value=None):
+        # dict.get already falls back to None when no default is given, so no branching is needed
+        return self.other.get(key, default_value)
+
     def get_openai_llm(self) -> Optional[LLMConfig]:
         """Get OpenAI LLMConfig by name. If no OpenAI, raise Exception"""
         if self.llm.api_type == LLMType.OPENAI:
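Intended usage of the new `other` bag on `Config` (the keys and values here are illustrative):

```python
from metagpt.config2 import config

config.set_other({"stage": "act", "app_name": "Contacts"})
assert config.get_other("stage") == "act"
assert config.get_other("mode", "auto") == "auto"  # a missing key falls back to the supplied default
```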
diff --git a/metagpt/environment/android_env/android_ext_env.py b/metagpt/environment/android_env/android_ext_env.py
index 5c3363655..2c2771075 100644
--- a/metagpt/environment/android_env/android_ext_env.py
+++ b/metagpt/environment/android_env/android_ext_env.py
@@ -101,6 +101,8 @@ class AndroidExtEnv(ExtEnv):
         return f"adb -s {self.device_id} "
 
     def execute_adb_with_cmd(self, adb_cmd: str) -> str:
+        # normalize Windows-style backslashes so paths built with pathlib work inside adb commands
+        adb_cmd = adb_cmd.replace("\\", "/")
         res = subprocess.run(adb_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
         exec_res = ADB_EXEC_FAIL
         if not res.returncode:
diff --git a/metagpt/environment/api/env_api.py b/metagpt/environment/api/env_api.py
index 1e6df544d..924f6b104 100644
--- a/metagpt/environment/api/env_api.py
+++ b/metagpt/environment/api/env_api.py
@@ -18,11 +18,11 @@ class EnvAPIAbstract(BaseModel):
 
 class EnvAPIRegistry(BaseModel):
     """the registry to store environment w&r api/interface"""
 
-    registry: dict[str, dict[str, Union[dict, Any, str]]] = Field(default=dict(), exclude=True)
+    registry: dict[str, Callable] = Field(default=dict(), exclude=True)
 
     def get(self, api_name: str):
         if api_name not in self.registry:
-            raise ValueError
+            raise KeyError(f"api_name: {api_name} not found")
         return self.registry.get(api_name)
 
     def __getitem__(self, api_name: str) -> Callable:
diff --git a/metagpt/environment/api/read_api.py b/metagpt/environment/api/read_api.py
new file mode 100644
index 000000000..7b0076ce7
--- /dev/null
+++ b/metagpt/environment/api/read_api.py
@@ -0,0 +1,3 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# @Desc : the readable api/interface abstraction to integrate with the environment
diff --git a/metagpt/environment/api/write_api.py b/metagpt/environment/api/write_api.py
new file mode 100644
index 000000000..de6665592
--- /dev/null
+++ b/metagpt/environment/api/write_api.py
@@ -0,0 +1,3 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# @Desc : the writable api/interface abstraction to integrate with the environment
diff --git a/metagpt/environment/gym_env/__init__.py b/metagpt/environment/gym_env/__init__.py
new file mode 100644
index 000000000..2bcf8efd0
--- /dev/null
+++ b/metagpt/environment/gym_env/__init__.py
@@ -0,0 +1,3 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# @Desc :
diff --git a/metagpt/environment/gym_env/gym_env.py b/metagpt/environment/gym_env/gym_env.py
new file mode 100644
index 000000000..b83d988d6
--- /dev/null
+++ b/metagpt/environment/gym_env/gym_env.py
@@ -0,0 +1,7 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# @Desc : MG Gym Env
+
+
+class GymEnv:
+    pass
diff --git a/tests/metagpt/environment/gym_env/__init__.py b/tests/metagpt/environment/gym_env/__init__.py
new file mode 100644
index 000000000..2bcf8efd0
--- /dev/null
+++ b/tests/metagpt/environment/gym_env/__init__.py
@@ -0,0 +1,3 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# @Desc :
diff --git a/tests/metagpt/environment/software_env/__init__.py b/tests/metagpt/environment/software_env/__init__.py
new file mode 100644
index 000000000..2bcf8efd0
--- /dev/null
+++ b/tests/metagpt/environment/software_env/__init__.py
@@ -0,0 +1,3 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# @Desc :