update andriod_assistant

This commit is contained in:
better629 2024-01-28 19:53:07 +08:00
parent 42425ef403
commit 0343db3350
9 changed files with 291 additions and 139 deletions

View file

@ -8,7 +8,7 @@ from examples.andriod_assistant.prompts.assistant_prompt import (
screenshot_parse_template,
screenshot_parse_with_grid_template,
)
from examples.andriod_assistant.utils.schema import OpLogItem, ActionOp, ParamExtState, GridOp, ActionOp, TapOp, TapGridOp, \
from examples.andriod_assistant.utils.schema import OpLogItem, ParamExtState, GridOp, TapOp, TapGridOp, \
LongPressOp, LongPressGridOp, SwipeOp, SwipeGridOp, TextOp, AndroidElement
from examples.andriod_assistant.actions.screenshot_parse_an import SCREENSHOT_PARSE_NODE
from examples.andriod_assistant.utils.utils import draw_bbox_multi, traverse_xml_tree, area_to_xy, screenshot_parse_extract, elem_bbox_to_xy

View file

@ -1,67 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : LIKE scripts/self_explorer.py in stage=learn & mode=auto self_explore_task stage
from pathlib import Path
from examples.andriod_assistant.actions.screenshot_parse_an import SCREENSHOT_PARSE_NODE
from examples.andriod_assistant.prompts.assistant_prompt import (
screenshot_parse_self_explore_template,
)
from examples.andriod_assistant.utils.utils import draw_bbox_multi, traverse_xml_tree
from metagpt.actions.action import Action
from metagpt.config2 import config
from metagpt.environment.android_env.android_env import AndroidEnv
from metagpt.environment.api.env_api import EnvAPIAbstract
from metagpt.utils.common import encode_image
class SelfLearn(Action):
    """Self-exploration step: label the interactive elements of the current screen and query the LLM."""

    name: str = "SelfLearn"
    useless_list: list[str] = []  # store useless elements uid

    async def run(self, round_count: int, task_desc: str, last_act: str, task_dir: Path, env: AndroidEnv):
        """Capture the screen, draw labeled boxes on it and fill SCREENSHOT_PARSE_NODE with the result.

        Args:
            round_count: current round number; used to name the screenshot, xml and labeled image.
            task_desc: task description inserted into the prompt template.
            last_act: summary of the previous action inserted into the prompt template.
            task_dir: directory where the screenshot, xml and labeled image are stored.
            env: environment used to fetch the screenshot and the xml.

        Returns:
            None. The filled node is not returned or stored by this method.
        """
        screenshot_path: Path = env.step(
            EnvAPIAbstract(
                api_name="get_screenshot", kwargs={"ss_name": f"{round_count}_before", "local_save_dir": task_dir}
            )
        )
        xml_path: Path = env.step(
            EnvAPIAbstract(api_name="get_xml", kwargs={"xml_name": f"{round_count}", "local_save_dir": task_dir})
        )
        if not (screenshot_path.exists() and xml_path.exists()):
            # TODO exit
            return

        clickable_list = []
        focusable_list = []
        traverse_xml_tree(xml_path, clickable_list, "clickable", True)
        traverse_xml_tree(xml_path, focusable_list, "focusable", True)

        def center_of(box):
            # Integer midpoint of a ((x0, y0), (x1, y1)) bounding box.
            return (box[0][0] + box[1][0]) // 2, (box[0][1] + box[1][1]) // 2

        # Every clickable element is kept unless it was marked useless earlier.
        elem_list = [item for item in clickable_list if item.uid not in self.useless_list]

        # A focusable element is kept only when no clickable element has its center within min_dist of it.
        for candidate in focusable_list:
            if candidate.uid in self.useless_list:
                continue
            cx, cy = center_of(candidate.bbox)
            for other in clickable_list:
                ox, oy = center_of(other.bbox)
                gap = (abs(cx - ox) ** 2 + abs(cy - oy) ** 2) ** 0.5
                if gap <= config.get_other("min_dist"):
                    break
            else:
                elem_list.append(candidate)

        labeled_path = task_dir.joinpath(f"{round_count}_before_labeled.png")
        draw_bbox_multi(screenshot_path, labeled_path, elem_list)
        img_base64 = encode_image(labeled_path)

        context = screenshot_parse_self_explore_template.format(task_description=task_desc, last_act=last_act)
        node = await SCREENSHOT_PARSE_NODE.fill(context=context, llm=self.llm, images=[img_base64])

View file

@ -0,0 +1,198 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : LIKE scripts/self_explorer.py in stage=learn & mode=auto self_explore_task stage
from pathlib import Path
import ast
from examples.andriod_assistant.actions.screenshot_parse_an import SCREENSHOT_PARSE_NODE
from examples.andriod_assistant.actions.self_learn_reflect_an import SELF_LEARN_REFLECT_NODE
from examples.andriod_assistant.prompts.assistant_prompt import (
screenshot_parse_self_explore_template, screenshot_parse_self_explore_reflect_template as reflect_template
)
from examples.andriod_assistant.utils.schema import AndroidElement, OpLogItem, ReflectLogItem, ParamExtState, TapOp, TextOp, SwipeOp, LongPressOp, ActionOp, Decision, DocContent
from examples.andriod_assistant.utils.utils import draw_bbox_multi, traverse_xml_tree, screenshot_parse_extract, elem_bbox_to_xy, reflect_parse_extarct
from metagpt.actions.action import Action
from metagpt.config2 import config
from metagpt.environment.android_env.android_env import AndroidEnv
from metagpt.environment.api.env_api import EnvAPIAbstract
from metagpt.utils.common import encode_image
from metagpt.const import ADB_EXEC_FAIL
from metagpt.logs import logger
class SelfLearnAndReflect(Action):
    """Run one self-exploration round: act on the current screen, then reflect on what the action did.

    ``run_self_learn`` stores the labeled "before" screenshot, the labeled element list and the chosen
    action on the instance; ``run_reflect`` reads them back to compare against the "after" screenshot.
    """

    name: str = "SelfLearnAndReflect"
    useless_list: list[str] = []  # store useless elements uid
    screenshot_before_path: str = ""
    screenshot_before_base64: str = ""
    elem_list: list[AndroidElement] = []
    swipe_orient: str = "up"
    act_name: str = ""
    ui_area: int = -1

    async def run(self, round_count: int, task_desc: str, last_act: str, task_dir: Path, docs_dir: Path, env: AndroidEnv):
        """Execute the explore stage followed by the reflect stage for one round.

        Args:
            round_count: current round number; used to name every artifact of the round.
            task_desc: task description inserted into both prompts.
            last_act: summary of the previous action inserted into both prompts.
            task_dir: directory where screenshots and the xml dump are stored.
            docs_dir: directory where per-element documentation files are written.
            env: environment used to take screenshots and to send UI actions.
        """
        # Both stages are coroutines: without ``await`` they are only created and never executed.
        await self.run_self_learn(round_count, task_desc, last_act, task_dir, env)
        await self.run_reflect(round_count, task_desc, last_act, task_dir, docs_dir, env)

    async def run_self_learn(self, round_count: int, task_desc: str, last_act: str, task_dir: Path, env: AndroidEnv):
        """Label the current screen, ask the LLM for the next action and perform it through ``env``.

        On success the labeled elements, the action name and (for area-based actions) the chosen
        area are stored on the instance for ``run_reflect``. Every failure path returns early.
        """
        # Forget the previous round's action, so that a round which fails below is not reflected on
        # using stale state.
        self.act_name = ""
        self.ui_area = -1

        screenshot_path: Path = env.step(
            EnvAPIAbstract(
                api_name="get_screenshot", kwargs={"ss_name": f"{round_count}_before", "local_save_dir": task_dir}
            )
        )
        xml_path: Path = env.step(
            EnvAPIAbstract(api_name="get_xml", kwargs={"xml_name": f"{round_count}", "local_save_dir": task_dir})
        )
        if not screenshot_path.exists() or not xml_path.exists():
            # TODO exit
            return

        clickable_list = []
        focusable_list = []
        traverse_xml_tree(xml_path, clickable_list, "clickable", True)
        traverse_xml_tree(xml_path, focusable_list, "focusable", True)

        elem_list = []
        for elem in clickable_list:
            if elem.uid in self.useless_list:
                continue
            elem_list.append(elem)
        # A focusable element is added only when no clickable element has its center within min_dist.
        for elem in focusable_list:
            if elem.uid in self.useless_list:
                continue
            bbox = elem.bbox
            center = (bbox[0][0] + bbox[1][0]) // 2, (bbox[0][1] + bbox[1][1]) // 2
            close = False
            for e in clickable_list:
                bbox = e.bbox
                center_ = (bbox[0][0] + bbox[1][0]) // 2, (bbox[0][1] + bbox[1][1]) // 2
                dist = (abs(center[0] - center_[0]) ** 2 + abs(center[1] - center_[1]) ** 2) ** 0.5
                if dist <= config.get_other("min_dist"):
                    close = True
                    break
            if not close:
                elem_list.append(elem)

        screenshot_before_labeled_path = task_dir.joinpath(f"{round_count}_before_labeled.png")
        draw_bbox_multi(screenshot_path, screenshot_before_labeled_path, elem_list)
        img_base64 = encode_image(screenshot_before_labeled_path)
        self.screenshot_before_base64 = img_base64
        self.screenshot_before_path = screenshot_before_labeled_path

        self_explore_template = screenshot_parse_self_explore_template
        context = self_explore_template.format(task_description=task_desc, last_act=last_act)
        node = await SCREENSHOT_PARSE_NODE.fill(context=context, llm=self.llm, images=[img_base64])
        if "error" in node.content:
            # TODO
            return
        prompt = node.compile(context=context, schema="json", mode="auto")
        log_item = OpLogItem(step=round_count, prompt=prompt, image=screenshot_before_labeled_path, response=node.content)

        op_param = screenshot_parse_extract(node.instruct_content.model_dump(), grid_on=False)
        if op_param.param_state == ParamExtState.FINISH:
            # TODO
            return
        if op_param.param_state == ParamExtState.FAIL:
            # TODO
            return

        # NOTE(review): ``SwipeOp`` is used as an isinstance target here and as an enum
        # (``SwipeOp.UP.value``) in run_reflect — confirm which definition is actually in scope.
        if isinstance(op_param, TapOp):
            self.ui_area = op_param.area
            # Areas are 1-based labels; the list is 0-based.
            x, y = elem_bbox_to_xy(elem_list[op_param.area - 1].bbox)
            res = env.step(EnvAPIAbstract("system_tap", kwargs={"x": x, "y": y}))
            if res == ADB_EXEC_FAIL:
                # TODO
                return
        elif isinstance(op_param, TextOp):
            res = env.step(EnvAPIAbstract("user_input", kwargs={"input_txt": op_param.input_str}))
            if res == ADB_EXEC_FAIL:
                # TODO
                return
        elif isinstance(op_param, LongPressOp):
            self.ui_area = op_param.area
            x, y = elem_bbox_to_xy(elem_list[op_param.area - 1].bbox)
            res = env.step(EnvAPIAbstract("user_longpress", kwargs={"x": x, "y": y}))
            if res == ADB_EXEC_FAIL:
                # TODO
                return
        elif isinstance(op_param, SwipeOp):
            self.ui_area = op_param.area
            self.swipe_orient = op_param.swipe_orient
            x, y = elem_bbox_to_xy(elem_list[op_param.area - 1].bbox)
            res = env.step(EnvAPIAbstract("user_swipe", kwargs={"x": x, "y": y, "orient": op_param.swipe_orient, "dist": op_param.dist}))
            if res == ADB_EXEC_FAIL:
                # TODO
                return

        self.elem_list = elem_list
        self.act_name = op_param.act_name

    async def run_reflect(self, round_count: int, task_desc: str, last_act: str, task_dir: Path, docs_dir: Path, env: AndroidEnv):
        """Compare the screen before and after the last action and record what the element does.

        Returns early, doing nothing, when the stored action is not a tap, long press or swipe.
        That covers text input and rounds where ``run_self_learn`` did not complete.
        """
        if self.act_name == ActionOp.TAP.value:
            action = "tapping"
        elif self.act_name == ActionOp.LONG_PRESS.value:
            action = "long pressing"
        elif self.act_name == ActionOp.SWIPE.value:
            action = "swiping"
            if self.swipe_orient == SwipeOp.UP.value or self.swipe_orient == SwipeOp.DOWN.value:
                action = "v_swipe"
            elif self.swipe_orient == SwipeOp.LEFT.value or self.swipe_orient == SwipeOp.RIGHT.value:
                action = "h_swipe"
        else:
            # No UI element is associated with the action, so there is nothing to reflect on.
            # Without this guard ``action`` would be unbound below.
            return

        screenshot_path: Path = env.step(
            EnvAPIAbstract(
                api_name="get_screenshot", kwargs={"ss_name": f"{round_count}_after", "local_save_dir": task_dir}
            )
        )
        if not screenshot_path.exists():
            # TODO
            return

        screenshot_after_labeled_path = task_dir.joinpath(f"{round_count}_after_labeled.png")
        draw_bbox_multi(screenshot_path, screenshot_after_labeled_path, elem_list=self.elem_list)
        img_base64 = encode_image(screenshot_after_labeled_path)

        context = reflect_template.format(action=action, ui_element=str(self.ui_area), task_desc=task_desc, last_act=last_act)
        node = await SELF_LEARN_REFLECT_NODE.fill(context=context, llm=self.llm, images=[self.screenshot_before_base64, img_base64])
        if "error" in node.content:
            # TODO
            return
        prompt = node.compile(context=context, schema="json", mode="auto")
        log_item = ReflectLogItem(step=round_count, prompt=prompt, image_before=self.screenshot_before_path,
                                  image_after=screenshot_after_labeled_path, response=node.content)

        op_param = reflect_parse_extarct(node.instruct_content.model_dump())
        if op_param.param_state == ParamExtState.FINISH:
            # TODO
            return
        if op_param.param_state == ParamExtState.FAIL:
            # TODO
            return

        resource_id = self.elem_list[int(self.ui_area) - 1].uid
        if op_param.decision == Decision.INEFFECTIVE.value:
            self.useless_list.append(resource_id)
            last_act = "NONE"  # TODO global
        elif op_param.decision in [Decision.BACK.value, Decision.CONTINUE.value, Decision.SUCCESS.value]:
            if op_param.decision in [Decision.BACK.value, Decision.CONTINUE.value]:
                self.useless_list.append(resource_id)
                last_act = "NONE"
                if op_param.decision == Decision.BACK.value:
                    res = env.step(EnvAPIAbstract("system_back"))
                    if res == ADB_EXEC_FAIL:
                        # TODO
                        return

            # NOTE(review): documentation is written only for BACK/CONTINUE/SUCCESS here; the nesting
            # was reconstructed from a flattened diff — confirm against the intended flow.
            doc = op_param.documentation
            doc_path = docs_dir.joinpath(f"{resource_id}.txt")
            if doc_path.exists():
                # read_text() closes the file; the previous open(...).read() left the handle open.
                # literal_eval accepts Python literals only, so the file must hold such a literal.
                doc_content = ast.literal_eval(doc_path.read_text())
                if doc_content.get(self.act_name):
                    logger.info(f"Documentation for the element {resource_id} already exists.")
                    # TODO
                    return
                # The file exists but has no text for this action yet: store the new documentation.
                doc_content[self.act_name] = doc
            else:
                doc_content = DocContent()
                setattr(doc_content, self.act_name, doc)
            # NOTE(review): str(doc_content) must be parseable by ast.literal_eval for the branch
            # above to work on the next round — confirm what str(DocContent()) produces.
            doc_path.write_text(str(doc_content))

View file

@ -1,61 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : LIKE scripts/self_explorer.py self_explore_reflect stage
from pathlib import Path
from examples.andriod_assistant.prompts.assistant_prompt import (
screenshot_parse_self_explore_reflect_template,
)
from examples.andriod_assistant.utils.schema import AndroidElement, ActionOp, SwipeOp
from examples.andriod_assistant.utils.utils import draw_bbox_multi
from metagpt.actions.action import Action
from metagpt.environment.android_env.android_env import AndroidEnv
from metagpt.environment.api.env_api import EnvAPIAbstract
from metagpt.utils.common import encode_image
class SelfLearnReflect(Action):
    """Reflection step: capture the screen after an action and build the reflection prompt."""

    name: str = "SelfLearnReflect"

    async def run(
        self,
        round_count: int,
        task_desc: str,
        last_act: str,
        task_dir: Path,
        env: AndroidEnv,
        elem_list: list[AndroidElement],
        act_name: str,
        swipe_orient: str,
        ui_area: int,
    ):
        """Take the post-action screenshot, label it and format the reflection prompt.

        Args:
            round_count: current round number; used to name the screenshot and the labeled image.
            task_desc: task description inserted into the prompt.
            last_act: summary of the previous action inserted into the prompt.
            task_dir: directory where the screenshot and the labeled image are stored.
            env: environment used to take the screenshot.
            elem_list: elements whose boxes are drawn onto the screenshot.
            act_name: name of the action that was performed.
            swipe_orient: swipe direction; only read when the action is a swipe.
            ui_area: label of the element that was acted on.

        Returns:
            None. The encoded image and the formatted prompt are currently computed but not used.
        """
        if act_name == "text":
            # TODO ignore current reflect
            return

        # Save under "_after": this capture is taken after the action, and "_before" is the name
        # SelfLearn uses for the pre-action screenshot of the same round.
        screenshot_path: Path = env.step(
            EnvAPIAbstract(
                api_name="get_screenshot", kwargs={"ss_name": f"{round_count}_after", "local_save_dir": task_dir}
            )
        )
        if not screenshot_path.exists():
            # TODO exit
            return

        labeled_path = task_dir.joinpath(f"{round_count}_after_labeled.png")
        draw_bbox_multi(screenshot_path, labeled_path, elem_list)
        encode_image(labeled_path)

        reflect_template = screenshot_parse_self_explore_reflect_template
        if act_name == ActionOp.TAP.value:
            action = "tapping"
        elif act_name == ActionOp.LONG_PRESS.value:
            action = "long pressing"
        elif act_name == ActionOp.SWIPE.value:
            action = "swiping"
            if swipe_orient == SwipeOp.UP.value or swipe_orient == SwipeOp.DOWN.value:
                action = "v_swipe"
            elif swipe_orient == SwipeOp.LEFT.value or swipe_orient == SwipeOp.RIGHT.value:
                action = "h_swipe"
        else:
            # Unrecognised action name: ``action`` would be unbound in the format call below.
            return
        reflect_template.format(action=action, ui_element=str(ui_area), task_desc=task_desc, last_act=last_act)

View file

@ -0,0 +1,33 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : the ActionNode to parse Reflection
from metagpt.actions.action_node import ActionNode
# The verdict on the last action. The instruction lists the accepted values because the parser
# rejects any answer outside the Decision enum (BACK / INEFFECTIVE / CONTINUE / SUCCESS).
DECISION = ActionNode(
    key="Decision",
    expected_type=str,
    instruction="your decision on the effect of the last action, one of: BACK, INEFFECTIVE, CONTINUE, SUCCESS",
    example="BACK",
)

# Free-text justification for the decision above.
THOUGHT = ActionNode(
    key="Thought",
    expected_type=str,
    instruction="explain why you made this decision",
    example="",
)

# Description of the UI element that was acted on.
DOCUMENTATION = ActionNode(
    key="Documentation",
    expected_type=str,
    instruction="describe the function of the UI element",
    example="",
)

NODES = [DECISION, THOUGHT, DOCUMENTATION]

# Composite node that asks for all three fields in a single LLM call.
SELF_LEARN_REFLECT_NODE = ActionNode.from_children("SelfLearnReflect", NODES)

View file

@ -145,6 +145,7 @@ should continue to interact with other elements on the screen. Notice that if yo
changed between the two screenshots, then they are not identical. Your output should be in the following format:
Decision: INEFFECTIVE
Thought: <explain why you made this decision>
Documentation: <None>
3. CONTINUE
If you find the action changed something on the screen but does not reflect the action description above and did not
move the given task forward, you should continue to interact with other elements on the screen. At the same time,

View file

@ -2,43 +2,62 @@
# -*- coding: utf-8 -*-
# @Desc : android assistant to learn from app operations and operate apps
from typing import Optional
from pathlib import Path
from pydantic import Field
from examples.andriod_assistant.actions.manual_record import ManualRecord
from examples.andriod_assistant.actions.parse_record import ParseRecord
from examples.andriod_assistant.actions.screenshot_parse import ScreenshotParse
from examples.andriod_assistant.actions.self_learn import SelfLearn
from examples.andriod_assistant.actions.self_learn_and_reflect import SelfLearnAndReflect
from examples.andriod_assistant.actions.self_learn_reflect import SelfLearnReflect
from metagpt.actions.add_requirement import UserRequirement
from metagpt.config2 import config
from metagpt.logs import logger
from metagpt.roles.role import Role
from metagpt.roles.role import Role, RoleReactMode
from metagpt.schema import Message
class AndroidAssistant(Role):
name: str = "Nick"
profile: str = "AndroidAssistant"
goal: str = "operate the phone apps with self-learn"
goal: str = "operate the mobile phone's apps with self-learn"
task_desc: str = ""
round_count: int = 0
last_act: str = ""
task_dir: Optional[Path] = Field(default=None)
def __init__(self, **data):
super().__init__(**data)
self._watch([UserRequirement])
self.set_actions([ManualRecord, ParseRecord, SelfLearn, SelfLearnReflect, ScreenshotParse])
async def _think(self) -> bool:
"""Firstly, we decide the state with user config, further, we can do it automatically, like if it's new app,
run the learn first and then do the act stage or learn it during the action.
"""
if config.get_other("stage") == "learn" and config.get_other("mode") == "manual":
# choose ManualRecord and then run ParseRecord
# Remember, only run each action only one time, no need to run n_round.
pass
self.set_actions([ManualRecord, ParseRecord])
elif config.get_other("stage") == "learn" and config.get_other("mode") == "auto":
# choose SelfLearn / SelfLearnReflect to run
pass
# choose SelfLearnAndReflect / SelfLearnReflect to run
self.set_actions([SelfLearnAndReflect, SelfLearnReflect])
elif config.get_other("stage") == "act":
# choose ScreenshotParse to run
pass
self.set_actions([ScreenshotParse])
self._set_react_mode(RoleReactMode.BY_ORDER)
async def react(self) -> Message:
    """Count the round, then delegate to the parent role's ``react`` and return its message.

    The parent call is awaited and its result returned; calling it without ``await`` would leave
    the coroutine unexecuted and make this method return ``None`` despite the ``Message`` annotation.
    """
    self.round_count += 1
    # NOTE(review): assumes Role.react is a coroutine, as this async override implies — confirm.
    return await super().react()
async def _think(self) -> bool:
"""Firstly, we decide the state with user config, further, we can do it automatically, like if it's new app,
run the learn first and then do the act stage or learn it during the action.
"""
pass
async def _act(self) -> Message:
logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})")
todo = self.rc.todo

View file

@ -22,6 +22,17 @@ class SwipeOp(Enum):
RIGHT = "right"
class Decision(Enum):
    """Verdicts that the reflection step can give about the last UI action."""

    BACK = "BACK"
    INEFFECTIVE = "INEFFECTIVE"
    CONTINUE = "CONTINUE"
    SUCCESS = "SUCCESS"

    @classmethod
    def values(cls):
        """Return the string value of every member, in definition order."""
        collected = []
        for member in cls:
            collected.append(member.value)
        return collected
class AndroidElement(BaseModel):
"""UI Element"""
uid: str = Field(default="")
@ -115,3 +126,10 @@ class SwipeGridOp(BaseGridOpParam):
end_subarea: str = Field(default="")
# end =================== define different Action Op and its params =============
class ReflectOp(BaseModel):
    """Parsed fields of a reflection answer plus the state of the extraction."""

    # Raw "Decision" value from the answer; empty when extraction failed.
    decision: str = Field(default="")
    # Raw "Thought" value from the answer.
    thought: str = Field(default="")
    # Raw "Documentation" value from the answer.
    documentation: str = Field(default="")
    # Extraction outcome; SUCCESS unless the parser sets it otherwise.
    param_state: ParamExtState = Field(default=ParamExtState.SUCCESS)

View file

@ -14,7 +14,7 @@ from metagpt.logs import logger
from examples.andriod_assistant.utils.schema import AndroidElement
from examples.andriod_assistant.utils.schema import BaseOpParam, BaseGridOpParam, GridOp, ActionOp, TapOp, TapGridOp, \
LongPressOp, LongPressGridOp, SwipeOp, SwipeGridOp, TextOp, ParamExtState
LongPressOp, LongPressGridOp, SwipeOp, SwipeGridOp, TextOp, ParamExtState, ReflectOp, Decision
def get_id_from_element(elem: Element) -> str:
@ -167,6 +167,17 @@ def elem_bbox_to_xy(bbox: tuple[tuple[int, int]]) -> tuple[int, int]:
return x, y
def reflect_parse_extarct(parsed_json: dict) -> ReflectOp:
    """Build a ReflectOp from the parsed reflection answer.

    Args:
        parsed_json: mapping read for the keys "Decision", "Thought" and "Documentation".

    Returns:
        A ReflectOp with ``param_state`` set to FAIL when "Decision" is missing or is not one of
        ``Decision.values()``; otherwise a ReflectOp carrying the three fields.
    """
    decision = parsed_json.get("Decision")
    if decision not in Decision.values():
        return ReflectOp(param_state=ParamExtState.FAIL)

    # "Thought" / "Documentation" may be absent or None; ReflectOp declares them as ``str``,
    # so fall back to an empty string instead of passing None through.
    return ReflectOp(
        decision=decision,
        thought=parsed_json.get("Thought") or "",
        documentation=parsed_json.get("Documentation") or "",
    )
def screenshot_parse_extract(parsed_json: dict, grid_on: bool = False) -> Union[BaseOpParam, BaseGridOpParam, GridOp]:
act = parsed_json.get("Action")
last_act = parsed_json.get("Summary")