mirror of
https://github.com/FoundationAgents/MetaGPT.git
synced 2026-05-15 11:02:36 +02:00
Fix format and some bugs in android_assistant.py
This commit is contained in:
parent
0f0f41fb30
commit
732cf36fbc
13 changed files with 223 additions and 143 deletions
|
|
@ -9,9 +9,8 @@ import cv2
|
|||
from examples.andriod_assistant.utils.schema import (
|
||||
ActionOp,
|
||||
AndroidActionOutput,
|
||||
AndroidElement,
|
||||
RunState,
|
||||
SwipeOp
|
||||
SwipeOp,
|
||||
)
|
||||
from examples.andriod_assistant.utils.utils import draw_bbox_multi, traverse_xml_tree
|
||||
from metagpt.actions.action import Action
|
||||
|
|
@ -24,6 +23,7 @@ from metagpt.logs import logger
|
|||
|
||||
class ManualRecord(Action):
|
||||
"""do a human operation on the screen with human input"""
|
||||
|
||||
name: str = "ManualRecord"
|
||||
|
||||
useless_list: list[str] = [] # store useless elements uid
|
||||
|
|
@ -35,19 +35,18 @@ class ManualRecord(Action):
|
|||
|
||||
# async def run(self, demo_name: str, task_desc: str,task_dir: Path, env: AndroidEnv):
|
||||
async def run(self, task_desc: str, task_dir: Path, env: AndroidEnv):
|
||||
|
||||
self.record_path = Path(task_dir) / "record.txt"
|
||||
self.task_desc_path = Path(task_dir) / "task_desc.txt"
|
||||
self.screenshot_before_path = Path(task_dir)/"raw_screenshots"
|
||||
self.screenshot_after_path = Path(task_dir)/"labeled_screenshots"
|
||||
self.xml_path = Path(task_dir)/"xml"
|
||||
self.screenshot_before_path = Path(task_dir) / "raw_screenshots"
|
||||
self.screenshot_after_path = Path(task_dir) / "labeled_screenshots"
|
||||
self.xml_path = Path(task_dir) / "xml"
|
||||
|
||||
for path in [self.screenshot_before_path,self.screenshot_after_path, self.xml_path]:
|
||||
for path in [self.screenshot_before_path, self.screenshot_after_path, self.xml_path]:
|
||||
if not path.exists():
|
||||
path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
with open(self.record_path, 'w') as file:
|
||||
file.write('')
|
||||
with open(self.record_path, "w") as file:
|
||||
file.write("")
|
||||
record_file = open(self.record_path, "w")
|
||||
with open(self.task_desc_path, "w") as f:
|
||||
f.write(task_desc)
|
||||
|
|
@ -58,14 +57,14 @@ class ManualRecord(Action):
|
|||
EnvAPIAbstract(
|
||||
api_name="get_screenshot",
|
||||
# kwargs={"ss_name": f"{demo_name}_{step}", "local_save_dir": self.screenshot_before_path}
|
||||
kwargs={"ss_name": f"{step}", "local_save_dir": self.screenshot_before_path}
|
||||
kwargs={"ss_name": f"{step}", "local_save_dir": self.screenshot_before_path},
|
||||
)
|
||||
)
|
||||
xml_path: Path = await env.observe(
|
||||
EnvAPIAbstract(
|
||||
api_name="get_xml",
|
||||
# kwargs={"xml_name": f"{demo_name}_{step}", "local_save_dir": self.xml_path}
|
||||
kwargs={"xml_name": f"{step}", "local_save_dir": self.xml_path}
|
||||
kwargs={"xml_name": f"{step}", "local_save_dir": self.xml_path},
|
||||
)
|
||||
)
|
||||
if not screenshot_path.exists() or not xml_path.exists():
|
||||
|
|
@ -110,11 +109,11 @@ class ManualRecord(Action):
|
|||
)
|
||||
|
||||
while (
|
||||
user_input.lower() != ActionOp.TAP.value
|
||||
and user_input.lower() != ActionOp.TEXT.value
|
||||
and user_input.lower() != ActionOp.LONG_PRESS.value
|
||||
and user_input.lower() != ActionOp.SWIPE.value
|
||||
and user_input.lower() != ActionOp.STOP.value
|
||||
user_input.lower() != ActionOp.TAP.value
|
||||
and user_input.lower() != ActionOp.TEXT.value
|
||||
and user_input.lower() != ActionOp.LONG_PRESS.value
|
||||
and user_input.lower() != ActionOp.SWIPE.value
|
||||
and user_input.lower() != ActionOp.STOP.value
|
||||
):
|
||||
user_input = input()
|
||||
|
||||
|
|
@ -167,10 +166,10 @@ class ManualRecord(Action):
|
|||
)
|
||||
user_input = ""
|
||||
while (
|
||||
user_input != SwipeOp.UP.value
|
||||
and user_input != SwipeOp.DOWN.value
|
||||
and user_input != SwipeOp.LEFT.value
|
||||
and user_input != SwipeOp.RIGHT.value
|
||||
user_input != SwipeOp.UP.value
|
||||
and user_input != SwipeOp.DOWN.value
|
||||
and user_input != SwipeOp.LEFT.value
|
||||
and user_input != SwipeOp.RIGHT.value
|
||||
):
|
||||
user_input = input()
|
||||
swipe_dir = user_input
|
||||
|
|
@ -179,7 +178,9 @@ class ManualRecord(Action):
|
|||
user_input = input()
|
||||
tl, br = elem_list[int(user_input) - 1].bbox
|
||||
x, y = (tl[0] + br[0]) // 2, (tl[1] + br[1]) // 2
|
||||
ret = await env.step(EnvAPIAbstract(api_name="user_swipe", kwargs={"x": x, "y": y, "orient": swipe_dir}))
|
||||
ret = await env.step(
|
||||
EnvAPIAbstract(api_name="user_swipe", kwargs={"x": x, "y": y, "orient": swipe_dir})
|
||||
)
|
||||
if ret == ADB_EXEC_FAIL:
|
||||
return AndroidActionOutput(action_state=RunState.FAIL)
|
||||
record_file.write(f"swipe({int(user_input)}:sep:{swipe_dir}):::{elem_list[int(user_input) - 1].uid}\n")
|
||||
|
|
@ -190,5 +191,3 @@ class ManualRecord(Action):
|
|||
else:
|
||||
break
|
||||
time.sleep(3)
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -6,7 +6,6 @@
|
|||
import ast
|
||||
import json
|
||||
import re
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
from examples.andriod_assistant.actions.parse_record_an import RECORD_PARSE_NODE
|
||||
|
|
@ -44,8 +43,8 @@ class ParseRecord(Action):
|
|||
doc_count = 0
|
||||
self.record_path = Path(task_dir) / "record.txt"
|
||||
self.task_desc_path = Path(task_dir) / "task_desc.txt"
|
||||
self.screenshot_before_path = Path(task_dir)/"raw_screenshots"
|
||||
self.screenshot_after_path = Path(task_dir)/"labeled_screenshots"
|
||||
self.screenshot_before_path = Path(task_dir) / "raw_screenshots"
|
||||
self.screenshot_after_path = Path(task_dir) / "labeled_screenshots"
|
||||
|
||||
with open(self.record_path, "r") as record_file:
|
||||
record_step_count = len(record_file.readlines()) - 1
|
||||
|
|
@ -137,5 +136,6 @@ class ParseRecord(Action):
|
|||
|
||||
logger.info(f"Documentation generation phase completed. {doc_count} docs generated.")
|
||||
|
||||
|
||||
# TODO
|
||||
# 1. LOG中记录方式有问题,需要把IMG的部分拿出去丢掉
|
||||
# 1. LOG中记录方式有问题,需要把IMG的部分拿出去丢掉
|
||||
|
|
|
|||
|
|
@ -26,8 +26,8 @@ from examples.andriod_assistant.utils.schema import (
|
|||
)
|
||||
from examples.andriod_assistant.utils.utils import (
|
||||
area_to_xy,
|
||||
draw_grid,
|
||||
draw_bbox_multi,
|
||||
draw_grid,
|
||||
elem_bbox_to_xy,
|
||||
screenshot_parse_extract,
|
||||
traverse_xml_tree,
|
||||
|
|
@ -79,14 +79,14 @@ class ScreenshotParse(Action):
|
|||
return ui_doc
|
||||
|
||||
async def run(
|
||||
self,
|
||||
round_count: int,
|
||||
task_desc: str,
|
||||
last_act: str,
|
||||
task_dir: Path,
|
||||
docs_dir: Path,
|
||||
grid_on: bool,
|
||||
env: AndroidEnv,
|
||||
self,
|
||||
round_count: int,
|
||||
task_desc: str,
|
||||
last_act: str,
|
||||
task_dir: Path,
|
||||
docs_dir: Path,
|
||||
grid_on: bool,
|
||||
env: AndroidEnv,
|
||||
):
|
||||
for path in [task_dir, docs_dir]:
|
||||
if not path.exists():
|
||||
|
|
@ -94,15 +94,11 @@ class ScreenshotParse(Action):
|
|||
|
||||
screenshot_path: Path = await env.observe(
|
||||
EnvAPIAbstract(
|
||||
api_name="get_screenshot",
|
||||
kwargs={"ss_name": f"{round_count}_before", "local_save_dir": task_dir}
|
||||
api_name="get_screenshot", kwargs={"ss_name": f"{round_count}_before", "local_save_dir": task_dir}
|
||||
)
|
||||
)
|
||||
xml_path: Path = await env.observe(
|
||||
EnvAPIAbstract(
|
||||
api_name="get_xml",
|
||||
kwargs={"xml_name": f"{round_count}", "local_save_dir": task_dir}
|
||||
)
|
||||
EnvAPIAbstract(api_name="get_xml", kwargs={"xml_name": f"{round_count}", "local_save_dir": task_dir})
|
||||
)
|
||||
width, height = env.device_shape
|
||||
if not screenshot_path.exists() or not xml_path.exists():
|
||||
|
|
@ -134,7 +130,7 @@ class ScreenshotParse(Action):
|
|||
parse_template = screenshot_parse_with_grid_template if grid_on else screenshot_parse_template
|
||||
|
||||
if grid_on:
|
||||
rows, cols = draw_grid(screenshot_path, task_dir / f"{round_count}_grid.png")
|
||||
env.rows, env.cols = draw_grid(screenshot_path, task_dir / f"{round_count}_grid.png")
|
||||
|
||||
ui_doc = self._makeup_ui_document(elem_list, docs_dir)
|
||||
context = parse_template.format(ui_document=ui_doc, task_description=task_desc, last_act=last_act)
|
||||
|
|
@ -171,7 +167,7 @@ class ScreenshotParse(Action):
|
|||
res = await env.step(
|
||||
EnvAPIAbstract(
|
||||
api_name="user_swipe",
|
||||
kwargs={"x": x, "y": y, "orient": op_param.swipe_orient, "dist": op_param.dist}
|
||||
kwargs={"x": x, "y": y, "orient": op_param.swipe_orient, "dist": op_param.dist},
|
||||
)
|
||||
)
|
||||
if res == ADB_EXEC_FAIL:
|
||||
|
|
@ -190,10 +186,15 @@ class ScreenshotParse(Action):
|
|||
if res == ADB_EXEC_FAIL:
|
||||
return AndroidActionOutput(action_state=RunState.FAIL)
|
||||
elif isinstance(op_param, SwipeGridOp):
|
||||
start_x, start_y = area_to_xy(op_param.start_area, op_param.start_subarea, env.width, env.height, env.rows, env.cols)
|
||||
end_x, end_y = area_to_xy(op_param.end_area, op_param.end_subarea, env.width, env.height, env.rows, env.cols)
|
||||
start_x, start_y = area_to_xy(
|
||||
op_param.start_area, op_param.start_subarea, env.width, env.height, env.rows, env.cols
|
||||
)
|
||||
end_x, end_y = area_to_xy(
|
||||
op_param.end_area, op_param.end_subarea, env.width, env.height, env.rows, env.cols
|
||||
)
|
||||
res = await env.step(
|
||||
EnvAPIAbstract(api_name="user_swipe_to", kwargs={"start": (start_x, start_y), "end": (end_x, end_y)}))
|
||||
EnvAPIAbstract(api_name="user_swipe_to", kwargs={"start": (start_x, start_y), "end": (end_x, end_y)})
|
||||
)
|
||||
if res == ADB_EXEC_FAIL:
|
||||
return AndroidActionOutput(action_state=RunState.FAIL)
|
||||
|
||||
|
|
|
|||
|
|
@ -59,17 +59,17 @@ class SelfLearnAndReflect(Action):
|
|||
ui_area: int = -1
|
||||
|
||||
async def run(
|
||||
self, round_count: int, task_desc: str, last_act: str, task_dir: Path, docs_dir: Path, env: AndroidEnv
|
||||
self, round_count: int, task_desc: str, last_act: str, task_dir: Path, docs_dir: Path, env: AndroidEnv
|
||||
) -> AndroidActionOutput:
|
||||
for path in [task_dir,docs_dir]:
|
||||
for path in [task_dir, docs_dir]:
|
||||
if not path.exists():
|
||||
path.mkdir(parents=True,exist_ok=True)
|
||||
path.mkdir(parents=True, exist_ok=True)
|
||||
resp = await self.run_self_learn(round_count, task_desc, last_act, task_dir, env)
|
||||
resp = await self.run_reflect(round_count, task_desc, last_act, task_dir, docs_dir, env)
|
||||
return resp
|
||||
|
||||
async def run_self_learn(
|
||||
self, round_count: int, task_desc: str, last_act: str, task_dir: Path, env: AndroidEnv
|
||||
self, round_count: int, task_desc: str, last_act: str, task_dir: Path, env: AndroidEnv
|
||||
) -> AndroidActionOutput:
|
||||
screenshot_path: Path = await env.observe(
|
||||
EnvAPIAbstract(
|
||||
|
|
@ -151,7 +151,8 @@ class SelfLearnAndReflect(Action):
|
|||
x, y = elem_bbox_to_xy(elem_list[op_param.area - 1].bbox)
|
||||
res = await env.step(
|
||||
EnvAPIAbstract(
|
||||
api_name="user_swipe", kwargs={"x": x, "y": y, "orient": op_param.swipe_orient, "dist": op_param.dist}
|
||||
api_name="user_swipe",
|
||||
kwargs={"x": x, "y": y, "orient": op_param.swipe_orient, "dist": op_param.dist},
|
||||
)
|
||||
)
|
||||
if res == ADB_EXEC_FAIL:
|
||||
|
|
@ -159,11 +160,10 @@ class SelfLearnAndReflect(Action):
|
|||
|
||||
self.elem_list = elem_list
|
||||
self.act_name = op_param.act_name
|
||||
print("探索阶段结束")
|
||||
return AndroidActionOutput()
|
||||
|
||||
async def run_reflect(
|
||||
self, round_count: int, task_desc: str, last_act: str, task_dir: Path, docs_dir: Path, env: AndroidEnv
|
||||
self, round_count: int, task_desc: str, last_act: str, task_dir: Path, docs_dir: Path, env: AndroidEnv
|
||||
) -> AndroidActionOutput:
|
||||
screenshot_path: Path = await env.observe(
|
||||
EnvAPIAbstract(
|
||||
|
|
@ -176,7 +176,6 @@ class SelfLearnAndReflect(Action):
|
|||
screenshot_after_labeled_path = task_dir.joinpath(f"{round_count}_after_labeled.png")
|
||||
draw_bbox_multi(screenshot_path, screenshot_after_labeled_path, elem_list=self.elem_list)
|
||||
img_base64 = encode_image(screenshot_after_labeled_path)
|
||||
|
||||
if self.act_name == ActionOp.TAP.value:
|
||||
action = "tapping"
|
||||
elif self.act_name == ActionOp.LONG_PRESS.value:
|
||||
|
|
@ -187,6 +186,11 @@ class SelfLearnAndReflect(Action):
|
|||
action = "v_swipe"
|
||||
elif self.swipe_orient == SwipeOp.LEFT.value or self.swipe_orient == SwipeOp.RIGHT.value:
|
||||
action = "h_swipe"
|
||||
else:
|
||||
# TODO Test for assignment, This error is eupiped with the next.
|
||||
logger.info(f"Warning: current action name:{self.act_name}")
|
||||
logger.info("Warning: act_name parse wrong!")
|
||||
action = None
|
||||
context = reflect_template.format(
|
||||
action=action, ui_element=str(self.ui_area), task_desc=task_desc, last_act=last_act
|
||||
)
|
||||
|
|
@ -211,7 +215,8 @@ class SelfLearnAndReflect(Action):
|
|||
return AndroidActionOutput(action_state=RunState.FINISH)
|
||||
if op_param.param_state == RunState.FAIL:
|
||||
return AndroidActionOutput(action_state=RunState.FAIL)
|
||||
|
||||
# TODO 这里经常出现错误
|
||||
logger.info(f"Error 高发地区, 长度为{len(self.elem_list)},ui_erea为{self.ui_area}")
|
||||
resource_id = self.elem_list[int(self.ui_area) - 1].uid
|
||||
if op_param.decision == Decision.INEFFECTIVE.value:
|
||||
self.useless_list.append(resource_id)
|
||||
|
|
@ -235,8 +240,7 @@ class SelfLearnAndReflect(Action):
|
|||
doc_content = DocContent()
|
||||
setattr(doc_content, self.act_name, doc)
|
||||
doc_path.write_text(str(doc_content))
|
||||
print("反思阶段结束")
|
||||
return AndroidActionOutput(data={"last_act": last_act})
|
||||
|
||||
# TODO 如何处理 FINISH 状态,这一点应该需要与role 联动才能解决
|
||||
|
||||
# TODO 如何处理 FINISH 状态,这一点应该需要与role 联动才能解决
|
||||
|
|
|
|||
|
|
@ -2,16 +2,19 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# @Desc : android assistant to learn from app operations and operate apps
|
||||
import time
|
||||
from typing import Optional
|
||||
from pathlib import Path
|
||||
from pydantic import Field
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from pydantic import Field
|
||||
|
||||
from examples.andriod_assistant.actions.manual_record import ManualRecord
|
||||
from examples.andriod_assistant.actions.parse_record import ParseRecord
|
||||
from examples.andriod_assistant.actions.screenshot_parse import ScreenshotParse
|
||||
from examples.andriod_assistant.actions.self_learn_and_reflect import SelfLearnAndReflect
|
||||
from examples.andriod_assistant.utils.schema import RunState, AndroidActionOutput
|
||||
from examples.andriod_assistant.actions.self_learn_and_reflect import (
|
||||
SelfLearnAndReflect,
|
||||
)
|
||||
from examples.andriod_assistant.utils.schema import AndroidActionOutput, RunState
|
||||
from metagpt.actions.add_requirement import UserRequirement
|
||||
from metagpt.config2 import config
|
||||
from metagpt.logs import logger
|
||||
|
|
@ -35,7 +38,7 @@ class AndroidAssistant(Role):
|
|||
super().__init__(**data)
|
||||
|
||||
self._watch([UserRequirement, AndroidActionOutput])
|
||||
|
||||
self.task_desc = config.get_other("task_desc", "Just explore any app in this phone!")
|
||||
app_name = config.get_other("app_name", "demo")
|
||||
curr_path = Path(__file__).parent
|
||||
data_dir = curr_path.joinpath("..", "output")
|
||||
|
|
@ -49,20 +52,20 @@ class AndroidAssistant(Role):
|
|||
# Remember, only run each action only one time, no need to run n_round.
|
||||
self.set_actions([ManualRecord, ParseRecord])
|
||||
self.task_dir = data_dir.joinpath(app_name, f"manual_learn_{cur_datetime}")
|
||||
self.docs_dir = data_dir.joinpath(app_name, f"manual_docs")
|
||||
self.docs_dir = data_dir.joinpath(app_name, "manual_docs")
|
||||
elif config.get_other("stage") == "learn" and config.get_other("mode") == "auto":
|
||||
# choose SelfLearnAndReflect to run
|
||||
self.set_actions([SelfLearnAndReflect])
|
||||
self.task_dir = data_dir.joinpath(app_name, f"auto_learn_{cur_datetime}")
|
||||
self.docs_dir = data_dir.joinpath(app_name, f"auto_docs")
|
||||
self.docs_dir = data_dir.joinpath(app_name, "auto_docs")
|
||||
elif config.get_other("stage") == "act":
|
||||
# choose ScreenshotParse to run
|
||||
self.set_actions([ScreenshotParse])
|
||||
self.task_dir = data_dir.joinpath(app_name, f"act_{cur_datetime}")
|
||||
if config.get_other("mode") == "manual":
|
||||
self.docs_dir = data_dir.joinpath(app_name, f"manual_docs")
|
||||
self.docs_dir = data_dir.joinpath(app_name, "manual_docs")
|
||||
else:
|
||||
self.docs_dir = data_dir.joinpath(app_name, f"auto_docs")
|
||||
self.docs_dir = data_dir.joinpath(app_name, "auto_docs")
|
||||
self._check_dir()
|
||||
|
||||
self._set_react_mode(RoleReactMode.BY_ORDER)
|
||||
|
|
@ -80,20 +83,14 @@ class AndroidAssistant(Role):
|
|||
async def _act(self) -> Message:
|
||||
logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})")
|
||||
todo = self.rc.todo
|
||||
# TODO 这里修改 Send to 会有作用吗?
|
||||
send_to = ""
|
||||
if isinstance(todo, ManualRecord):
|
||||
resp = await todo.run(
|
||||
task_dir=self.task_dir,
|
||||
task_desc=self.task_desc,
|
||||
env=self.rc.env
|
||||
)
|
||||
resp = await todo.run(task_dir=self.task_dir, task_desc=self.task_desc, env=self.rc.env)
|
||||
elif isinstance(todo, ParseRecord):
|
||||
resp = await todo.run(
|
||||
app_name=config.get_other("app_name", "demo"),
|
||||
task_dir=self.task_dir,
|
||||
docs_dir=self.docs_dir,
|
||||
env=self.rc.env
|
||||
env=self.rc.env,
|
||||
)
|
||||
elif isinstance(todo, SelfLearnAndReflect):
|
||||
resp = await todo.run(
|
||||
|
|
@ -102,11 +99,10 @@ class AndroidAssistant(Role):
|
|||
last_act=self.last_act,
|
||||
task_dir=self.task_dir,
|
||||
docs_dir=self.docs_dir,
|
||||
env=self.rc.env
|
||||
env=self.rc.env,
|
||||
)
|
||||
if resp.action_state == RunState.SUCCESS:
|
||||
self.last_act = resp.data.get("last_act")
|
||||
send_to = self.name
|
||||
elif isinstance(todo, ScreenshotParse):
|
||||
resp = await todo.run(
|
||||
round_count=self.round_count,
|
||||
|
|
@ -115,19 +111,18 @@ class AndroidAssistant(Role):
|
|||
task_dir=self.task_dir,
|
||||
docs_dir=self.docs_dir,
|
||||
grid_on=self.grid_on,
|
||||
env=self.rc.env
|
||||
env=self.rc.env,
|
||||
)
|
||||
if resp.action_state == RunState.SUCCESS:
|
||||
logger.info(f"grid_on: {resp.data.get('grid_on')}")
|
||||
self.grid_on = resp.data.get("grid_on")
|
||||
send_to = self.name
|
||||
|
||||
msg = Message(
|
||||
content=f"RoundCount: {self.round_count}",
|
||||
role=self.profile,
|
||||
cause_by=type(todo),
|
||||
cause_by=type(resp),
|
||||
send_from=self.name,
|
||||
send_to=self.name
|
||||
send_to=self.name,
|
||||
)
|
||||
self.publish_message(msg)
|
||||
# self.publish_message(msg)
|
||||
self.rc.memory.add(msg)
|
||||
return msg
|
||||
|
|
|
|||
|
|
@ -44,6 +44,7 @@ def startup(
|
|||
"stage": stage,
|
||||
"mode": mode,
|
||||
"app_name": app_name,
|
||||
"task_desc": task_desc,
|
||||
"refine_doc": refine_doc,
|
||||
"min_dist": min_dist,
|
||||
"android_screenshot_dir": android_screenshot_dir,
|
||||
|
|
@ -68,15 +69,3 @@ def startup(
|
|||
|
||||
if __name__ == "__main__":
|
||||
app()
|
||||
# Command python run_assistant.py "Create a contact in Contacts App named zjy with a phone number +86 18831933368"
|
||||
|
||||
# python run_assistant.py "Create a contact in Contacts App named zjy with a phone number +86 18831933368" --mode "auto" --app-name "Contacts"examples\andriod_assistant>
|
||||
|
||||
# TODO
|
||||
# 0. How to set Round ?
|
||||
# 1. Manual Record & Parse Record Success
|
||||
# 2. Self Learn Fail
|
||||
# local variable 'action' referenced before assignment
|
||||
# 3. Act
|
||||
# 3.1 TODO Act with Manual Docs
|
||||
# 3.2 TDOO Act with Auto Docs
|
||||
|
|
|
|||
|
|
@ -3,7 +3,8 @@
|
|||
# @Desc :
|
||||
|
||||
from enum import Enum
|
||||
from pydantic import Field, BaseModel, field_validator
|
||||
|
||||
from pydantic import BaseModel, Field, field_validator
|
||||
|
||||
|
||||
class ActionOp(Enum):
|
||||
|
|
@ -37,6 +38,7 @@ class Decision(Enum):
|
|||
|
||||
class AndroidElement(BaseModel):
|
||||
"""UI Element"""
|
||||
|
||||
uid: str = Field(default="")
|
||||
bbox: tuple[tuple[int, int], tuple[int, int]] = Field(default={})
|
||||
attrib: str = Field(default="")
|
||||
|
|
@ -44,6 +46,7 @@ class AndroidElement(BaseModel):
|
|||
|
||||
class OpLogItem(BaseModel):
|
||||
"""log content for self-learn or task act"""
|
||||
|
||||
step: int = Field(default=0)
|
||||
prompt: str = Field(default="")
|
||||
image: str = Field(default="")
|
||||
|
|
@ -52,6 +55,7 @@ class OpLogItem(BaseModel):
|
|||
|
||||
class ReflectLogItem(BaseModel):
|
||||
"""log content for self-learn-reflect"""
|
||||
|
||||
step: int = Field(default=0)
|
||||
prompt: str = Field(default="")
|
||||
image_before: str = Field(default="")
|
||||
|
|
@ -61,6 +65,7 @@ class ReflectLogItem(BaseModel):
|
|||
|
||||
class RecordLogItem(BaseModel):
|
||||
"""log content for record parse, same as ReflectLogItem"""
|
||||
|
||||
step: int = Field(default=0)
|
||||
prompt: str = Field(default="")
|
||||
image_before: str = Field(default="")
|
||||
|
|
@ -79,6 +84,7 @@ class DocContent(BaseModel):
|
|||
# start =================== define different Action Op and its params =============
|
||||
class RunState(Enum):
|
||||
"""run state"""
|
||||
|
||||
SUCCESS = "success"
|
||||
FINISH = "finish"
|
||||
FAIL = "fail"
|
||||
|
|
@ -101,6 +107,7 @@ class TextOp(BaseOpParam):
|
|||
class LongPressOp(BaseOpParam):
|
||||
area: int = Field(default=-1)
|
||||
|
||||
|
||||
# Modify This SwipeOp to SwipeOp_3, Need better name
|
||||
class SwipeOp_3(BaseOpParam):
|
||||
area: int = Field(default=-1)
|
||||
|
|
@ -113,7 +120,6 @@ class GridOp(BaseModel):
|
|||
|
||||
|
||||
class BaseGridOpParam(BaseOpParam):
|
||||
|
||||
@field_validator("act_name", mode="before")
|
||||
@classmethod
|
||||
def check_act_name(cls, act_name: str) -> str:
|
||||
|
|
|
|||
|
|
@ -2,20 +2,33 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# @Desc :
|
||||
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import Union
|
||||
from xml.etree.ElementTree import Element, iterparse
|
||||
|
||||
import cv2
|
||||
from pathlib import Path
|
||||
import pyshine as ps
|
||||
import re
|
||||
|
||||
from metagpt.config2 import config
|
||||
from examples.andriod_assistant.utils.schema import (
|
||||
ActionOp,
|
||||
AndroidElement,
|
||||
BaseGridOpParam,
|
||||
BaseOpParam,
|
||||
Decision,
|
||||
GridOp,
|
||||
LongPressGridOp,
|
||||
LongPressOp,
|
||||
ReflectOp,
|
||||
RunState,
|
||||
SwipeGridOp,
|
||||
SwipeOp_3,
|
||||
TapGridOp,
|
||||
TapOp,
|
||||
TextOp,
|
||||
)
|
||||
from metagpt.logs import logger
|
||||
|
||||
from examples.andriod_assistant.utils.schema import AndroidElement
|
||||
from examples.andriod_assistant.utils.schema import BaseOpParam, BaseGridOpParam, GridOp, ActionOp, TapOp, TapGridOp, \
|
||||
LongPressOp, LongPressGridOp, SwipeOp_3, SwipeGridOp, TextOp, RunState, ReflectOp, Decision
|
||||
|
||||
|
||||
def get_id_from_element(elem: Element) -> str:
|
||||
bounds = elem.attrib["bounds"][1:-1].split("][")
|
||||
|
|
@ -67,8 +80,13 @@ def traverse_xml_tree(xml_path: Path, elem_list: list[AndroidElement], attrib: s
|
|||
path.pop()
|
||||
|
||||
|
||||
def draw_bbox_multi(img_path: Path, output_path: Path, elem_list: list[AndroidElement], record_mode: bool = False,
|
||||
dark_mode: bool = False):
|
||||
def draw_bbox_multi(
|
||||
img_path: Path,
|
||||
output_path: Path,
|
||||
elem_list: list[AndroidElement],
|
||||
record_mode: bool = False,
|
||||
dark_mode: bool = False,
|
||||
):
|
||||
imgcv = cv2.imread(str(img_path))
|
||||
count = 1
|
||||
for elem in elem_list:
|
||||
|
|
@ -85,17 +103,35 @@ def draw_bbox_multi(img_path: Path, output_path: Path, elem_list: list[AndroidEl
|
|||
color = (0, 0, 250)
|
||||
else:
|
||||
color = (0, 250, 0)
|
||||
imgcv = ps.putBText(imgcv, label, text_offset_x=(left + right) // 2 + 10,
|
||||
text_offset_y=(top + bottom) // 2 + 10,
|
||||
vspace=10, hspace=10, font_scale=1, thickness=2, background_RGB=color,
|
||||
text_RGB=(255, 250, 250), alpha=0.5)
|
||||
imgcv = ps.putBText(
|
||||
imgcv,
|
||||
label,
|
||||
text_offset_x=(left + right) // 2 + 10,
|
||||
text_offset_y=(top + bottom) // 2 + 10,
|
||||
vspace=10,
|
||||
hspace=10,
|
||||
font_scale=1,
|
||||
thickness=2,
|
||||
background_RGB=color,
|
||||
text_RGB=(255, 250, 250),
|
||||
alpha=0.5,
|
||||
)
|
||||
else:
|
||||
text_color = (10, 10, 10) if dark_mode else (255, 250, 250)
|
||||
bg_color = (255, 250, 250) if dark_mode else (10, 10, 10)
|
||||
imgcv = ps.putBText(imgcv, label, text_offset_x=(left + right) // 2 + 10,
|
||||
text_offset_y=(top + bottom) // 2 + 10,
|
||||
vspace=10, hspace=10, font_scale=1, thickness=2, background_RGB=bg_color,
|
||||
text_RGB=text_color, alpha=0.5)
|
||||
imgcv = ps.putBText(
|
||||
imgcv,
|
||||
label,
|
||||
text_offset_x=(left + right) // 2 + 10,
|
||||
text_offset_y=(top + bottom) // 2 + 10,
|
||||
vspace=10,
|
||||
hspace=10,
|
||||
font_scale=1,
|
||||
thickness=2,
|
||||
background_RGB=bg_color,
|
||||
text_RGB=text_color,
|
||||
alpha=0.5,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"ERROR: An exception occurs while labeling the image\n{e}")
|
||||
count += 1
|
||||
|
|
@ -110,7 +146,7 @@ def draw_grid(img_path: Path, output_path: Path) -> tuple[int, int]:
|
|||
return i
|
||||
return -1
|
||||
|
||||
image = cv2.imread(img_path)
|
||||
image = cv2.imread(str(img_path))
|
||||
height, width, _ = image.shape
|
||||
color = (255, 116, 113)
|
||||
unit_height = get_unit_len(height)
|
||||
|
|
@ -130,16 +166,31 @@ def draw_grid(img_path: Path, output_path: Path) -> tuple[int, int]:
|
|||
right = int((j + 1) * unit_width)
|
||||
bottom = int((i + 1) * unit_height)
|
||||
cv2.rectangle(image, (left, top), (right, bottom), color, thick // 2)
|
||||
cv2.putText(image, str(label), (left + int(unit_width * 0.05) + 3, top + int(unit_height * 0.3) + 3), 0,
|
||||
int(0.01 * unit_width), (0, 0, 0), thick)
|
||||
cv2.putText(image, str(label), (left + int(unit_width * 0.05), top + int(unit_height * 0.3)), 0,
|
||||
int(0.01 * unit_width), color, thick)
|
||||
cv2.imwrite(output_path, image)
|
||||
cv2.putText(
|
||||
image,
|
||||
str(label),
|
||||
(left + int(unit_width * 0.05) + 3, top + int(unit_height * 0.3) + 3),
|
||||
0,
|
||||
int(0.01 * unit_width),
|
||||
(0, 0, 0),
|
||||
thick,
|
||||
)
|
||||
cv2.putText(
|
||||
image,
|
||||
str(label),
|
||||
(left + int(unit_width * 0.05), top + int(unit_height * 0.3)),
|
||||
0,
|
||||
int(0.01 * unit_width),
|
||||
color,
|
||||
thick,
|
||||
)
|
||||
cv2.imwrite(str(output_path), image)
|
||||
return rows, cols
|
||||
|
||||
|
||||
def area_to_xy(area: int, subarea: str, width: int, height: int, rows: int, cols: int) -> tuple[int, int]:
|
||||
area -= 1
|
||||
logger.info(f"{cols}")
|
||||
row, col = area // cols, area % cols
|
||||
x_0, y_0 = col * (width // cols), row * (height // rows)
|
||||
if subarea == "top-left":
|
||||
|
|
@ -174,9 +225,11 @@ def reflect_parse_extarct(parsed_json: dict) -> ReflectOp:
|
|||
if decision not in Decision.values():
|
||||
op = ReflectOp(param_state=RunState.FAIL)
|
||||
else:
|
||||
op = ReflectOp(decision=parsed_json.get("Decision"),
|
||||
thought=parsed_json.get("Thought"),
|
||||
documentation=parsed_json.get("Documentation"))
|
||||
op = ReflectOp(
|
||||
decision=parsed_json.get("Decision"),
|
||||
thought=parsed_json.get("Thought"),
|
||||
documentation=parsed_json.get("Documentation"),
|
||||
)
|
||||
return op
|
||||
|
||||
|
||||
|
|
@ -237,11 +290,9 @@ def screenshot_parse_extract_with_grid(act_name: str, act: str, last_act: str) -
|
|||
elif act_name == ActionOp.SWIPE.value:
|
||||
params = re.findall(r"swipe\((.*?)\)", act)[0].split(",")
|
||||
params = op_params_clean(params)
|
||||
op = SwipeGridOp(act_name=act_name,
|
||||
start_area=params[0],
|
||||
start_subarea=params[1],
|
||||
end_area=params[2],
|
||||
end_subarea=params[3])
|
||||
op = SwipeGridOp(
|
||||
act_name=act_name, start_area=params[0], start_subarea=params[1], end_area=params[2], end_subarea=params[3]
|
||||
)
|
||||
elif act_name == ActionOp.GRID.value:
|
||||
op = GridOp(act_name=act_name)
|
||||
else:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue