add android agent

This commit is contained in:
better629 2024-03-27 12:13:28 +08:00
commit 9d3571469c
33 changed files with 2076 additions and 3 deletions

View file

@ -0,0 +1,71 @@
# The Android Assisant
The Android assistant can learn from your daily operations or automatically learn, and perform App operations according to your instructions, thereby realizing any of your needs on the phone and freeing up your hands.
## Install
### Device Simulator
1. Firstly, install ADB on the PC, which enables your PC to interact with Android devices
2. Connect the Android device to the computer's USB port
3. If you do not have an Android device, you can download Android Studio and use its Android emulator to carry out the subsequent operations. The steps to install the Android emulator can be found here:[快速安装Android Studio & Simulator](https://dev.weixin.qq.com/docs/framework/dev/framework/env/android-simulator.html)
### Install Requirments
You can run the following command line:
```bash
pip install -r requirements.txt
```
## Experiential Learning
By designating the app to explore and the method of learning (automatic or manual demonstration), you can facilitate Android Assistant to master the functions of various apps, thereby generating respective documentation for later use during the phase termed as "Automation of routine tasks". For any given task objective, conducting approximately 20 cycles of exploration can considerably enhance the performance. You can experiment with both the automatic learning and manual demonstration modes for the "contacts" app by implementing the ensuing commands:
```bash
python run_assistant.py "your task description" --stage "learn" --mode "auto/manual" --app-name "Contacts"
```
## Free Your Hands
Once the Android Assistant has completed ample exploration, you are all set to automate your tasks! By utilizing either text description or voice input, you can instruct the Android Assistant to perform the desired tasks across various applications. For the specific command processes, please see the following recommendations:
### By Text
```bash
python run_assistant.py "your task description" --stage "act" --mode "auto/manual" --app-name "app names"
```
### By Voice
coming soon
## Run It
You can run Android Assisant by running the following command line:
```bash
python run_assistant.py "your task description" --stage "your choice(learn/act)" --mode "your choice(auto/manual)" --app-name "app name"
```
And the specific parameters are as follows:
```text
Usage: run_assistant.py [OPTIONS] TASK_DESC
Run a Android Assistant
Arguments:
TASK_DESC the task description you want the android assistant to learn or
act [required]
Options:
--n-round INTEGER The max round to do an app operation task.
[default: 20]
--stage TEXT stage: learn / act [default: learn]
--mode TEXT mode: auto / manual , when state=learn
[default: auto]
--app-name TEXT the name of app you want to run [default:
demo]
--investment FLOAT Dollar amount to invest in the AI company.
[default: 5.0]
--refine-doc / --no-refine-doc Refine existing operation docs based on the
latest observation if True. [default: no-
refine-doc]
--min-dist INTEGER The minimum distance between elements to
prevent overlapping during the labeling
process. [default: 30]
--android-screenshot-dir TEXT The path to store screenshots on android
device. Make sure it exists. [default:
/sdcard/Pictures/Screenshots]
--android-xml-dir TEXT The path to store xml files for determining
UI elements localtion. Make sure it exists.
[default: /sdcard]
--device-id TEXT The Android device_id [default:
emulator-5554]
--help Show this message and exit.
```

View file

@ -0,0 +1,3 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc :

View file

@ -0,0 +1,193 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : manual record user interaction in stage=learn & mode=manual, LIKE scripts/step_recorder.py
import time
from pathlib import Path
import cv2
from examples.andriod_assistant.utils.schema import (
ActionOp,
AndroidActionOutput,
RunState,
SwipeOp,
)
from examples.andriod_assistant.utils.utils import draw_bbox_multi, traverse_xml_tree
from metagpt.actions.action import Action
from metagpt.config2 import config
from metagpt.const import ADB_EXEC_FAIL
from metagpt.environment.android_env.android_env import AndroidEnv
from metagpt.environment.api.env_api import EnvAPIAbstract
from metagpt.logs import logger
class ManualRecord(Action):
"""do a human operation on the screen with human input"""
name: str = "ManualRecord"
useless_list: list[str] = [] # store useless elements uid
record_path: Path = ""
task_desc_path: Path = ""
screenshot_before_path: Path = ""
screenshot_after_path: Path = ""
xml_path: Path = ""
# async def run(self, demo_name: str, task_desc: str,task_dir: Path, env: AndroidEnv):
async def run(self, task_desc: str, task_dir: Path, env: AndroidEnv):
self.record_path = Path(task_dir) / "record.txt"
self.task_desc_path = Path(task_dir) / "task_desc.txt"
self.screenshot_before_path = Path(task_dir) / "raw_screenshots"
self.screenshot_after_path = Path(task_dir) / "labeled_screenshots"
self.xml_path = Path(task_dir) / "xml"
for path in [self.screenshot_before_path, self.screenshot_after_path, self.xml_path]:
if not path.exists():
path.mkdir(parents=True, exist_ok=True)
with open(self.record_path, "w") as file:
file.write("")
record_file = open(self.record_path, "w")
with open(self.task_desc_path, "w") as f:
f.write(task_desc)
step = 0
while True:
step += 1
screenshot_path: Path = await env.observe(
EnvAPIAbstract(
api_name="get_screenshot",
# kwargs={"ss_name": f"{demo_name}_{step}", "local_save_dir": self.screenshot_before_path}
kwargs={"ss_name": f"{step}", "local_save_dir": self.screenshot_before_path},
)
)
xml_path: Path = await env.observe(
EnvAPIAbstract(
api_name="get_xml",
# kwargs={"xml_name": f"{demo_name}_{step}", "local_save_dir": self.xml_path}
kwargs={"xml_name": f"{step}", "local_save_dir": self.xml_path},
)
)
if not screenshot_path.exists() or not xml_path.exists():
return AndroidActionOutput(action_state=RunState.FAIL)
clickable_list = []
focusable_list = []
traverse_xml_tree(xml_path, clickable_list, "clickable", True)
traverse_xml_tree(xml_path, focusable_list, "focusable", True)
elem_list = []
for elem in clickable_list:
if elem.uid in self.useless_list:
continue
elem_list.append(elem)
for elem in focusable_list:
if elem.uid in self.useless_list:
continue
bbox = elem.bbox
center = (bbox[0][0] + bbox[1][0]) // 2, (bbox[0][1] + bbox[1][1]) // 2
close = False
for e in clickable_list:
bbox = e.bbox
center_ = (bbox[0][0] + bbox[1][0]) // 2, (bbox[0][1] + bbox[1][1]) // 2
dist = (abs(center[0] - center_[0]) ** 2 + abs(center[1] - center_[1]) ** 2) ** 0.5
if dist <= config.get_other("min_dist"):
close = True
break
if not close:
elem_list.append(elem)
screenshot_labeled_path = Path(self.screenshot_after_path).joinpath(f"{step}_labeled.png")
# screenshot_labeled_path = Path(self.screenshot_after_path).joinpath(f"{demo_name}_{step}_labeled.png")
labeled_img = draw_bbox_multi(screenshot_path, screenshot_labeled_path, elem_list)
cv2.imshow("image", labeled_img)
cv2.waitKey(0)
cv2.destroyAllWindows()
user_input = "xxx"
logger.info(
"Choose one of the following actions you want to perform on the current screen:\ntap, text, long_press,"
"swipe, stop",
"blue",
)
while (
user_input.lower() != ActionOp.TAP.value
and user_input.lower() != ActionOp.TEXT.value
and user_input.lower() != ActionOp.LONG_PRESS.value
and user_input.lower() != ActionOp.SWIPE.value
and user_input.lower() != ActionOp.STOP.value
):
user_input = input()
if user_input.lower() == ActionOp.TAP.value:
logger.info(
f"Which element do you want to tap? Choose a numeric tag from 1 to {len(elem_list)}:", "blue"
)
user_input = "xxx"
while not user_input.isnumeric() or int(user_input) > len(elem_list) or int(user_input) < 1:
user_input = input()
tl, br = elem_list[int(user_input) - 1].bbox
x, y = (tl[0] + br[0]) // 2, (tl[1] + br[1]) // 2
ret = await env.step(EnvAPIAbstract(api_name="system_tap", kwargs={"x": x, "y": y}))
if ret == ADB_EXEC_FAIL:
return AndroidActionOutput(action_state=RunState.FAIL)
record_file.write(f"tap({int(user_input)}):::{elem_list[int(user_input) - 1].uid}\n")
elif user_input.lower() == ActionOp.TEXT.value:
logger.info(
f"Which element do you want to input the text string? Choose a numeric tag from 1 to "
f"{len(elem_list)}:",
"blue",
)
input_area = "xxx"
while not input_area.isnumeric() or int(input_area) > len(elem_list) or int(input_area) < 1:
input_area = input()
logger.info("Enter your input text below:", "blue")
user_input = ""
while not user_input:
user_input = input()
await env.step(EnvAPIAbstract(api_name="user_input", kwargs={"input_txt": user_input}))
record_file.write(f'text({input_area}:sep:"{user_input}"):::{elem_list[int(input_area) - 1].uid}\n')
elif user_input.lower() == ActionOp.LONG_PRESS.value:
logger.info(
f"Which element do you want to long press? Choose a numeric tag from 1 to {len(elem_list)}:", "blue"
)
user_input = "xxx"
while not user_input.isnumeric() or int(user_input) > len(elem_list) or int(user_input) < 1:
user_input = input()
tl, br = elem_list[int(user_input) - 1].bbox
x, y = (tl[0] + br[0]) // 2, (tl[1] + br[1]) // 2
ret = await env.step(EnvAPIAbstract(api_name="user_longpress", kwargs={"x": x, "y": y}))
if ret == ADB_EXEC_FAIL:
return AndroidActionOutput(action_state=RunState.FAIL)
record_file.write(f"long_press({int(user_input)}):::{elem_list[int(user_input) - 1].uid}\n")
elif user_input.lower() == ActionOp.SWIPE.value:
logger.info(
"What is the direction of your swipe? Choose one from the following options:\nup, down, left,"
" right",
"blue",
)
user_input = ""
while (
user_input != SwipeOp.UP.value
and user_input != SwipeOp.DOWN.value
and user_input != SwipeOp.LEFT.value
and user_input != SwipeOp.RIGHT.value
):
user_input = input()
swipe_dir = user_input
logger.info(f"Which element do you want to swipe? Choose a numeric tag from 1 to {len(elem_list)}:")
while not user_input.isnumeric() or int(user_input) > len(elem_list) or int(user_input) < 1:
user_input = input()
tl, br = elem_list[int(user_input) - 1].bbox
x, y = (tl[0] + br[0]) // 2, (tl[1] + br[1]) // 2
ret = await env.step(
EnvAPIAbstract(api_name="user_swipe", kwargs={"x": x, "y": y, "orient": swipe_dir})
)
if ret == ADB_EXEC_FAIL:
return AndroidActionOutput(action_state=RunState.FAIL)
record_file.write(f"swipe({int(user_input)}:sep:{swipe_dir}):::{elem_list[int(user_input) - 1].uid}\n")
elif user_input.lower() == ActionOp.STOP.value:
record_file.write("stop\n")
record_file.close()
break
else:
break
time.sleep(3)

View file

@ -0,0 +1,141 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : parse record to generate learned standard operations in stage=learn & mode=manual,
# LIKE scripts/document_generation.py
import ast
import json
import re
from pathlib import Path
from examples.andriod_assistant.actions.parse_record_an import RECORD_PARSE_NODE
from examples.andriod_assistant.prompts.operation_prompt import (
long_press_doc_template,
refine_doc_suffix,
swipe_doc_template,
tap_doc_template,
text_doc_template,
)
from examples.andriod_assistant.utils.schema import (
ActionOp,
AndroidActionOutput,
RecordLogItem,
RunState,
SwipeOp,
)
from metagpt.actions.action import Action
from metagpt.config2 import config
from metagpt.environment.android_env.android_env import AndroidEnv
from metagpt.logs import logger
from metagpt.utils.common import encode_image
class ParseRecord(Action):
name: str = "ParseRecord"
record_path: Path = ""
task_desc_path: Path = ""
screenshot_before_path: Path = ""
screenshot_after_path: Path = ""
# async def run(self, app_name: str, demo_name: str, task_dir: Path, docs_dir: Path, env: AndroidEnv):
async def run(self, app_name: str, task_dir: Path, docs_dir: Path, env: AndroidEnv):
docs_dir.mkdir(parents=True, exist_ok=True)
doc_count = 0
self.record_path = Path(task_dir) / "record.txt"
self.task_desc_path = Path(task_dir) / "task_desc.txt"
self.screenshot_before_path = Path(task_dir) / "raw_screenshots"
self.screenshot_after_path = Path(task_dir) / "labeled_screenshots"
with open(self.record_path, "r") as record_file:
record_step_count = len(record_file.readlines()) - 1
record_file.seek(0)
for step in range(1, record_step_count + 1):
# img_before_base64 = encode_image(self.screenshot_after_path.joinpath(f"{demo_name}_{step}_labeled.png"))
# img_after_base64 = encode_image(self.screenshot_after_path.joinpath(f"{demo_name}_{step + 1}_labeled.png"))
img_before_base64 = encode_image(self.screenshot_after_path.joinpath(f"{step}_labeled.png"))
img_after_base64 = encode_image(self.screenshot_after_path.joinpath(f"{step + 1}_labeled.png"))
rec = record_file.readline().strip()
action, resource_id = rec.split(":::")
action_type = action.split("(")[0]
# 构建Prompt
action_param = re.findall(r"\((.*?)\)", action)[0]
if action_type == ActionOp.TAP.value:
prompt_template = tap_doc_template
context = prompt_template.format(ui_element=action_param)
elif action_type == ActionOp.TEXT.value:
input_area, input_text = action_param.split(":sep:")
prompt_template = text_doc_template
context = prompt_template.format(ui_element=input_area)
elif action_type == ActionOp.LONG_PRESS.value:
prompt_template = long_press_doc_template
context = prompt_template.format(ui_element=action_param)
elif action_type == ActionOp.SWIPE.value:
swipe_area, swipe_dir = action_param.split(":sep:")
if swipe_dir == SwipeOp.UP.value or swipe_dir == SwipeOp.DOWN.value:
action_type = ActionOp.VERTICAL_SWIPE.value
elif swipe_dir == SwipeOp.LEFT.value or swipe_dir == SwipeOp.RIGHT.value:
action_type = ActionOp.HORIZONTAL_SWIPE.value
prompt_template = swipe_doc_template
context = prompt_template.format(swipe_dir=swipe_dir, ui_element=swipe_area)
else:
break
task_desc_path = task_dir.joinpath("task_desc.txt")
task_desc = open(task_desc_path, "r").read()
context = context.format(task_desc=task_desc)
doc_name = resource_id + ".txt"
doc_path = docs_dir.joinpath(doc_name)
if doc_path.exists():
doc_content = ast.literal_eval(open(doc_path).read())
if doc_content[action_type]:
if config.get_other("doc_refine"):
refine_context = refine_doc_suffix.format(old_doc=doc_content[action_type])
context += refine_context
logger.info(
f"Documentation for the element {resource_id} already exists. The doc will be "
f"refined based on the latest demo."
)
else:
logger.info(
f"Documentation for the element {resource_id} already exists. Turn on DOC_REFINE "
f"in the config file if needed."
)
continue
else:
doc_content = {"tap": "", "text": "", "v_swipe": "", "h_swipe": "", "long_press": ""}
logger.info(f"Waiting for GPT-4V to generate documentation for the element {resource_id}")
node = await RECORD_PARSE_NODE.fill(
context=context, llm=self.llm, images=[img_before_base64, img_after_base64]
)
if "error" in node.content:
return AndroidActionOutput(action_state=RunState.FAIL)
# log_path = task_dir.joinpath(f"log_{app_name}_{demo_name}.txt")
log_path = task_dir.joinpath(f"log_{app_name}.txt")
prompt = node.compile(context=context, schema="json", mode="auto")
msg = node.content
doc_content[action_type] = msg
with open(log_path, "a") as logfile:
log_item = RecordLogItem(
step=step,
prompt=prompt,
image_before=img_before_base64,
image_after=img_after_base64,
response=node.content,
)
logfile.write(json.dumps(log_item.model_dump()) + "\n")
with open(doc_path, "w") as outfile:
outfile.write(str(doc_content))
doc_count += 1
logger.info(f"Documentation generated and saved to {doc_path}")
# TODO MetaGPT 里面的Config 需要看一下
# time.sleep(config.get_other("request_interval"))
logger.info(f"Documentation generation phase completed. {doc_count} docs generated.")
# TODO
# 1. LOG中记录方式有问题需要把IMG的部分拿出去丢掉

View file

@ -0,0 +1,32 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : the ActionNode to parse record
from metagpt.actions.action_node import ActionNode
OBSERVATION = ActionNode(
key="Observation",
expected_type=str,
instruction="Provide a description of your observations of the two images. "
"Subsequently, delineate the distinctions between the first image and the second one.",
example="",
)
THOUGHT = ActionNode(
key="Thought",
expected_type=str,
instruction="Consider the impact of Action acting on UI elements.",
example="",
)
DESCRIPTION = ActionNode(
key="Description",
expected_type=str,
instruction="Describe the functionality of the UI element concisely in one or two sentences Do not include "
"the numeric tag in your description",
example="",
)
NODES = [OBSERVATION, THOUGHT, DESCRIPTION]
RECORD_PARSE_NODE = ActionNode.from_children("RecordParse", NODES)

View file

@ -0,0 +1,204 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : LIKE scripts/task_executor.py in stage=act
import ast
from pathlib import Path
from examples.andriod_assistant.actions.screenshot_parse_an import SCREENSHOT_PARSE_NODE
from examples.andriod_assistant.prompts.assistant_prompt import (
screenshot_parse_template,
screenshot_parse_with_grid_template,
)
from examples.andriod_assistant.utils.schema import (
AndroidActionOutput,
AndroidElement,
GridOp,
LongPressGridOp,
LongPressOp,
OpLogItem,
RunState,
SwipeGridOp,
SwipeOp_3,
TapGridOp,
TapOp,
TextOp,
)
from examples.andriod_assistant.utils.utils import (
area_to_xy,
draw_bbox_multi,
draw_grid,
elem_bbox_to_xy,
screenshot_parse_extract,
traverse_xml_tree,
)
from metagpt.actions.action import Action
from metagpt.config2 import config
from metagpt.const import ADB_EXEC_FAIL
from metagpt.environment.android_env.android_env import AndroidEnv
from metagpt.environment.api.env_api import EnvAPIAbstract
from metagpt.utils.common import encode_image
class ScreenshotParse(Action):
name: str = "ScreenshotParse"
def _makeup_ui_document(self, elem_list: list[AndroidElement], docs_idr: Path, use_exist_doc: bool = True) -> str:
if not use_exist_doc:
return ""
ui_doc = """
You also have access to the following documentations that describes the functionalities of UI
elements you can interact on the screen. These docs are crucial for you to determine the target of your
next action. You should always prioritize these documented elements for interaction:"""
for i, elem in enumerate(elem_list):
doc_path = docs_idr.joinpath(f"{elem.uid}.txt")
if not doc_path.exists():
continue
ui_doc += f"Documentation of UI element labeled with the numeric tag '{i + 1}':\n"
doc_content = ast.literal_eval(open(doc_path, "r").read())
if doc_content["tap"]:
ui_doc += f"This UI element is clickable. {doc_content['tap']}\n\n"
if doc_content["text"]:
ui_doc += (
f"This UI element can receive text input. The text input is used for the following "
f"purposes: {doc_content['text']}\n\n"
)
if doc_content["long_press"]:
ui_doc += f"This UI element is long clickable. {doc_content['long_press']}\n\n"
if doc_content["v_swipe"]:
ui_doc += (
f"This element can be swiped directly without tapping. You can swipe vertically on "
f"this UI element. {doc_content['v_swipe']}\n\n"
)
if doc_content["h_swipe"]:
ui_doc += (
f"This element can be swiped directly without tapping. You can swipe horizontally on "
f"this UI element. {doc_content['h_swipe']}\n\n"
)
return ui_doc
async def run(
self,
round_count: int,
task_desc: str,
last_act: str,
task_dir: Path,
docs_dir: Path,
grid_on: bool,
env: AndroidEnv,
):
for path in [task_dir, docs_dir]:
if not path.exists():
path.mkdir(parents=True, exist_ok=True)
screenshot_path: Path = await env.observe(
EnvAPIAbstract(
api_name="get_screenshot", kwargs={"ss_name": f"{round_count}_before", "local_save_dir": task_dir}
)
)
xml_path: Path = await env.observe(
EnvAPIAbstract(api_name="get_xml", kwargs={"xml_name": f"{round_count}", "local_save_dir": task_dir})
)
width, height = env.device_shape
if not screenshot_path.exists() or not xml_path.exists():
return AndroidActionOutput(action_state=RunState.FAIL)
clickable_list = []
focusable_list = []
traverse_xml_tree(xml_path, clickable_list, "clickable", True)
traverse_xml_tree(xml_path, focusable_list, "focusable", True)
elem_list: list[AndroidElement] = clickable_list.copy()
for elem in focusable_list:
bbox = elem.bbox
center = (bbox[0][0] + bbox[1][0]) // 2, (bbox[0][1] + bbox[1][1]) // 2
close = False
for e in clickable_list:
bbox = e.bbox
center_ = (bbox[0][0] + bbox[1][0]) // 2, (bbox[0][1] + bbox[1][1]) // 2
dist = (abs(center[0] - center_[0]) ** 2 + abs(center[1] - center_[1]) ** 2) ** 0.5
if dist <= config.get_other("min_dist"):
close = True
break
if not close:
elem_list.append(elem)
screenshot_labeled_path = task_dir.joinpath(f"{round_count}_labeled.png")
draw_bbox_multi(screenshot_path, screenshot_labeled_path, elem_list)
img_base64 = encode_image(screenshot_labeled_path)
parse_template = screenshot_parse_with_grid_template if grid_on else screenshot_parse_template
if grid_on:
env.rows, env.cols = draw_grid(screenshot_path, task_dir / f"{round_count}_grid.png")
ui_doc = self._makeup_ui_document(elem_list, docs_dir)
context = parse_template.format(ui_document=ui_doc, task_description=task_desc, last_act=last_act)
node = await SCREENSHOT_PARSE_NODE.fill(context=context, llm=self.llm, images=[img_base64])
if "error" in node.content:
return AndroidActionOutput(action_state=RunState.FAIL)
prompt = node.compile(context=context, schema="json", mode="auto")
OpLogItem(step=round_count, prompt=prompt, image=str(screenshot_labeled_path), response=node.content)
op_param = screenshot_parse_extract(node.instruct_content.model_dump(), grid_on)
if op_param.param_state == RunState.FINISH:
return AndroidActionOutput(action_state=RunState.FINISH)
if op_param.param_state == RunState.FAIL:
return AndroidActionOutput(action_state=RunState.FAIL)
if isinstance(op_param, TapOp):
x, y = elem_bbox_to_xy(elem_list[op_param.area - 1].bbox)
res = await env.step(EnvAPIAbstract(api_name="system_tap", kwargs={"x": x, "y": y}))
if res == ADB_EXEC_FAIL:
return AndroidActionOutput(action_state=RunState.FAIL)
elif isinstance(op_param, TextOp):
res = await env.step(EnvAPIAbstract(api_name="user_input", kwargs={"input_txt": op_param.input_str}))
if res == ADB_EXEC_FAIL:
return AndroidActionOutput(action_state=RunState.FAIL)
elif isinstance(op_param, LongPressOp):
x, y = elem_bbox_to_xy(elem_list[op_param.area - 1].bbox)
res = await env.step(EnvAPIAbstract(api_name="user_longpress", kwargs={"x": x, "y": y}))
if res == ADB_EXEC_FAIL:
return AndroidActionOutput(action_state=RunState.FAIL)
elif isinstance(op_param, SwipeOp_3):
x, y = elem_bbox_to_xy(elem_list[op_param.area - 1].bbox)
res = await env.step(
EnvAPIAbstract(
api_name="user_swipe",
kwargs={"x": x, "y": y, "orient": op_param.swipe_orient, "dist": op_param.dist},
)
)
if res == ADB_EXEC_FAIL:
return AndroidActionOutput(action_state=RunState.FAIL)
elif isinstance(op_param, GridOp):
grid_on = True
elif isinstance(op_param, TapGridOp) or isinstance(op_param, LongPressGridOp):
x, y = area_to_xy(op_param.area, op_param.subarea, env.width, env.height, env.rows, env.cols)
if isinstance(op_param, TapGridOp):
res = await env.step(EnvAPIAbstract(api_name="system_tap", kwargs={"x": x, "y": y}))
if res == ADB_EXEC_FAIL:
return AndroidActionOutput(action_state=RunState.FAIL)
else:
# LongPressGridOp
res = await env.step(EnvAPIAbstract(api_name="user_longpress", kwargs={"x": x, "y": y}))
if res == ADB_EXEC_FAIL:
return AndroidActionOutput(action_state=RunState.FAIL)
elif isinstance(op_param, SwipeGridOp):
start_x, start_y = area_to_xy(
op_param.start_area, op_param.start_subarea, env.width, env.height, env.rows, env.cols
)
end_x, end_y = area_to_xy(
op_param.end_area, op_param.end_subarea, env.width, env.height, env.rows, env.cols
)
res = await env.step(
EnvAPIAbstract(api_name="user_swipe_to", kwargs={"start": (start_x, start_y), "end": (end_x, end_y)})
)
if res == ADB_EXEC_FAIL:
return AndroidActionOutput(action_state=RunState.FAIL)
if op_param.act_name != "grid":
grid_on = True
return AndroidActionOutput(data={"grid_on": grid_on})

View file

@ -0,0 +1,48 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : the ActionNode to parse screenshot
from metagpt.actions.action_node import ActionNode
OBSERVATION = ActionNode(
key="Observation", expected_type=str, instruction="Describe what you observe in the image", example=""
)
THOUGHT = ActionNode(
key="Thought",
expected_type=str,
instruction="To complete the given task, what is the next step I should do",
example="",
)
ACTION = ActionNode(
key="Action",
expected_type=str,
instruction="The function call with the correct parameters to proceed with the task. If you believe the task is "
"completed or there is nothing to be done, you should output FINISH. You cannot output anything else "
"except a function call or FINISH in this field.",
example="",
)
SUMMARY = ActionNode(
key="Summary",
expected_type=str,
instruction="Summarize your past actions along with your latest action in one or two sentences. Do not include "
"the numeric tag in your summary",
example="",
)
SUMMARY_GRID = ActionNode(
key="Summary",
expected_type=str,
instruction="Summarize your past actions along with your latest action in one or two sentences. Do not include "
"the grid area number in your summary",
example="",
)
NODES = [OBSERVATION, THOUGHT, ACTION, SUMMARY]
NODES_GRID = [OBSERVATION, THOUGHT, ACTION, SUMMARY_GRID]
SCREENSHOT_PARSE_NODE = ActionNode.from_children("ScreenshotParse", NODES)
SCREENSHOT_PARSE_GRID_NODE = ActionNode.from_children("ScreenshotParseGrid", NODES_GRID)

View file

@ -0,0 +1,246 @@
# !/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : LIKE scripts/self_explorer.py in stage=learn & mode=auto self_explore_task stage
import ast
from pathlib import Path
from examples.andriod_assistant.actions.screenshot_parse_an import SCREENSHOT_PARSE_NODE
from examples.andriod_assistant.actions.self_learn_reflect_an import (
SELF_LEARN_REFLECT_NODE,
)
from examples.andriod_assistant.prompts.assistant_prompt import (
screenshot_parse_self_explore_reflect_template as reflect_template,
)
from examples.andriod_assistant.prompts.assistant_prompt import (
screenshot_parse_self_explore_template,
)
from examples.andriod_assistant.utils.schema import (
ActionOp,
AndroidActionOutput,
AndroidElement,
Decision,
DocContent,
LongPressOp,
OpLogItem,
ReflectLogItem,
RunState,
SwipeOp,
SwipeOp_3,
TapOp,
TextOp,
)
from examples.andriod_assistant.utils.utils import (
draw_bbox_multi,
elem_bbox_to_xy,
reflect_parse_extarct,
screenshot_parse_extract,
traverse_xml_tree,
)
from metagpt.actions.action import Action
from metagpt.config2 import config
from metagpt.const import ADB_EXEC_FAIL
from metagpt.environment.android_env.android_env import AndroidEnv
from metagpt.environment.api.env_api import EnvAPIAbstract
from metagpt.logs import logger
from metagpt.utils.common import encode_image
class SelfLearnAndReflect(Action):
name: str = "SelfLearnAndReflect"
useless_list: list[str] = [] # store useless elements uid
screenshot_before_path: str = ""
screenshot_before_base64: str = ""
elem_list: list[AndroidElement] = []
swipe_orient: str = "up"
act_name: str = ""
ui_area: int = -1
async def run(
self, round_count: int, task_desc: str, last_act: str, task_dir: Path, docs_dir: Path, env: AndroidEnv
) -> AndroidActionOutput:
for path in [task_dir, docs_dir]:
if not path.exists():
path.mkdir(parents=True, exist_ok=True)
resp = await self.run_self_learn(round_count, task_desc, last_act, task_dir, env)
resp = await self.run_reflect(round_count, task_desc, last_act, task_dir, docs_dir, env)
return resp
async def run_self_learn(
self, round_count: int, task_desc: str, last_act: str, task_dir: Path, env: AndroidEnv
) -> AndroidActionOutput:
screenshot_path: Path = await env.observe(
EnvAPIAbstract(
api_name="get_screenshot", kwargs={"ss_name": f"{round_count}_before", "local_save_dir": task_dir}
)
)
xml_path: Path = await env.observe(
EnvAPIAbstract(api_name="get_xml", kwargs={"xml_name": f"{round_count}", "local_save_dir": task_dir})
)
if not screenshot_path.exists() or not xml_path.exists():
return AndroidActionOutput(action_state=RunState.FAIL)
clickable_list = []
focusable_list = []
traverse_xml_tree(xml_path, clickable_list, "clickable", True)
traverse_xml_tree(xml_path, focusable_list, "focusable", True)
elem_list = []
for elem in clickable_list:
if elem.uid in self.useless_list:
continue
elem_list.append(elem)
for elem in focusable_list:
if elem.uid in self.useless_list:
continue
bbox = elem.bbox
center = (bbox[0][0] + bbox[1][0]) // 2, (bbox[0][1] + bbox[1][1]) // 2
close = False
for e in clickable_list:
bbox = e.bbox
center_ = (bbox[0][0] + bbox[1][0]) // 2, (bbox[0][1] + bbox[1][1]) // 2
dist = (abs(center[0] - center_[0]) ** 2 + abs(center[1] - center_[1]) ** 2) ** 0.5
if dist <= config.get_other("min_dist"):
close = True
break
if not close:
elem_list.append(elem)
screenshot_before_labeled_path = task_dir.joinpath(f"{round_count}_before_labeled.png")
draw_bbox_multi(screenshot_path, screenshot_before_labeled_path, elem_list)
img_base64 = encode_image(screenshot_before_labeled_path)
self.screenshot_before_base64 = img_base64
self.screenshot_before_path = screenshot_before_labeled_path
self_explore_template = screenshot_parse_self_explore_template
context = self_explore_template.format(task_description=task_desc, last_act=last_act)
node = await SCREENSHOT_PARSE_NODE.fill(context=context, llm=self.llm, images=[img_base64])
print(f"fill result:{node}")
if "error" in node.content:
return AndroidActionOutput(action_state=RunState.FAIL)
prompt = node.compile(context=context, schema="json", mode="auto")
# Modify WindowsPath to Str
OpLogItem(step=round_count, prompt=prompt, image=str(screenshot_before_labeled_path), response=node.content)
op_param = screenshot_parse_extract(node.instruct_content.model_dump(), grid_on=False)
# TODO Modify Op_param. When op_param.action is FINISH, how to solve this ?
if op_param.param_state == RunState.FINISH:
return AndroidActionOutput(action_state=RunState.FINISH)
if op_param.param_state == RunState.FAIL:
return AndroidActionOutput(action_state=RunState.FAIL)
if isinstance(op_param, TapOp):
self.ui_area = op_param.area
x, y = elem_bbox_to_xy(elem_list[op_param.area - 1].bbox)
res = await env.step(EnvAPIAbstract(api_name="system_tap", kwargs={"x": x, "y": y}))
if res == ADB_EXEC_FAIL:
return AndroidActionOutput(action_state=RunState.FAIL)
elif isinstance(op_param, TextOp):
res = await env.step(EnvAPIAbstract(api_name="user_input", kwargs={"input_txt": op_param.input_str}))
if res == ADB_EXEC_FAIL:
return AndroidActionOutput(action_state=RunState.FAIL)
elif isinstance(op_param, LongPressOp):
self.ui_area = op_param.area
x, y = elem_bbox_to_xy(elem_list[op_param.area - 1].bbox)
res = await env.step(EnvAPIAbstract(api_name="user_longpress", kwargs={"x": x, "y": y}))
if res == ADB_EXEC_FAIL:
return AndroidActionOutput(action_state=RunState.FAIL)
elif isinstance(op_param, SwipeOp_3):
self.ui_area = op_param.area
self.swipe_orient = op_param.swipe_orient
x, y = elem_bbox_to_xy(elem_list[op_param.area - 1].bbox)
res = await env.step(
EnvAPIAbstract(
api_name="user_swipe",
kwargs={"x": x, "y": y, "orient": op_param.swipe_orient, "dist": op_param.dist},
)
)
if res == ADB_EXEC_FAIL:
return AndroidActionOutput(action_state=RunState.FAIL)
self.elem_list = elem_list
self.act_name = op_param.act_name
return AndroidActionOutput()
async def run_reflect(
self, round_count: int, task_desc: str, last_act: str, task_dir: Path, docs_dir: Path, env: AndroidEnv
) -> AndroidActionOutput:
screenshot_path: Path = await env.observe(
EnvAPIAbstract(
api_name="get_screenshot", kwargs={"ss_name": f"{round_count}_after", "local_save_dir": task_dir}
)
)
if not screenshot_path.exists():
return AndroidActionOutput(action_state=RunState.FAIL)
screenshot_after_labeled_path = task_dir.joinpath(f"{round_count}_after_labeled.png")
draw_bbox_multi(screenshot_path, screenshot_after_labeled_path, elem_list=self.elem_list)
img_base64 = encode_image(screenshot_after_labeled_path)
if self.act_name == ActionOp.TAP.value:
action = "tapping"
elif self.act_name == ActionOp.LONG_PRESS.value:
action = "long pressing"
elif self.act_name == ActionOp.SWIPE.value:
action = "swiping"
if self.swipe_orient == SwipeOp.UP.value or self.swipe_orient == SwipeOp.DOWN.value:
action = "v_swipe"
elif self.swipe_orient == SwipeOp.LEFT.value or self.swipe_orient == SwipeOp.RIGHT.value:
action = "h_swipe"
else:
# TODO Test for assignment, This error is eupiped with the next.
logger.info(f"Warning: current action name:{self.act_name}")
logger.info("Warning: act_name parse wrong!")
action = None
context = reflect_template.format(
action=action, ui_element=str(self.ui_area), task_desc=task_desc, last_act=last_act
)
node = await SELF_LEARN_REFLECT_NODE.fill(
context=context, llm=self.llm, images=[self.screenshot_before_base64, img_base64]
)
if "error" in node.content:
return AndroidActionOutput(action_state=RunState.FAIL)
prompt = node.compile(context=context, schema="json", mode="auto")
ReflectLogItem(
step=round_count,
prompt=prompt,
image_before=str(self.screenshot_before_path),
image_after=str(screenshot_after_labeled_path),
response=node.content,
)
op_param = reflect_parse_extarct(node.instruct_content.model_dump())
if op_param.param_state == RunState.FINISH:
return AndroidActionOutput(action_state=RunState.FINISH)
if op_param.param_state == RunState.FAIL:
return AndroidActionOutput(action_state=RunState.FAIL)
# TODO 这里经常出现错误
logger.info(f"Error 高发地区, 长度为{len(self.elem_list)}ui_erea为{self.ui_area}")
resource_id = self.elem_list[int(self.ui_area) - 1].uid
if op_param.decision == Decision.INEFFECTIVE.value:
self.useless_list.append(resource_id)
last_act = "NONE" # TODO global
elif op_param.decision in [Decision.BACK.value, Decision.CONTINUE.value, Decision.SUCCESS.value]:
if op_param.decision in [Decision.BACK.value, Decision.CONTINUE.value]:
self.useless_list.append(resource_id)
last_act = "NONE"
if op_param.decision == Decision.BACK.value:
res = await env.step(EnvAPIAbstract(api_name="system_back"))
if res == ADB_EXEC_FAIL:
return AndroidActionOutput(action_state=RunState.FAIL)
doc = op_param.documentation
doc_path = docs_dir.joinpath(f"{resource_id}.txt")
if doc_path.exists():
doc_content = ast.literal_eval(open(doc_path).read())
if doc_content[self.act_name]:
logger.info(f"Documentation for the element {resource_id} already exists.")
return AndroidActionOutput(action_state=RunState.FAIL)
else:
doc_content = DocContent()
setattr(doc_content, self.act_name, doc)
doc_path.write_text(str(doc_content))
return AndroidActionOutput(data={"last_act": last_act})
# TODO 如何处理 FINISH 状态这一点应该需要与role 联动才能解决

View file

@ -0,0 +1,21 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : the ActionNode to parse Reflection
from metagpt.actions.action_node import ActionNode
DECISION = ActionNode(
key="Decision", expected_type=str, instruction="explain why you made this decision", example="BACK"
)
THOUGHT = ActionNode(key="Thought", expected_type=str, instruction="explain why you made this decision", example="")
DOCUMENTATION = ActionNode(
key="Documentation", expected_type=str, instruction="describe the function of the UI element", example=""
)
NODES = [DECISION, THOUGHT, DOCUMENTATION]
SELF_LEARN_REFLECT_NODE = ActionNode.from_children("SelfLearnReflect", NODES)

View file

@ -0,0 +1,3 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc :

View file

@ -0,0 +1,168 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : the prompt templates of assistant learning and acting
screenshot_parse_template = """You are an agent that is trained to perform some basic tasks on a smartphone. You will be given a
smartphone screenshot. The interactive UI elements on the screenshot are labeled with numeric tags starting from 1. The
numeric tag of each interactive element is located in the center of the element.
You can call the following functions to control the smartphone:
1. tap(element: int)
This function is used to tap an UI element shown on the smartphone screen.
"element" is a numeric tag assigned to an UI element shown on the smartphone screen.
A simple use case can be tap(5), which taps the UI element labeled with the number 5.
2. text(text_input: str)
This function is used to insert text input in an input field/box. text_input is the string you want to insert and must
be wrapped with double quotation marks. A simple use case can be text("Hello, world!"), which inserts the string
"Hello, world!" into the input area on the smartphone screen. This function is usually callable when you see a keyboard
showing in the lower half of the screen.
3. long_press(element: int)
This function is used to long press an UI element shown on the smartphone screen.
"element" is a numeric tag assigned to an UI element shown on the smartphone screen.
A simple use case can be long_press(5), which long presses the UI element labeled with the number 5.
4. swipe(element: int, direction: str, dist: str)
This function is used to swipe an UI element shown on the smartphone screen, usually a scroll view or a slide bar.
"element" is a numeric tag assigned to an UI element shown on the smartphone screen. "direction" is a string that
represents one of the four directions: up, down, left, right. "direction" must be wrapped with double quotation
marks. "dist" determines the distance of the swipe and can be one of the three options: short, medium, long. You should
choose the appropriate distance option according to your need.
A simple use case can be swipe(21, "up", "medium"), which swipes up the UI element labeled with the number 21 for a
medium distance.
5. grid()
You should call this function when you find the element you want to interact with is not labeled with a numeric tag and
other elements with numeric tags cannot help with the task. The function will bring up a grid overlay to divide the
smartphone screen into small areas and this will give you more freedom to choose any part of the screen to tap, long
press, or swipe.
{ui_document}
The task you need to complete is to {task_description}. Your past actions to proceed with this task are summarized as
follows: {last_act}
Now, given the documentation and the following labeled screenshot, you need to think and call the function needed to
proceed with the task. Your output should include three parts in the given format:
You can only take one action at a time, so please directly call the function."""
screenshot_parse_with_grid_template = """You are an agent that is trained to perform some basic tasks on a smartphone. You will be given
a smartphone screenshot overlaid by a grid. The grid divides the screenshot into small square areas. Each area is
labeled with an integer in the top-left corner.
You can call the following functions to control the smartphone:
1. tap(area: int, subarea: str)
This function is used to tap a grid area shown on the smartphone screen. "area" is the integer label assigned to a grid
area shown on the smartphone screen. "subarea" is a string representing the exact location to tap within the grid area.
It can take one of the nine values: center, top-left, top, top-right, left, right, bottom-left, bottom, and
bottom-right.
A simple use case can be tap(5, "center"), which taps the exact center of the grid area labeled with the number 5.
2. long_press(area: int, subarea: str)
This function is used to long press a grid area shown on the smartphone screen. "area" is the integer label assigned to
a grid area shown on the smartphone screen. "subarea" is a string representing the exact location to long press within
the grid area. It can take one of the nine values: center, top-left, top, top-right, left, right, bottom-left, bottom,
and bottom-right.
A simple use case can be long_press(7, "top-left"), which long presses the top left part of the grid area labeled with
the number 7.
3. swipe(start_area: int, start_subarea: str, end_area: int, end_subarea: str)
This function is used to perform a swipe action on the smartphone screen, especially when you want to interact with a
scroll view or a slide bar. "start_area" is the integer label assigned to the grid area which marks the starting
location of the swipe. "start_subarea" is a string representing the exact location to begin the swipe within the grid
area. "end_area" is the integer label assigned to the grid area which marks the ending location of the swipe.
"end_subarea" is a string representing the exact location to end the swipe within the grid area.
The two subarea parameters can take one of the nine values: center, top-left, top, top-right, left, right, bottom-left,
bottom, and bottom-right.
A simple use case can be swipe(21, "center", 25, "right"), which performs a swipe starting from the center of grid area
21 to the right part of grid area 25.
The task you need to complete is to {task_description}. Your past actions to proceed with this task are summarized as
follows: {last_act}
Now, given the following labeled screenshot, you need to think and call the function needed to proceed with the task.
Your output should include three parts in the given format:
You can only take one action at a time, so please directly call the function."""
screenshot_parse_self_explore_template = """You are an agent that is trained to complete certain tasks on a smartphone. You will be
given a screenshot of a smartphone app. The interactive UI elements on the screenshot are labeled with numeric tags
starting from 1.
You can call the following functions to interact with those labeled elements to control the smartphone:
1. tap(element: int)
This function is used to tap an UI element shown on the smartphone screen.
"element" is a numeric tag assigned to an UI element shown on the smartphone screen.
A simple use case can be tap(5), which taps the UI element labeled with the number 5.
2. text(text_input: str)
This function is used to insert text input in an input field/box. text_input is the string you want to insert and must
be wrapped with double quotation marks. A simple use case can be text("Hello, world!"), which inserts the string
"Hello, world!" into the input area on the smartphone screen. This function is only callable when you see a keyboard
showing in the lower half of the screen.
3. long_press(element: int)
This function is used to long press an UI element shown on the smartphone screen.
"element" is a numeric tag assigned to an UI element shown on the smartphone screen.
A simple use case can be long_press(5), which long presses the UI element labeled with the number 5.
4. swipe(element: int, direction: str, dist: str)
This function is used to swipe an UI element shown on the smartphone screen, usually a scroll view or a slide bar.
"element" is a numeric tag assigned to an UI element shown on the smartphone screen. "direction" is a string that
represents one of the four directions: up, down, left, right. "direction" must be wrapped with double quotation
marks. "dist" determines the distance of the swipe and can be one of the three options: short, medium, long. You should
choose the appropriate distance option according to your need.
A simple use case can be swipe(21, "up", "medium"), which swipes up the UI element labeled with the number 21 for a
medium distance.
The task you need to complete is to {task_description}. Your past actions to proceed with this task are summarized as
follows: {last_act}
Now, given the following labeled screenshot, you need to think and call the function needed to proceed with the task.
Your output should include three parts in the given format:
You can only take one action at a time, so please directly call the function."""
screenshot_parse_self_explore_reflect_template = """I will give you screenshots of a mobile app before and after {action} the UI
element labeled with the number '{ui_element}' on the first screenshot. The numeric tag of each element is located at
the center of the element. The action of {action} this UI element was described as follows:
{last_act}
The action was also an attempt to proceed with a larger task, which is to {task_desc}. Your job is to carefully analyze
the difference between the two screenshots to determine if the action is in accord with the description above and at
the same time effectively moved the task forward. Your output should be determined based on the following situations:
1. BACK
If you think the action navigated you to a page where you cannot proceed with the given task, you should go back to the
previous interface. At the same time, describe the functionality of the UI element concisely in one or two sentences by
observing the difference between the two screenshots. Notice that your description of the UI element should focus on
the general function. Never include the numeric tag of the UI element in your description. You can use pronouns such as
"the UI element" to refer to the element. Your output should be in the following format:
Decision: BACK
Thought: <explain why you think the last action is wrong and you should go back to the previous interface>
Documentation: <describe the function of the UI element>
2. INEFFECTIVE
If you find the action changed nothing on the screen (screenshots before and after the action are identical), you
should continue to interact with other elements on the screen. Notice that if you find the location of the cursor
changed between the two screenshots, then they are not identical. Your output should be in the following format:
Decision: INEFFECTIVE
Thought: <explain why you made this decision>
Documentation: <None>
3. CONTINUE
If you find the action changed something on the screen but does not reflect the action description above and did not
move the given task forward, you should continue to interact with other elements on the screen. At the same time,
describe the functionality of the UI element concisely in one or two sentences by observing the difference between the
two screenshots. Notice that your description of the UI element should focus on the general function. Never include the
numeric tag of the UI element in your description. You can use pronouns such as "the UI element" to refer to the
element. Your output should be in the following format:
Decision: CONTINUE
Thought: <explain why you think the action does not reflect the action description above and did not move the given
task forward>
Documentation: <describe the function of the UI element>
4. SUCCESS
If you think the action successfully moved the task forward (even though it did not completed the task), you should
describe the functionality of the UI element concisely in one or two sentences. Notice that your description of the UI
element should focus on the general function. Never include the numeric tag of the UI element in your description. You
can use pronouns such as "the UI element" to refer to the element. Your output should be in the following format:
Decision: SUCCESS
Thought: <explain why you think the action successfully moved the task forward>
Documentation: <describe the function of the UI element>
"""

View file

@ -0,0 +1,45 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : the prompt templates of phone operation
tap_doc_template = """I will give you the screenshot of a mobile app before and after tapping the UI element labeled
with the number {ui_element} on the screen. The numeric tag of each element is located at the center of the element.
Tapping this UI element is a necessary part of proceeding with a larger task, which is to <task_desc>. Your task is to
describe the functionality of the UI element concisely in one or two sentences. Notice that your description of the UI
element should focus on the general function. For example, if the UI element is used to navigate to the chat window
with John, your description should not include the name of the specific person. Just say: "Tapping this area will
navigate the user to the chat window". Never include the numeric tag of the UI element in your description. You can use
pronouns such as "the UI element" to refer to the element."""
text_doc_template = """I will give you the screenshot of a mobile app before and after typing in the input area labeled
with the number {ui_element} on the screen. The numeric tag of each element is located at the center of the element.
Typing in this UI element is a necessary part of proceeding with a larger task, which is to <task_desc>. Your task is
to describe the functionality of the UI element concisely in one or two sentences. Notice that your description of the
UI element should focus on the general function. For example, if the change of the screenshot shows that the user typed
"How are you?" in the chat box, you do not need to mention the actual text. Just say: "This input area is used for the
user to type a message to send to the chat window.". Never include the numeric tag of the UI element in your
description. You can use pronouns such as "the UI element" to refer to the element."""
long_press_doc_template = """I will give you the screenshot of a mobile app before and after long pressing the UI
element labeled with the number {ui_element} on the screen. The numeric tag of each element is located at the center of
the element. Long pressing this UI element is a necessary part of proceeding with a larger task, which is to
<task_desc>. Your task is to describe the functionality of the UI element concisely in one or two sentences. Notice
that your description of the UI element should focus on the general function. For example, if long pressing the UI
element redirects the user to the chat window with John, your description should not include the name of the specific
person. Just say: "Long pressing this area will redirect the user to the chat window". Never include the numeric tag of
the UI element in your description. You can use pronouns such as "the UI element" to refer to the element."""
swipe_doc_template = """I will give you the screenshot of a mobile app before and after swiping <swipe_dir> the UI
element labeled with the number {ui_element} on the screen. The numeric tag of each element is located at the center of
the element. Swiping this UI element is a necessary part of proceeding with a larger task, which is to <task_desc>.
Your task is to describe the functionality of the UI element concisely in one or two sentences. Notice that your
description of the UI element should be as general as possible. For example, if swiping the UI element increases the
contrast ratio of an image of a building, your description should be just like this: "Swiping this area enables the
user to tune a specific parameter of the image". Never include the numeric tag of the UI element in your description.
You can use pronouns such as "the UI element" to refer to the element."""
refine_doc_suffix = """\nA documentation of this UI element generated from previous demos is shown below. Your
generated description should be based on this previous doc and optimize it. Notice that it is possible that your
understanding of the function of the UI element derived from the given screenshots conflicts with the previous doc,
because the function of a UI element can be flexible. In this case, your generated description should combine both.
Old documentation of this UI element: {old_doc}"""

View file

@ -0,0 +1 @@
pyshine==0.0.9

View file

@ -0,0 +1,3 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc :

View file

@ -0,0 +1,128 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : android assistant to learn from app operations and operate apps
import time
from datetime import datetime
from pathlib import Path
from typing import Optional
from pydantic import Field
from examples.andriod_assistant.actions.manual_record import ManualRecord
from examples.andriod_assistant.actions.parse_record import ParseRecord
from examples.andriod_assistant.actions.screenshot_parse import ScreenshotParse
from examples.andriod_assistant.actions.self_learn_and_reflect import (
SelfLearnAndReflect,
)
from examples.andriod_assistant.utils.schema import AndroidActionOutput, RunState
from metagpt.actions.add_requirement import UserRequirement
from metagpt.config2 import config
from metagpt.logs import logger
from metagpt.roles.role import Role, RoleReactMode
from metagpt.schema import Message
class AndroidAssistant(Role):
name: str = "Nick"
profile: str = "AndroidAssistant"
goal: str = "operate the mobile phone's apps with self-learn"
task_desc: str = ""
round_count: int = 0
last_act: str = ""
task_dir: Optional[Path] = Field(default=None)
docs_dir: Optional[Path] = Field(default=None)
grid_on: bool = Field(default=False)
def __init__(self, **data):
super().__init__(**data)
self._watch([UserRequirement, AndroidActionOutput])
self.task_desc = config.get_other("task_desc", "Just explore any app in this phone!")
app_name = config.get_other("app_name", "demo")
curr_path = Path(__file__).parent
data_dir = curr_path.joinpath("..", "output")
cur_datetime = datetime.fromtimestamp(int(time.time())).strftime("%Y-%m-%d_%H-%M-%S")
"""Firstly, we decide the state with user config, further, we can do it automatically, like if it's new app,
run the learn first and then do the act stage or learn it during the action.
"""
if config.get_other("stage") == "learn" and config.get_other("mode") == "manual":
# choose ManualRecord and then run ParseRecord
# Remember, only run each action only one time, no need to run n_round.
self.set_actions([ManualRecord, ParseRecord])
self.task_dir = data_dir.joinpath(app_name, f"manual_learn_{cur_datetime}")
self.docs_dir = data_dir.joinpath(app_name, "manual_docs")
elif config.get_other("stage") == "learn" and config.get_other("mode") == "auto":
# choose SelfLearnAndReflect to run
self.set_actions([SelfLearnAndReflect])
self.task_dir = data_dir.joinpath(app_name, f"auto_learn_{cur_datetime}")
self.docs_dir = data_dir.joinpath(app_name, "auto_docs")
elif config.get_other("stage") == "act":
# choose ScreenshotParse to run
self.set_actions([ScreenshotParse])
self.task_dir = data_dir.joinpath(app_name, f"act_{cur_datetime}")
if config.get_other("mode") == "manual":
self.docs_dir = data_dir.joinpath(app_name, "manual_docs")
else:
self.docs_dir = data_dir.joinpath(app_name, "auto_docs")
self._check_dir()
self._set_react_mode(RoleReactMode.BY_ORDER)
def _check_dir(self):
self.task_dir.mkdir(parents=True, exist_ok=True)
self.docs_dir.mkdir(parents=True, exist_ok=True)
async def react(self) -> Message:
self.round_count += 1
result = await super().react()
print(f"react result {result}")
return result
async def _act(self) -> Message:
logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})")
todo = self.rc.todo
if isinstance(todo, ManualRecord):
resp = await todo.run(task_dir=self.task_dir, task_desc=self.task_desc, env=self.rc.env)
elif isinstance(todo, ParseRecord):
resp = await todo.run(
app_name=config.get_other("app_name", "demo"),
task_dir=self.task_dir,
docs_dir=self.docs_dir,
env=self.rc.env,
)
elif isinstance(todo, SelfLearnAndReflect):
resp = await todo.run(
round_count=self.round_count,
task_desc=self.task_desc,
last_act=self.last_act,
task_dir=self.task_dir,
docs_dir=self.docs_dir,
env=self.rc.env,
)
if resp.action_state == RunState.SUCCESS:
self.last_act = resp.data.get("last_act")
elif isinstance(todo, ScreenshotParse):
resp = await todo.run(
round_count=self.round_count,
task_desc=self.task_desc,
last_act=self.last_act,
task_dir=self.task_dir,
docs_dir=self.docs_dir,
grid_on=self.grid_on,
env=self.rc.env,
)
if resp.action_state == RunState.SUCCESS:
logger.info(f"grid_on: {resp.data.get('grid_on')}")
self.grid_on = resp.data.get("grid_on")
msg = Message(
content=f"RoundCount: {self.round_count}",
role=self.profile,
cause_by=type(resp),
send_from=self.name,
send_to=self.name,
)
# self.publish_message(msg)
self.rc.memory.add(msg)
return msg

