Update manual record action node

Rename the schema class SwipeOp to SwipeOp_3; this class needs a better name.
This commit is contained in:
Jiayi Zhang 2024-02-23 21:19:49 +08:00
parent 13cf80b46a
commit a15e7af5a8
5 changed files with 92 additions and 52 deletions

View file

@ -8,8 +8,10 @@ import cv2
from examples.andriod_assistant.utils.schema import (
ActionOp,
AndroidActionOutput,
AndroidElement,
SwipeOp,
RunState,
SwipeOp
)
from examples.andriod_assistant.utils.utils import draw_bbox_multi, traverse_xml_tree
from metagpt.actions.action import Action
@ -22,33 +24,54 @@ from metagpt.logs import logger
class ManualRecord(Action):
"""do a human operation on the screen with human input"""
name: str = "ManualRecord"
async def run(self, demo_name: str, task_dir: Path, env: AndroidEnv):
# Question 这里是将通过ADB获取的东西存到本地的路径的吧
screenshot_path: Path = env.step(
EnvAPIAbstract(api_name="get_screenshot", kwargs={"ss_name": f"{demo_name}", "local_save_dir": task_dir})
)
xml_path: Path = env.step(
EnvAPIAbstract(api_name="get_xml", kwargs={"xml_name": f"{demo_name}", "local_save_dir": task_dir})
)
if not screenshot_path.exists() or not xml_path.exists():
# TODO exit
return
useless_list: list[str] = [] # store useless elements uid
record_path: str = ""
task_desc_path: str = ""
screenshot_before_path: str = ""
screenshot_after_path: str = ""
xml_path: str = ""
async def run(self, demo_name: str, task_desc: str,task_dir: Path, env: AndroidEnv):
self.record_path = Path(task_dir) / "record.txt"
record_file = open(self.record_path, "w")
self.task_desc_path = Path(task_dir) / "task_desc.txt"
with open(self.task_desc_path, "w") as f:
f.write(task_desc)
self.screenshot_before_path = Path(task_dir)/"raw_screenshots"
self.screenshot_after_path = Path(task_dir)/"labeled_screenshots"
self.xml_path = Path(task_dir)/"xml"
step = 0
record_path = Path(task_dir) / "record.txt"
record_file = open(record_path, "w")
while True:
# TODO Parse Record Step 是否可以从这个函数中获取,进行参数的传递
step += 1
screenshot_path: Path = env.observe(
EnvAPIAbstract(
api_name="get_screenshot",
kwargs={"ss_name": f"{demo_name}_{step}", "local_save_dir": self.screenshot_before_path}
)
)
xml_path: Path = env.observe(
EnvAPIAbstract(
api_name="get_xml",
kwargs={"xml_name": f"{demo_name}_{step}", "local_save_dir": self.xml_path}
)
)
if not screenshot_path.exists() or not xml_path.exists():
return AndroidActionOutput(action_state=RunState.FAIL)
clickable_list = []
focusable_list = []
traverse_xml_tree(xml_path, clickable_list, "clickable", True)
traverse_xml_tree(xml_path, focusable_list, "focusable", True)
elem_list: list[AndroidElement] = clickable_list.copy()
elem_list = []
for elem in clickable_list:
if elem.uid in self.useless_list:
continue
elem_list.append(elem)
for elem in focusable_list:
if elem.uid in self.useless_list:
continue
bbox = elem.bbox
center = (bbox[0][0] + bbox[1][0]) // 2, (bbox[0][1] + bbox[1][1]) // 2
close = False
@ -56,12 +79,14 @@ class ManualRecord(Action):
bbox = e.bbox
center_ = (bbox[0][0] + bbox[1][0]) // 2, (bbox[0][1] + bbox[1][1]) // 2
dist = (abs(center[0] - center_[0]) ** 2 + abs(center[1] - center_[1]) ** 2) ** 0.5
if dist <= config.get_other("min_dist"):
# TODO Modify config to default 30. It should be modified back config after single action test
# if dist <= config.get_other("min_dist"):
if dist <= 30:
close = True
break
if not close:
elem_list.append(elem)
screenshot_labeled_path = task_dir.joinpath(f"{task_dir}_{step}_labeled.png")
screenshot_labeled_path = Path(self.screenshot_after_path).joinpath(f"{demo_name}_{step}_labeled.png")
labeled_img = draw_bbox_multi(screenshot_path, screenshot_labeled_path, elem_list)
cv2.imshow("image", labeled_img)
@ -70,17 +95,17 @@ class ManualRecord(Action):
user_input = "xxx"
logger.info(
"Choose one of the following actions you want to perform on the current screen:\ntap, text, long "
"press, swipe, stop",
"Choose one of the following actions you want to perform on the current screen:\ntap, text, long_press,"
"swipe, stop",
"blue",
)
while (
user_input.lower() != ActionOp.TAP.value
and user_input.lower() != ActionOp.TEXT.value
and user_input.lower() != ActionOp.LONG_PRESS.value
and user_input.lower() != ActionOp.SWIPE.value
and user_input.lower() != ActionOp.STOP.value
user_input.lower() != ActionOp.TAP.value
and user_input.lower() != ActionOp.TEXT.value
and user_input.lower() != ActionOp.LONG_PRESS.value
and user_input.lower() != ActionOp.SWIPE.value
and user_input.lower() != ActionOp.STOP.value
):
user_input = input()
@ -93,11 +118,9 @@ class ManualRecord(Action):
user_input = input()
tl, br = elem_list[int(user_input) - 1].bbox
x, y = (tl[0] + br[0]) // 2, (tl[1] + br[1]) // 2
ret = env.step(EnvAPIAbstract(api_name="user_tap", kwargs={"x": x, "y": y}))
# Question 将 ERROR 替换为 ADB_EXEC_FAIL(FAILED)
ret = env.step(EnvAPIAbstract(api_name="system_tap", kwargs={"x": x, "y": y}))
if ret == ADB_EXEC_FAIL:
logger.info("ERROR: tap execution failed", "red")
break
return AndroidActionOutput(action_state=RunState.FAIL)
record_file.write(f"tap({int(user_input)}):::{elem_list[int(user_input) - 1].uid}\n")
elif user_input.lower() == ActionOp.TEXT.value:
logger.info(
@ -123,10 +146,9 @@ class ManualRecord(Action):
user_input = input()
tl, br = elem_list[int(user_input) - 1].bbox
x, y = (tl[0] + br[0]) // 2, (tl[1] + br[1]) // 2
env.step(EnvAPIAbstract(api_name="user_longpress", kwargs={"x": x, "y": y}))
ret = env.step(EnvAPIAbstract(api_name="user_longpress", kwargs={"x": x, "y": y}))
if ret == ADB_EXEC_FAIL:
logger.info("ERROR: long press execution failed", "red")
break
return AndroidActionOutput(action_state=RunState.FAIL)
record_file.write(f"long_press({int(user_input)}):::{elem_list[int(user_input) - 1].uid}\n")
elif user_input.lower() == ActionOp.SWIPE.value:
logger.info(
@ -136,10 +158,10 @@ class ManualRecord(Action):
)
user_input = ""
while (
user_input != SwipeOp.UP.value
and user_input != SwipeOp.DOWN.value
and user_input != SwipeOp.LEFT.value
and user_input != SwipeOp.RIGHT.value
user_input != SwipeOp.UP.value
and user_input != SwipeOp.DOWN.value
and user_input != SwipeOp.LEFT.value
and user_input != SwipeOp.RIGHT.value
):
user_input = input()
swipe_dir = user_input
@ -148,10 +170,9 @@ class ManualRecord(Action):
user_input = input()
tl, br = elem_list[int(user_input) - 1].bbox
x, y = (tl[0] + br[0]) // 2, (tl[1] + br[1]) // 2
ret = env.step(EnvAPIAbstract("user_swipe", kwargs={"x": x, "y": y, "orient": swipe_dir}))
ret = env.step(EnvAPIAbstract(api_name="user_swipe", kwargs={"x": x, "y": y, "orient": swipe_dir}))
if ret == ADB_EXEC_FAIL:
logger.info("ERROR: swipe execution failed", "red")
break
return AndroidActionOutput(action_state=RunState.FAIL)
record_file.write(f"swipe({int(user_input)}:sep:{swipe_dir}):::{elem_list[int(user_input) - 1].uid}\n")
elif user_input.lower() == ActionOp.STOP.value:
record_file.write("stop\n")
@ -160,3 +181,11 @@ class ManualRecord(Action):
else:
break
time.sleep(3)
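# TODO
# 1. Screenshot info display: KO
# 2. Testing of the different functions: OK
# 3. The demo output path is wrong; how this part is integrated needs more thought
# 1. Documentation Generate
# 2. Role Test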

View file

@ -61,15 +61,12 @@ class SelfLearnAndReflect(Action):
self, round_count: int, task_desc: str, last_act: str, task_dir: Path, docs_dir: Path, env: AndroidEnv
) -> AndroidActionOutput:
resp = await self.run_self_learn(round_count, task_desc, last_act, task_dir, env)
print(resp)
resp = await self.run_reflect(round_count, task_desc, last_act, task_dir, docs_dir, env)
print(resp)
return resp
async def run_self_learn(
self, round_count: int, task_desc: str, last_act: str, task_dir: Path, env: AndroidEnv
) -> AndroidActionOutput:
logger.info('run_self_learn')
screenshot_path: Path = env.observe(
EnvAPIAbstract(
api_name="get_screenshot", kwargs={"ss_name": f"{round_count}_before", "local_save_dir": task_dir}
@ -83,8 +80,6 @@ class SelfLearnAndReflect(Action):
clickable_list = []
focusable_list = []
# TODO Tuple Bug 从这里开始 Debug
# TODO Tuple Bug
traverse_xml_tree(xml_path, clickable_list, "clickable", True)
traverse_xml_tree(xml_path, focusable_list, "focusable", True)
elem_list = []

View file

@ -34,7 +34,17 @@ test_manual_parse = ParseRecord()
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(
# loop.run_until_complete(
# test_manual_record.run(
# demo_name=DEMO_NAME,
# task_desc="Create a contact in Contacts App named zjy with a phone number +86 18831933368 ",
# task_dir=TASK_PATH,
# env=test_env_manual_learn_android
# )
# )
test_action_list = [
test_self_learning.run(
round_count=20,
task_desc="Create a contact in Contacts App named zjy with a phone number +86 18831933368 ",
@ -42,8 +52,14 @@ if __name__ == "__main__":
task_dir=TASK_PATH,
docs_dir=DOC_PATH,
env=test_env_self_learn_android
),
test_manual_record.run(
demo_name=DEMO_NAME,
task_dir=TASK_PATH,
task_desc="Create a contact in Contacts App named zjy with a phone number +86 18831933368 ",
env=test_env_manual_learn_android
)
)
]
# test_action_list = [
# test_self_learning.run(

View file

@ -101,8 +101,8 @@ class TextOp(BaseOpParam):
class LongPressOp(BaseOpParam):
area: int = Field(default=-1)
class SwipeOp(BaseOpParam):
# TODO: SwipeOp was renamed to SwipeOp_3; this class needs a better name.
class SwipeOp_3(BaseOpParam):
area: int = Field(default=-1)
swipe_orient: str = Field(default="up")
dist: str = Field(default="")

View file

@ -14,7 +14,7 @@ from metagpt.logs import logger
from examples.andriod_assistant.utils.schema import AndroidElement
from examples.andriod_assistant.utils.schema import BaseOpParam, BaseGridOpParam, GridOp, ActionOp, TapOp, TapGridOp, \
LongPressOp, LongPressGridOp, SwipeOp, SwipeGridOp, TextOp, RunState, ReflectOp, Decision
LongPressOp, LongPressGridOp, SwipeOp_3, SwipeGridOp, TextOp, RunState, ReflectOp, Decision
def get_id_from_element(elem: Element) -> str:
@ -217,7 +217,7 @@ def screenshot_parse_extract_without_grid(act_name: str, act: str, last_act: str
elif act_name == ActionOp.SWIPE.value:
params = re.findall(r"swipe\((.*?)\)", act)[0].split(",")
params = op_params_clean(params) # area, swipe_orient, dist
op = SwipeOp(act_name=act_name, area=params[0], swipe_orient=params[1], dist=params[2], last_act=last_act)
op = SwipeOp_3(act_name=act_name, area=params[0], swipe_orient=params[1], dist=params[2], last_act=last_act)
elif act_name == ActionOp.GRID.value:
op = GridOp(act_name=act_name)
else: