add andriod_assistant action_output and update ui_document

better629 2024-01-29 17:17:28 +08:00
parent 7f06870691
commit d617a1ce96
6 changed files with 131 additions and 91 deletions

View file

@@ -6,7 +6,7 @@ import os
import time
from pathlib import Path
from examples.andriod_assistant.utils.schema import OpLogItem, ActionOp, ParamExtState, GridOp, ActionOp, TapOp, \
from examples.andriod_assistant.utils.schema import OpLogItem, ActionOp, RunState, GridOp, ActionOp, TapOp, \
TapGridOp, \
LongPressOp, LongPressGridOp, SwipeOp, SwipeGridOp, TextOp, AndroidElement
from examples.andriod_assistant.utils.utils import draw_bbox_multi, traverse_xml_tree

View file

@@ -3,13 +3,14 @@
# @Desc : LIKE scripts/task_executor.py in stage=act
from pathlib import Path
import ast
from examples.andriod_assistant.prompts.assistant_prompt import (
screenshot_parse_template,
screenshot_parse_with_grid_template,
)
from examples.andriod_assistant.utils.schema import OpLogItem, ParamExtState, GridOp, TapOp, TapGridOp, \
LongPressOp, LongPressGridOp, SwipeOp, SwipeGridOp, TextOp, AndroidElement
from examples.andriod_assistant.utils.schema import OpLogItem, RunState, GridOp, TapOp, TapGridOp, \
LongPressOp, LongPressGridOp, SwipeOp, SwipeGridOp, TextOp, AndroidElement, AndroidActionOutput
from examples.andriod_assistant.actions.screenshot_parse_an import SCREENSHOT_PARSE_NODE
from examples.andriod_assistant.utils.utils import draw_bbox_multi, traverse_xml_tree, area_to_xy, \
screenshot_parse_extract, elem_bbox_to_xy
@@ -24,8 +25,38 @@ from metagpt.const import ADB_EXEC_FAIL
class ScreenshotParse(Action):
name: str = "ScreenshotParse"
def _makeup_ui_document(self, elem_list: list[AndroidElement], docs_idr: Path, use_exist_doc: bool = True) -> str:
if not use_exist_doc:
return ""
ui_doc = """
You also have access to the following documentations that describes the functionalities of UI
elements you can interact on the screen. These docs are crucial for you to determine the target of your
next action. You should always prioritize these documented elements for interaction:"""
for i, elem in enumerate(elem_list):
doc_path = docs_idr.joinpath(f"{elem.uid}.txt")
if not doc_path.exists():
continue
ui_doc += f"Documentation of UI element labeled with the numeric tag '{i + 1}':\n"
doc_content = ast.literal_eval(open(doc_path, "r").read())
if doc_content["tap"]:
ui_doc += f"This UI element is clickable. {doc_content['tap']}\n\n"
if doc_content["text"]:
ui_doc += f"This UI element can receive text input. The text input is used for the following " \
f"purposes: {doc_content['text']}\n\n"
if doc_content["long_press"]:
ui_doc += f"This UI element is long clickable. {doc_content['long_press']}\n\n"
if doc_content["v_swipe"]:
ui_doc += f"This element can be swiped directly without tapping. You can swipe vertically on " \
f"this UI element. {doc_content['v_swipe']}\n\n"
if doc_content["h_swipe"]:
ui_doc += f"This element can be swiped directly without tapping. You can swipe horizontally on " \
f"this UI element. {doc_content['h_swipe']}\n\n"
return ui_doc
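Note: the helper above can only consume a doc file that ast.literal_eval turns into a dict carrying the five capability keys it indexes. A minimal sketch of that assumed shape (the sample text is made up; empty strings mark undocumented capabilities):
# Illustrative only: the per-element doc shape _makeup_ui_document expects to read
example_doc = {
    "tap": "Tapping this element opens the settings page.",
    "text": "",
    "long_press": "",
    "v_swipe": "Swiping vertically scrolls the result list.",
    "h_swipe": "",
}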
async def run(
self, round_count: int, task_desc: str, last_act: str, task_dir: Path, env: AndroidEnv, grid_on: bool = False
self, round_count: int, task_desc: str, last_act: str, task_dir: Path, docs_dir: Path, grid_on: bool, env: AndroidEnv
):
screenshot_path: Path = env.step(
EnvAPIAbstract(
@@ -36,8 +67,7 @@ class ScreenshotParse(Action):
EnvAPIAbstract(api_name="get_xml", kwargs={"xml_name": f"{round_count}", "local_save_dir": task_dir})
)
if not screenshot_path.exists() or not xml_path.exists():
# TODO exit
return
return AndroidActionOutput(action_state=RunState.FAIL)
clickable_list = []
focusable_list = []
@@ -64,51 +94,41 @@ class ScreenshotParse(Action):
parse_template = screenshot_parse_with_grid_template if grid_on else screenshot_parse_template
# makeup `ui_doc`
# TODO
ui_doc = ""
ui_doc = self._makeup_ui_document(elem_list, docs_dir)
context = parse_template.format(ui_document=ui_doc, task_description=task_desc, last_act=last_act)
node = await SCREENSHOT_PARSE_NODE.fill(context=context, llm=self.llm, images=[img_base64])
if "error" in node.content:
# TODO
return
return AndroidActionOutput(action_state=RunState.FAIL)
prompt = node.compile(context=context, schema="json", mode="auto")
log_item = OpLogItem(step=round_count, prompt=prompt, image=screenshot_labeled_path, response=node.content)
op_param = screenshot_parse_extract(node.instruct_content.model_dump(), grid_on)
if op_param.param_state == ParamExtState.FINISH:
# TODO
return
if op_param.param_state == ParamExtState.FAIL:
# TODO
return
if op_param.param_state == RunState.FINISH:
return AndroidActionOutput(action_state=RunState.FINISH)
if op_param.param_state == RunState.FAIL:
return AndroidActionOutput(action_state=RunState.FAIL)
if isinstance(op_param, TapOp):
x, y = elem_bbox_to_xy(elem_list[op_param.area - 1].bbox)
res = env.step(EnvAPIAbstract("system_tap", kwargs={"x": x, "y": y}))
if res == ADB_EXEC_FAIL:
# TODO
return
return AndroidActionOutput(action_state=RunState.FAIL)
elif isinstance(op_param, TextOp):
res = env.step(EnvAPIAbstract("user_input", kwargs={"input_txt": op_param.input_str}))
if res == ADB_EXEC_FAIL:
# TODO
return
return AndroidActionOutput(action_state=RunState.FAIL)
elif isinstance(op_param, LongPressOp):
x, y = elem_bbox_to_xy(elem_list[op_param.area - 1].bbox)
res = env.step(EnvAPIAbstract("user_longpress", kwargs={"x": x, "y": y}))
if res == ADB_EXEC_FAIL:
# TODO
return
return AndroidActionOutput(action_state=RunState.FAIL)
elif isinstance(op_param, SwipeOp):
x, y = elem_bbox_to_xy(elem_list[op_param.area - 1].bbox)
res = env.step(EnvAPIAbstract("user_swipe", kwargs={"x": x, "y": y, "orient": op_param.swipe_orient, "dist": op_param.dist}))
if res == ADB_EXEC_FAIL:
# TODO
return
return AndroidActionOutput(action_state=RunState.FAIL)
elif isinstance(op_param, GridOp):
grid_on = True
elif isinstance(op_param, TapGridOp) or isinstance(op_param, LongPressGridOp):
@@ -116,21 +136,20 @@ class ScreenshotParse(Action):
if isinstance(op_param, TapGridOp):
res = env.step(EnvAPIAbstract("system_tap", kwargs={"x": x, "y": y}))
if res == ADB_EXEC_FAIL:
# TODO
return
return AndroidActionOutput(action_state=RunState.FAIL)
else:
# LongPressGridOp
res = env.step(EnvAPIAbstract("user_longpress", kwargs={"x": x, "y": y}))
if res == ADB_EXEC_FAIL:
# TODO
return
return AndroidActionOutput(action_state=RunState.FAIL)
elif isinstance(op_param, SwipeGridOp):
start_x, start_y = area_to_xy(op_param.start_area, op_param.start_subarea)
end_x, end_y = area_to_xy(op_param.end_area, op_param.end_subarea)
res = env.step(EnvAPIAbstract("user_swipe_to", kwargs={"start": (start_x, start_y), "end": (end_x, end_y)}))
if res == ADB_EXEC_FAIL:
# TODO
return
return AndroidActionOutput(action_state=RunState.FAIL)
if op_param.act_name != "grid":
grid_on = True # TODO overwrite it
grid_on = True
return AndroidActionOutput(data={"grid_on": grid_on})

View file

@@ -10,8 +10,8 @@ from examples.andriod_assistant.actions.self_learn_reflect_an import SELF_LEARN_
from examples.andriod_assistant.prompts.assistant_prompt import (
screenshot_parse_self_explore_template, screenshot_parse_self_explore_reflect_template as reflect_template
)
from examples.andriod_assistant.utils.schema import AndroidElement, OpLogItem, ReflectLogItem, ParamExtState, TapOp, \
TextOp, SwipeOp, LongPressOp, ActionOp, Decision, DocContent
from examples.andriod_assistant.utils.schema import AndroidElement, OpLogItem, ReflectLogItem, RunState, TapOp, \
TextOp, SwipeOp, LongPressOp, ActionOp, Decision, DocContent, AndroidActionOutput
from examples.andriod_assistant.utils.utils import draw_bbox_multi, traverse_xml_tree, screenshot_parse_extract, \
elem_bbox_to_xy, reflect_parse_extarct
from metagpt.actions.action import Action
@@ -35,11 +35,12 @@ class SelfLearnAndReflect(Action):
act_name: str = ""
ui_area: int = -1
async def run(self, round_count: int, task_desc: str, last_act: str, task_dir: Path, docs_dir: Path, env: AndroidEnv):
self.run_self_learn(round_count, task_desc, last_act, task_dir, env)
self.run_reflect(round_count, task_desc, last_act, task_dir, docs_dir, env)
async def run(self, round_count: int, task_desc: str, last_act: str, task_dir: Path, docs_dir: Path, env: AndroidEnv) -> AndroidActionOutput:
resp = self.run_self_learn(round_count, task_desc, last_act, task_dir, env)
resp = self.run_reflect(round_count, task_desc, last_act, task_dir, docs_dir, env)
return resp
async def run_self_learn(self, round_count: int, task_desc: str, last_act: str, task_dir: Path, env: AndroidEnv):
async def run_self_learn(self, round_count: int, task_desc: str, last_act: str, task_dir: Path, env: AndroidEnv) -> AndroidActionOutput:
screenshot_path: Path = env.step(
EnvAPIAbstract(
api_name="get_screenshot", kwargs={"ss_name": f"{round_count}_before", "local_save_dir": task_dir}
@@ -49,8 +50,7 @@ class SelfLearnAndReflect(Action):
EnvAPIAbstract(api_name="get_xml", kwargs={"xml_name": f"{round_count}", "local_save_dir": task_dir})
)
if not screenshot_path.exists() or not xml_path.exists():
# TODO exit
return
return AndroidActionOutput(action_state=RunState.FAIL)
clickable_list = []
focusable_list = []
@@ -87,58 +87,51 @@ class SelfLearnAndReflect(Action):
node = await SCREENSHOT_PARSE_NODE.fill(context=context, llm=self.llm, images=[img_base64])
if "error" in node.content:
# TODO
return
return AndroidActionOutput(action_state=RunState.FAIL)
prompt = node.compile(context=context, schema="json", mode="auto")
log_item = OpLogItem(step=round_count, prompt=prompt, image=screenshot_before_labeled_path, response=node.content)
op_param = screenshot_parse_extract(node.instruct_content.model_dump(), grid_on=False)
if op_param.param_state == ParamExtState.FINISH:
# TODO
return
if op_param.param_state == ParamExtState.FAIL:
# TODO
return
if op_param.param_state == RunState.FINISH:
return AndroidActionOutput(action_state=RunState.FINISH)
if op_param.param_state == RunState.FAIL:
return AndroidActionOutput(action_state=RunState.FAIL)
if isinstance(op_param, TapOp):
self.ui_area = op_param.area
x, y = elem_bbox_to_xy(elem_list[op_param.area - 1].bbox)
res = env.step(EnvAPIAbstract("system_tap", kwargs={"x": x, "y": y}))
if res == ADB_EXEC_FAIL:
# TODO
return
return AndroidActionOutput(action_state=RunState.FAIL)
elif isinstance(op_param, TextOp):
res = env.step(EnvAPIAbstract("user_input", kwargs={"input_txt": op_param.input_str}))
if res == ADB_EXEC_FAIL:
# TODO
return
return AndroidActionOutput(action_state=RunState.FAIL)
elif isinstance(op_param, LongPressOp):
self.ui_area = op_param.area
x, y = elem_bbox_to_xy(elem_list[op_param.area - 1].bbox)
res = env.step(EnvAPIAbstract("user_longpress", kwargs={"x": x, "y": y}))
if res == ADB_EXEC_FAIL:
# TODO
return
return AndroidActionOutput(action_state=RunState.FAIL)
elif isinstance(op_param, SwipeOp):
self.ui_area = op_param.area
self.swipe_orient = op_param.swipe_orient
x, y = elem_bbox_to_xy(elem_list[op_param.area - 1].bbox)
res = env.step(EnvAPIAbstract("user_swipe", kwargs={"x": x, "y": y, "orient": op_param.swipe_orient, "dist": op_param.dist}))
if res == ADB_EXEC_FAIL:
# TODO
return
return AndroidActionOutput(action_state=RunState.FAIL)
self.elem_list = elem_list
self.act_name = op_param.act_name
return AndroidActionOutput()
async def run_reflect(self, round_count: int, task_desc: str, last_act: str, task_dir: Path, docs_dir: Path, env: AndroidEnv):
async def run_reflect(self, round_count: int, task_desc: str, last_act: str, task_dir: Path, docs_dir: Path, env: AndroidEnv) -> AndroidActionOutput:
screenshot_path: Path = env.step(
EnvAPIAbstract(
api_name="get_screenshot", kwargs={"ss_name": f"{round_count}_after", "local_save_dir": task_dir}
)
)
if not screenshot_path.exists():
# TODO
return
return AndroidActionOutput(action_state=RunState.FAIL)
screenshot_after_labeled_path = task_dir.joinpath(f"{round_count}_after_labeled.png")
draw_bbox_multi(screenshot_path, screenshot_after_labeled_path, elem_list=self.elem_list)
@@ -158,20 +151,17 @@ class SelfLearnAndReflect(Action):
node = await SELF_LEARN_REFLECT_NODE.fill(context=context, llm=self.llm, images=[self.screenshot_before_base64, img_base64])
if "error" in node.content:
# TODO
return
return AndroidActionOutput(action_state=RunState.FAIL)
prompt = node.compile(context=context, schema="json", mode="auto")
log_item = ReflectLogItem(step=round_count, prompt=prompt, image_before=self.screenshot_before_path,
image_after=screenshot_after_labeled_path, response=node.content)
op_param = reflect_parse_extarct(node.instruct_content.model_dump())
if op_param.param_state == ParamExtState.FINISH:
# TODO
return
if op_param.param_state == ParamExtState.FAIL:
# TODO
return
if op_param.param_state == RunState.FINISH:
return AndroidActionOutput(action_state=RunState.FINISH)
if op_param.param_state == RunState.FAIL:
return AndroidActionOutput(action_state=RunState.FAIL)
resource_id = self.elem_list[int(self.ui_area) -1].uid
if op_param.decision == Decision.INEFFECTIVE.value:
@@ -184,17 +174,17 @@ class SelfLearnAndReflect(Action):
if op_param.decision == Decision.BACK.value:
res = env.step(EnvAPIAbstract("system_back"))
if res == ADB_EXEC_FAIL:
# TODO
return
return AndroidActionOutput(action_state=RunState.FAIL)
doc = op_param.documentation
doc_path = docs_dir.joinpath(f"{resource_id}.txt")
if doc_path.exists():
doc_content = ast.literal_eval(open(doc_path).read())
if doc_content[self.act_name]:
logger.info(f"Documentation for the element {resource_id} already exists.")
# TODO
return
return AndroidActionOutput(action_state=RunState.FAIL)
else:
doc_content = DocContent()
setattr(doc_content, self.act_name, doc)
doc_path.write_text(str(doc_content))
return AndroidActionOutput(data={"last_act": last_act})

View file

@@ -10,7 +10,7 @@ from examples.andriod_assistant.actions.manual_record import ManualRecord
from examples.andriod_assistant.actions.parse_record import ParseRecord
from examples.andriod_assistant.actions.screenshot_parse import ScreenshotParse
from examples.andriod_assistant.actions.self_learn_and_reflect import SelfLearnAndReflect
from examples.andriod_assistant.actions.self_learn_reflect import SelfLearnReflect
from examples.andriod_assistant.utils.schema import RunState
from metagpt.actions.add_requirement import UserRequirement
from metagpt.config2 import config
from metagpt.logs import logger
@@ -27,6 +27,8 @@ class AndroidAssistant(Role):
round_count: int = 0
last_act: str = ""
task_dir: Optional[Path] = Field(default=None)
docs_dir: Optional[Path] = Field(default=None)
grid_on: bool = Field(default=False)
def __init__(self, **data):
super().__init__(**data)
@@ -41,8 +43,8 @@ class AndroidAssistant(Role):
# Remember, only run each action only one time, no need to run n_round.
self.set_actions([ManualRecord, ParseRecord])
elif config.get_other("stage") == "learn" and config.get_other("mode") == "auto":
# choose SelfLearnAndReflect / SelfLearnReflect to run
self.set_actions([SelfLearnAndReflect, SelfLearnReflect])
# choose SelfLearnAndReflect to run
self.set_actions([SelfLearnAndReflect])
elif config.get_other("stage") == "act":
# choose ScreenshotParse to run
self.set_actions([ScreenshotParse])
@@ -52,12 +54,36 @@ class AndroidAssistant(Role):
self.round_count += 1
super().react()
async def _think(self) -> bool:
"""Firstly, we decide the state with user config, further, we can do it automatically, like if it's new app,
run the learn first and then do the act stage or learn it during the action.
"""
pass
async def _act(self) -> Message:
logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})")
todo = self.rc.todo
send_to = ""
if isinstance(todo, ManualRecord):
resp = await todo.run()
elif isinstance(todo, ParseRecord):
resp = await todo.run()
elif isinstance(todo, SelfLearnAndReflect):
resp = await todo.run(round_count=self.round_count,
task_desc=self.task_desc,
last_act=self.last_act,
task_dir=self.task_dir,
docs_dir=self.docs_dir,
env=self.rc.env)
if resp.action_state == RunState.SUCCESS:
self.last_act = resp.data.get("last_act")
send_to = self.name
elif isinstance(todo, ScreenshotParse):
resp = await todo.run(round_count=self.round_count,
task_desc=self.task_desc,
last_act=self.last_act,
task_dir=self.task_dir,
grid_on=self.grid_on,
env=self.rc.env)
if resp.action_state == RunState.SUCCESS:
self.grid_on = resp.data.get("grid_on")
send_to = self.name
msg = Message(f"RoundCount: {self.round_count}", send_to=send_to)
self.rc.memory.add(msg)
return msg

View file

@@ -66,8 +66,8 @@ class DocContent(BaseModel):
# start =================== define different Action Op and its params =============
class ParamExtState(Enum):
"""Op params extract state"""
class RunState(Enum):
"""run state"""
SUCCESS = "success"
FINISH = "finish"
FAIL = "fail"
@@ -76,7 +76,7 @@ class ParamExtState(Enum):
class BaseOpParam(BaseModel):
act_name: str = Field(default="", validate_default=True)
last_act: str = Field(default="")
param_state: ParamExtState = Field(default=ParamExtState.SUCCESS, description="return state when extract params")
param_state: RunState = Field(default=RunState.SUCCESS, description="return state when extract params")
class TapOp(BaseOpParam):
@@ -132,4 +132,9 @@ class ReflectOp(BaseModel):
decision: str = ""
thought: str = ""
documentation: str = ""
param_state: ParamExtState = ParamExtState.SUCCESS
param_state: RunState = RunState.SUCCESS
class AndroidActionOutput(BaseModel):
data: dict = Field(default=dict())
action_state: RunState = Field(default=RunState.SUCCESS)
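A minimal usage sketch for the new output type, using only names introduced in this diff; the payload key mirrors what ScreenshotParse returns:
# action_state defaults to SUCCESS, so a payload-only output marks success
ok = AndroidActionOutput(data={"grid_on": True})
# explicit failure, e.g. a missing screenshot or an ADB_EXEC_FAIL step result
bad = AndroidActionOutput(action_state=RunState.FAIL)

if ok.action_state == RunState.SUCCESS:
    grid_on = ok.data.get("grid_on", False)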

View file

@@ -14,7 +14,7 @@ from metagpt.logs import logger
from examples.andriod_assistant.utils.schema import AndroidElement
from examples.andriod_assistant.utils.schema import BaseOpParam, BaseGridOpParam, GridOp, ActionOp, TapOp, TapGridOp, \
LongPressOp, LongPressGridOp, SwipeOp, SwipeGridOp, TextOp, ParamExtState, ReflectOp, Decision
LongPressOp, LongPressGridOp, SwipeOp, SwipeGridOp, TextOp, RunState, ReflectOp, Decision
def get_id_from_element(elem: Element) -> str:
@@ -170,7 +170,7 @@ def elem_bbox_to_xy(bbox: tuple[tuple[int, int]]) -> tuple[int, int]:
def reflect_parse_extarct(parsed_json: dict) -> ReflectOp:
decision = parsed_json.get("Decision")
if decision not in Decision.values():
op = ReflectOp(param_state=ParamExtState.FAIL)
op = ReflectOp(param_state=RunState.FAIL)
else:
op = ReflectOp(decision=parsed_json.get("Decision"),
thought=parsed_json.get("Thought"),
@@ -183,8 +183,8 @@ def screenshot_parse_extract(parsed_json: dict, grid_on: bool = False) -> Union[
last_act = parsed_json.get("Summary")
act_name = act.split("(")[0]
if ParamExtState.FINISH.value.upper() in act:
return BaseOpParam(param_state=ParamExtState.FINISH)
if RunState.FINISH.value.upper() in act:
return BaseOpParam(param_state=RunState.FINISH)
if grid_on:
return screenshot_parse_extract_with_grid(act_name, act, last_act)
@@ -219,7 +219,7 @@ def screenshot_parse_extract_without_grid(act_name: str, act: str, last_act: str
elif act_name == ActionOp.GRID.value:
op = GridOp(act_name=act_name)
else:
op = BaseOpParam(param_state=ParamExtState.FAIL)
op = BaseOpParam(param_state=RunState.FAIL)
return op
@@ -243,5 +243,5 @@ def screenshot_parse_extract_with_grid(act_name: str, act: str, last_act: str) -
elif act_name == ActionOp.GRID.value:
op = GridOp(act_name=act_name)
else:
op = BaseGridOpParam(param_state=ParamExtState.FAIL)
op = BaseGridOpParam(param_state=RunState.FAIL)
return op
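A hedged sketch of how the extraction helpers above behave at their edges. The "tap(5)" action-string format is an assumption inferred from act.split("(")[0]; the TapOp branch itself is not shown in this hunk:
# assumed action-string format, e.g. "tap(5)"; the name before "(" selects the op type
op = screenshot_parse_extract_without_grid(act_name="tap", act="tap(5)", last_act="opened the app")
# expected to yield a TapOp; an unrecognized act_name falls through to
# BaseOpParam(param_state=RunState.FAIL), as in the else branch above

# an out-of-vocabulary Decision is rejected by reflect_parse_extarct
reflect_op = reflect_parse_extarct({"Decision": "NOT_A_DECISION"})
assert reflect_op.param_state == RunState.FAIL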