View file

@ -0,0 +1,71 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : the entry of android assistant including learning and acting stage
import asyncio
from pathlib import Path
import typer
from examples.andriod_assistant.roles.android_assistant import AndroidAssistant
from metagpt.config2 import config
from metagpt.environment.android_env.android_env import AndroidEnv
from metagpt.team import Team
app = typer.Typer(add_completion=False, pretty_exceptions_show_locals=False)
@app.command("", help="Run a Android Assistant")
def startup(
task_desc: str = typer.Argument(help="the task description you want the android assistant to learn or act"),
n_round: int = typer.Option(default=20, help="The max round to do an app operation task."),
stage: str = typer.Option(default="learn", help="stage: learn / act"),
mode: str = typer.Option(default="auto", help="mode: auto / manual , when state=learn"),
app_name: str = typer.Option(default="demo", help="the name of app you want to run"),
investment: float = typer.Option(default=5.0, help="Dollar amount to invest in the AI company."),
refine_doc: bool = typer.Option(
default=False, help="Refine existing operation docs based on the latest observation if True."
),
min_dist: int = typer.Option(
default=30, help="The minimum distance between elements to prevent overlapping during the labeling process."
),
android_screenshot_dir: str = typer.Option(
default="/sdcard/Pictures/Screenshots",
help="The path to store screenshots on android device. Make sure it exists.",
),
android_xml_dir: str = typer.Option(
default="/sdcard",
help="The path to store xml files for determining UI elements localtion. Make sure it exists.",
),
device_id: str = typer.Option(default="emulator-5554", help="The Android device_id"),
):
config.set_other(
{
"stage": stage,
"mode": mode,
"app_name": app_name,
"task_desc": task_desc,
"refine_doc": refine_doc,
"min_dist": min_dist,
"android_screenshot_dir": android_screenshot_dir,
"android_xml_dir": android_xml_dir,
"device_id": device_id,
}
)
team = Team(
env=AndroidEnv(
device_id=device_id,
xml_dir=Path(android_xml_dir),
screenshot_dir=Path(android_screenshot_dir),
)
)
team.hire([AndroidAssistant()])
team.invest(investment)
team.run_project(idea=task_desc)
asyncio.run(team.run(n_round=n_round))
if __name__ == "__main__":
app()

View file

@ -0,0 +1 @@
{'tap': '[CONTENT]\n{\n "Observation": "The first image shows a mobile device\'s home screen with various app icons and a Google search bar at the top. The second image displays an app drawer with a grid of apps and a search bar at the top, indicating that the UI element has been tapped.",\n "Thought": "Tapping the UI element opens the app drawer, which is a common function in mobile operating systems to access a list of all installed apps.",\n "Description": "Tapping this area will open the app drawer, displaying a list of all installed applications."\n}\n[/CONTENT]', 'text': '', 'v_swipe': '', 'h_swipe': '', 'long_press': ''}

View file

@ -0,0 +1,111 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : test case (imgs from appagent's)
import ast
import asyncio
import re
from pathlib import Path
from actions.parse_record_an import RECORD_PARSE_NODE
from prompts.operation_prompt import (
long_press_doc_template,
refine_doc_suffix,
swipe_doc_template,
tap_doc_template,
text_doc_template,
)
from utils.schema import ActionOp, SwipeOp
from metagpt.actions.action import Action
from metagpt.config2 import config
from metagpt.logs import logger
from metagpt.utils.common import encode_image
TEST_BEFORE_PATH = Path("apps/demo_Contacts/labeled_screenshots/demo_Contacts_2024-01-30_21-50-19_1.png")
TEST_AFTER_PATH = Path("apps/demo_Contacts/labeled_screenshots/demo_Contacts_2024-01-30_21-50-19_2.png")
RECORD_PATH = Path("apps/demo_Contacts/record.txt")
TASK_DESC_PATH = Path("apps/demo_Contacts/task_desc.txt")
DOCS_DIR = Path("storage")
testaction = Action(name="test")
# TODO test for parse record
# 仅使用一张图像进行测试
async def manual_test():
img_before_base64 = encode_image(TEST_BEFORE_PATH)
img_after_base64 = encode_image(TEST_AFTER_PATH)
with open(RECORD_PATH, "r") as record_file:
rec = record_file.readline().strip()
action, resource_id = rec.split(":::")
action_type = action.split("(")[0]
action_param = re.findall(r"\((.*?)\)", action)[0]
if action_type == ActionOp.TAP.value:
prompt_template = tap_doc_template
context = prompt_template.format(ui_element=action_param)
elif action_type == ActionOp.TEXT.value:
input_area, input_text = action_param.split(":sep:")
prompt_template = text_doc_template
context = prompt_template.format(ui_element=input_area)
elif action_type == ActionOp.LONG_PRESS.value:
prompt_template = long_press_doc_template
context = prompt_template.format(ui_element=action_param)
elif action_type == ActionOp.SWIPE.value:
swipe_area, swipe_dir = action_param.split(":sep:")
if swipe_dir == SwipeOp.UP.value or swipe_dir == SwipeOp.DOWN.value:
action_type = ActionOp.VERTICAL_SWIPE.value
elif swipe_dir == SwipeOp.LEFT.value or swipe_dir == SwipeOp.RIGHT.value:
action_type = ActionOp.HORIZONTAL_SWIPE.value
prompt_template = swipe_doc_template
context = prompt_template.format(swipe_dir=swipe_dir, ui_element=swipe_area)
else:
print("Error occurs")
task_desc_path = TASK_DESC_PATH
task_desc = open(task_desc_path, "r").read()
context = context.format(task_desc=task_desc)
doc_name = resource_id + ".txt"
doc_path = DOCS_DIR.joinpath(doc_name)
if doc_path.exists():
doc_content = ast.literal_eval(open(doc_path).read())
if doc_content[action_type]:
if config.get_other("doc_refine"):
refine_context = refine_doc_suffix.format(old_doc=doc_content[action_type])
context += refine_context
logger.info(
f"Documentation for the element {resource_id} already exists. The doc will be "
f"refined based on the latest demo."
)
else:
logger.info(
f"Documentation for the element {resource_id} already exists. Turn on DOC_REFINE "
f"in the config file if needed."
)
else:
doc_content = {"tap": "", "text": "", "v_swipe": "", "h_swipe": "", "long_press": ""}
logger.info(f"Waiting for GPT-4V to generate documentation for the element {resource_id}")
node = await RECORD_PARSE_NODE.fill(
context=context, llm=testaction.llm, images=[img_before_base64, img_after_base64]
)
# log_path = task_dir.joinpath(f"log_{app_name}_{demo_name}.txt")
node.compile(context=context, schema="json", mode="auto")
msg = node.content
doc_content[action_type] = msg
with open(doc_path, "w") as outfile:
outfile.write(str(doc_content))
logger.info(f"Documentation generated and saved to {doc_path}")
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(manual_test())
loop.close()
print("OK")

View file

@ -0,0 +1,80 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : test on android emulator action. After Modify Role Test, this script is discarded.
import asyncio
import time
from pathlib import Path
from actions.manual_record import ManualRecord
from actions.parse_record import ParseRecord
from actions.screenshot_parse import ScreenshotParse
from actions.self_learn_and_reflect import SelfLearnAndReflect
from metagpt.environment.android_env.android_env import AndroidEnv
TASK_PATH = Path("apps/Contacts")
DEMO_NAME = str(time.time())
SELF_EXPLORE_DOC_PATH = TASK_PATH.joinpath("autodocs")
PARSE_RECORD_DOC_PATH = TASK_PATH.joinpath("demodocs")
test_env_self_learn_android = AndroidEnv(
device_id="emulator-5554",
xml_dir=Path("/sdcard"),
screenshot_dir=Path("/sdcard/Pictures/Screenshots"),
)
test_self_learning = SelfLearnAndReflect()
test_env_manual_learn_android = AndroidEnv(
device_id="emulator-5554",
xml_dir=Path("/sdcard"),
screenshot_dir=Path("/sdcard/Pictures/Screenshots"),
)
test_manual_record = ManualRecord()
test_manual_parse = ParseRecord()
test_env_screenshot_parse_android = AndroidEnv(
device_id="emulator-5554",
xml_dir=Path("/sdcard"),
screenshot_dir=Path("/sdcard/Pictures/Screenshots"),
)
test_screenshot_parse = ScreenshotParse()
if __name__ == "__main__":
loop = asyncio.get_event_loop()
test_action_list = [
test_self_learning.run(
round_count=20,
task_desc="Create a contact in Contacts App named zjy with a phone number +86 18831933368 ",
last_act="",
task_dir=TASK_PATH / "demos" / f"self_learning_{DEMO_NAME}",
docs_dir=SELF_EXPLORE_DOC_PATH,
env=test_env_self_learn_android,
),
test_manual_record.run(
# demo_name=DEMO_NAME,
task_dir=TASK_PATH / "demos" / f"manual_record_{DEMO_NAME}",
task_desc="Create a contact in Contacts App named zjy with a phone number +86 18831933368 ",
env=test_env_manual_learn_android,
),
test_manual_parse.run(
app_name="Contacts",
# demo_name=DEMO_NAME,
task_dir=TASK_PATH / "demos" / f"manual_record_{DEMO_NAME}", # 修要修改
docs_dir=PARSE_RECORD_DOC_PATH, # 需要修改
env=test_env_manual_learn_android,
),
test_screenshot_parse.run(
round_count=20,
task_desc="Create a contact in Contacts App named zjy with a phone number +86 18831933368 ",
last_act="",
task_dir=TASK_PATH / f"act_{DEMO_NAME}",
docs_dir=PARSE_RECORD_DOC_PATH,
env=test_env_screenshot_parse_android,
grid_on=False,
),
]
loop.run_until_complete(asyncio.gather(*test_action_list))
loop.close()
print("Finish")

View file

@ -0,0 +1,158 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc :
from enum import Enum
from pydantic import BaseModel, Field, field_validator
class ActionOp(Enum):
TAP = "tap"
LONG_PRESS = "long_press"
TEXT = "text"
SWIPE = "swipe"
VERTICAL_SWIPE = "v_swipe"
HORIZONTAL_SWIPE = "h_swipe"
GRID = "grid"
STOP = "stop"
class SwipeOp(Enum):
UP = "up"
DOWN = "down"
LEFT = "left"
RIGHT = "right"
class Decision(Enum):
BACK = "BACK"
INEFFECTIVE = "INEFFECTIVE"
CONTINUE = "CONTINUE"
SUCCESS = "SUCCESS"
@classmethod
def values(cls):
return [item.value for item in cls]
class AndroidElement(BaseModel):
"""UI Element"""
uid: str = Field(default="")
bbox: tuple[tuple[int, int], tuple[int, int]] = Field(default={})
attrib: str = Field(default="")
class OpLogItem(BaseModel):
"""log content for self-learn or task act"""
step: int = Field(default=0)
prompt: str = Field(default="")
image: str = Field(default="")
response: str = Field(default="")
class ReflectLogItem(BaseModel):
"""log content for self-learn-reflect"""
step: int = Field(default=0)
prompt: str = Field(default="")
image_before: str = Field(default="")
image_after: str = Field(default="")
response: str = Field(default="")
class RecordLogItem(BaseModel):
"""log content for record parse, same as ReflectLogItem"""
step: int = Field(default=0)
prompt: str = Field(default="")
image_before: str = Field(default="")
image_after: str = Field(default="")
response: str = Field(default="")
class DocContent(BaseModel):
tap: str = Field(default="")
text: str = Field(default="")
v_swipe: str = Field(default="")
h_swipe: str = Field(default="")
long_press: str = Field(default="")
# start =================== define different Action Op and its params =============
class RunState(Enum):
"""run state"""
SUCCESS = "success"
FINISH = "finish"
FAIL = "fail"
class BaseOpParam(BaseModel):
act_name: str = Field(default="", validate_default=True)
last_act: str = Field(default="")
param_state: RunState = Field(default=RunState.SUCCESS, description="return state when extract params")
class TapOp(BaseOpParam):
area: int = Field(default=-1)
class TextOp(BaseOpParam):
input_str: str = Field(default="")
class LongPressOp(BaseOpParam):
area: int = Field(default=-1)
# Modify This SwipeOp to SwipeOp_3, Need better name
class SwipeOp_3(BaseOpParam):
area: int = Field(default=-1)
swipe_orient: str = Field(default="up")
dist: str = Field(default="")
class GridOp(BaseModel):
act_name: str = Field(default="")
class BaseGridOpParam(BaseOpParam):
@field_validator("act_name", mode="before")
@classmethod
def check_act_name(cls, act_name: str) -> str:
return f"{act_name}_grid"
class TapGridOp(BaseGridOpParam):
area: int = Field(default=-1)
subarea: str = Field(default="")
class LongPressGridOp(BaseGridOpParam):
area: int = Field(default=-1)
subarea: str = Field(default="")
class SwipeGridOp(BaseGridOpParam):
start_area: int = Field(default=-1)
start_subarea: str = Field(default="")
end_area: int = Field(default=-1)
end_subarea: str = Field(default="")
# end =================== define different Action Op and its params =============
class ReflectOp(BaseModel):
decision: str = ""
thought: str = ""
documentation: str = ""
param_state: RunState = RunState.SUCCESS
class AndroidActionOutput(BaseModel):
data: dict = Field(default=dict())
action_state: RunState = Field(default=RunState.SUCCESS)

View file

@ -0,0 +1,300 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc :
import re
from pathlib import Path
from typing import Union
from xml.etree.ElementTree import Element, iterparse
import cv2
import pyshine as ps
from examples.andriod_assistant.utils.schema import (
ActionOp,
AndroidElement,
BaseGridOpParam,
BaseOpParam,
Decision,
GridOp,
LongPressGridOp,
LongPressOp,
ReflectOp,
RunState,
SwipeGridOp,
SwipeOp_3,
TapGridOp,
TapOp,
TextOp,
)
from metagpt.logs import logger
def get_id_from_element(elem: Element) -> str:
bounds = elem.attrib["bounds"][1:-1].split("][")
x1, y1 = map(int, bounds[0].split(","))
x2, y2 = map(int, bounds[1].split(","))
elem_w, elem_h = x2 - x1, y2 - y1
if "resource-id" in elem.attrib and elem.attrib["resource-id"]:
elem_id = elem.attrib["resource-id"].replace(":", ".").replace("/", "_")
else:
elem_id = f"{elem.attrib['class']}_{elem_w}_{elem_h}"
if "content-desc" in elem.attrib and elem.attrib["content-desc"] and len(elem.attrib["content-desc"]) < 20:
content_desc = elem.attrib["content-desc"].replace("/", "_").replace(" ", "").replace(":", "_")
elem_id += f"_{content_desc}"
return elem_id
def traverse_xml_tree(xml_path: Path, elem_list: list[AndroidElement], attrib: str, add_index=False):
path = []
for event, elem in iterparse(str(xml_path), ["start", "end"]):
if event == "start":
path.append(elem)
if attrib in elem.attrib and elem.attrib[attrib] == "true":
parent_prefix = ""
if len(path) > 1:
parent_prefix = get_id_from_element(path[-2])
bounds = elem.attrib["bounds"][1:-1].split("][")
x1, y1 = map(int, bounds[0].split(","))
x2, y2 = map(int, bounds[1].split(","))
center = (x1 + x2) // 2, (y1 + y2) // 2
elem_id = get_id_from_element(elem)
if parent_prefix:
elem_id = parent_prefix + "_" + elem_id
if add_index:
elem_id += f"_{elem.attrib['index']}"
close = False
for e in elem_list:
bbox = e.bbox
center_ = (bbox[0][0] + bbox[1][0]) // 2, (bbox[0][1] + bbox[1][1]) // 2
dist = (abs(center[0] - center_[0]) ** 2 + abs(center[1] - center_[1]) ** 2) ** 0.5
# TODO Modify config to default 30. It should be modified back config after single action test
# if dist <= config.get_other("min_dist"):
if dist <= 30:
close = True
break
if not close:
elem_list.append(AndroidElement(uid=elem_id, bbox=((x1, y1), (x2, y2)), attrib=attrib))
if event == "end":
path.pop()
def draw_bbox_multi(
img_path: Path,
output_path: Path,
elem_list: list[AndroidElement],
record_mode: bool = False,
dark_mode: bool = False,
):
imgcv = cv2.imread(str(img_path))
count = 1
for elem in elem_list:
try:
top_left = elem.bbox[0]
bottom_right = elem.bbox[1]
left, top = top_left[0], top_left[1]
right, bottom = bottom_right[0], bottom_right[1]
label = str(count)
if record_mode:
if elem.attrib == "clickable":
color = (250, 0, 0)
elif elem.attrib == "focusable":
color = (0, 0, 250)
else:
color = (0, 250, 0)
imgcv = ps.putBText(
imgcv,
label,
text_offset_x=(left + right) // 2 + 10,
text_offset_y=(top + bottom) // 2 + 10,
vspace=10,
hspace=10,
font_scale=1,
thickness=2,
background_RGB=color,
text_RGB=(255, 250, 250),
alpha=0.5,
)
else:
text_color = (10, 10, 10) if dark_mode else (255, 250, 250)
bg_color = (255, 250, 250) if dark_mode else (10, 10, 10)
imgcv = ps.putBText(
imgcv,
label,
text_offset_x=(left + right) // 2 + 10,
text_offset_y=(top + bottom) // 2 + 10,
vspace=10,
hspace=10,
font_scale=1,
thickness=2,
background_RGB=bg_color,
text_RGB=text_color,
alpha=0.5,
)
except Exception as e:
logger.error(f"ERROR: An exception occurs while labeling the image\n{e}")
count += 1
cv2.imwrite(str(output_path), imgcv)
return imgcv
def draw_grid(img_path: Path, output_path: Path) -> tuple[int, int]:
def get_unit_len(n):
for i in range(1, n + 1):
if n % i == 0 and 120 <= i <= 180:
return i
return -1
image = cv2.imread(str(img_path))
height, width, _ = image.shape
color = (255, 116, 113)
unit_height = get_unit_len(height)
if unit_height < 0:
unit_height = 120
unit_width = get_unit_len(width)
if unit_width < 0:
unit_width = 120
thick = int(unit_width // 50)
rows = height // unit_height
cols = width // unit_width
for i in range(rows):
for j in range(cols):
label = i * cols + j + 1
left = int(j * unit_width)
top = int(i * unit_height)
right = int((j + 1) * unit_width)
bottom = int((i + 1) * unit_height)
cv2.rectangle(image, (left, top), (right, bottom), color, thick // 2)
cv2.putText(
image,
str(label),
(left + int(unit_width * 0.05) + 3, top + int(unit_height * 0.3) + 3),
0,
int(0.01 * unit_width),
(0, 0, 0),
thick,
)
cv2.putText(
image,
str(label),
(left + int(unit_width * 0.05), top + int(unit_height * 0.3)),
0,
int(0.01 * unit_width),
color,
thick,
)
cv2.imwrite(str(output_path), image)
return rows, cols
def area_to_xy(area: int, subarea: str, width: int, height: int, rows: int, cols: int) -> tuple[int, int]:
area -= 1
logger.info(f"{cols}")
row, col = area // cols, area % cols
x_0, y_0 = col * (width // cols), row * (height // rows)
if subarea == "top-left":
x, y = x_0 + (width // cols) // 4, y_0 + (height // rows) // 4
elif subarea == "top":
x, y = x_0 + (width // cols) // 2, y_0 + (height // rows) // 4
elif subarea == "top-right":
x, y = x_0 + (width // cols) * 3 // 4, y_0 + (height // rows) // 4
elif subarea == "left":
x, y = x_0 + (width // cols) // 4, y_0 + (height // rows) // 2
elif subarea == "right":
x, y = x_0 + (width // cols) * 3 // 4, y_0 + (height // rows) // 2
elif subarea == "bottom-left":
x, y = x_0 + (width // cols) // 4, y_0 + (height // rows) * 3 // 4
elif subarea == "bottom":
x, y = x_0 + (width // cols) // 2, y_0 + (height // rows) * 3 // 4
elif subarea == "bottom-right":
x, y = x_0 + (width // cols) * 3 // 4, y_0 + (height // rows) * 3 // 4
else:
x, y = x_0 + (width // cols) // 2, y_0 + (height // rows) // 2
return x, y
def elem_bbox_to_xy(bbox: tuple[tuple[int, int], tuple[int, int]]) -> tuple[int, int]:
tl, br = bbox
x, y = (tl[0] + br[0]) // 2, (tl[1] + br[1]) // 2
return x, y
def reflect_parse_extarct(parsed_json: dict) -> ReflectOp:
decision = parsed_json.get("Decision")
if decision not in Decision.values():
op = ReflectOp(param_state=RunState.FAIL)
else:
op = ReflectOp(
decision=parsed_json.get("Decision"),
thought=parsed_json.get("Thought"),
documentation=parsed_json.get("Documentation"),
)
return op
def screenshot_parse_extract(parsed_json: dict, grid_on: bool = False) -> Union[BaseOpParam, BaseGridOpParam, GridOp]:
act = parsed_json.get("Action")
last_act = parsed_json.get("Summary")
act_name = act.split("(")[0]
if RunState.FINISH.value.upper() in act:
return BaseOpParam(param_state=RunState.FINISH)
if grid_on:
return screenshot_parse_extract_with_grid(act_name, act, last_act)
else:
return screenshot_parse_extract_without_grid(act_name, act, last_act)
def op_params_clean(params: list[str]) -> list[Union[int, str]]:
param_values = []
for param_value in params:
if '"' in param_value or "'" in param_value: # remove `"`
param_values.append(param_value.strip()[1:-1])
else:
param_values.append(int(param_value))
return param_values
def screenshot_parse_extract_without_grid(act_name: str, act: str, last_act: str) -> Union[BaseOpParam, GridOp]:
if act_name == ActionOp.TAP.value:
area = int(re.findall(r"tap\((.*?)\)", act)[0])
op = TapOp(act_name=act_name, area=area, last_act=last_act)
elif act_name == ActionOp.TEXT.value:
input_str = re.findall(r"text\((.*?)\)", act)[0][1:-1]
op = TextOp(act_name=act_name, input_str=input_str, last_act=last_act)
elif act_name == ActionOp.LONG_PRESS.value:
area = int(re.findall(r"long_press\((.*?)\)", act)[0])
op = LongPressOp(act_name=act_name, area=area, last_act=last_act)
elif act_name == ActionOp.SWIPE.value:
params = re.findall(r"swipe\((.*?)\)", act)[0].split(",")
params = op_params_clean(params) # area, swipe_orient, dist
op = SwipeOp_3(act_name=act_name, area=params[0], swipe_orient=params[1], dist=params[2], last_act=last_act)
elif act_name == ActionOp.GRID.value:
op = GridOp(act_name=act_name)
else:
op = BaseOpParam(param_state=RunState.FAIL)
return op
def screenshot_parse_extract_with_grid(act_name: str, act: str, last_act: str) -> Union[BaseGridOpParam, GridOp]:
if act_name == ActionOp.TAP.value:
params = re.findall(r"tap\((.*?)\)", act)[0].split(",")
params = op_params_clean(params)
op = TapGridOp(act_name=act_name, area=params[0], subarea=params[1], last_act=last_act)
elif act_name == ActionOp.LONG_PRESS.value:
params = re.findall(r"long_press\((.*?)\)", act)[0].split(",")
params = op_params_clean(params)
op = LongPressGridOp(act_name=act_name, area=params[0], subarea=params[1], last_act=last_act)
elif act_name == ActionOp.SWIPE.value:
params = re.findall(r"swipe\((.*?)\)", act)[0].split(",")
params = op_params_clean(params)
op = SwipeGridOp(
act_name=act_name, start_area=params[0], start_subarea=params[1], end_area=params[2], end_subarea=params[3]
)
elif act_name == ActionOp.GRID.value:
op = GridOp(act_name=act_name)
else:
op = BaseGridOpParam(param_state=RunState.FAIL)
return op

View file

@ -6,9 +6,11 @@
@File : llm_hello_world.py
"""
import asyncio
from pathlib import Path
from metagpt.llm import LLM
from metagpt.logs import logger
from metagpt.utils.common import encode_image
async def main():
@ -38,6 +40,15 @@ async def main():
if hasattr(llm, "completion"):
logger.info(llm.completion(hello_msg))
# check llm-vision capacity if it supports
invoice_path = Path(__file__).parent.joinpath("..", "tests", "data", "invoices", "invoice-2.png")
img_base64 = encode_image(invoice_path)
try:
res = await llm.aask(msg="if this is a invoice, just return True else return False", images=[img_base64])
assert "true" in res.lower()
except Exception:
pass
if __name__ == "__main__":
asyncio.run(main())

View file

@ -453,7 +453,6 @@ class ActionNode:
self, schema, mode, images: Optional[Union[str, list[str]]] = None, timeout=USE_CONFIG_TIMEOUT, exclude=None
):
prompt = self.compile(context=self.context, schema=schema, mode=mode, exclude=exclude)
if schema != "raw":
mapping = self.get_mapping(mode, exclude=exclude)
class_name = f"{self.key}_AN"
@ -464,6 +463,7 @@ class ActionNode:
self.instruct_content = scontent
else:
self.content = await self.llm.aask(prompt)
logger.info(self.content)
self.instruct_content = None
return self

View file

@ -75,6 +75,7 @@ class Config(CLIParams, YamlModel):
iflytek_api_key: str = ""
azure_tts_subscription_key: str = ""
azure_tts_region: str = ""
other: dict = dict() # other dict
@classmethod
def from_home(cls, path):
@ -127,6 +128,15 @@ class Config(CLIParams, YamlModel):
self.reqa_file = reqa_file
self.max_auto_summarize_code = max_auto_summarize_code
def set_other(self, other: dict):
self.other = other
def get_other(self, key: str, default_value: str = None):
if default_value is None:
return self.other.get(key)
else:
return self.other.get(key, default_value)
def get_openai_llm(self) -> Optional[LLMConfig]:
"""Get OpenAI LLMConfig by name. If no OpenAI, raise Exception"""
if self.llm.api_type == LLMType.OPENAI:

View file

@ -101,6 +101,7 @@ class AndroidExtEnv(ExtEnv):
return f"adb -s {self.device_id} "
def execute_adb_with_cmd(self, adb_cmd: str) -> str:
adb_cmd = adb_cmd.replace("\\", "/")
res = subprocess.run(adb_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
exec_res = ADB_EXEC_FAIL
if not res.returncode:

View file

@ -18,11 +18,11 @@ class EnvAPIAbstract(BaseModel):
class EnvAPIRegistry(BaseModel):
"""the registry to store environment w&r api/interface"""
registry: dict[str, dict[str, Union[dict, Any, str]]] = Field(default=dict(), exclude=True)
registry: dict[str, Callable] = Field(default=dict(), exclude=True)
def get(self, api_name: str):
if api_name not in self.registry:
raise ValueError
raise KeyError(f"api_name: {api_name} not found")
return self.registry.get(api_name)
def __getitem__(self, api_name: str) -> Callable:

View file

@ -0,0 +1,3 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : the readable api/interface abstraction to integrate with environment

View file

@ -0,0 +1,3 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : the writable api/interface abstraction to integrate with environment

View file

@ -0,0 +1,3 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc :

View file

@ -0,0 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : MG Gym Env
class GymEnv:
pass

View file

@ -76,6 +76,7 @@ class Team(BaseModel):
def hire(self, roles: list[Role]):
"""Hire roles to cooperate"""
roles[0]
self.env.add_roles(roles)
@property

View file

@ -0,0 +1,3 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc :

View file

@ -0,0 +1,3 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc :