update android_env to simplify code

This commit is contained in:
better629 2024-03-27 22:25:22 +08:00
parent 0b939f3078
commit a27b081ab3
18 changed files with 138 additions and 202 deletions

View file

@ -31,7 +31,7 @@ ### By Voice
## Run It
You can run Android Assistant by running the following command line:
```bash
python run_assistant.py "your task description" --stage "your choice(learn/act)" --mode "your choice(auto/manual)" --app-name "app name"
python run_assistant.py "your task description" --stage "your choice(learn or act)" --mode "your choice(auto or manual)" --app-name "app name"
```
And the specific parameters are as follows:
```text

View file

@ -13,7 +13,10 @@ from examples.andriod_assistant.utils.schema import (
RunState,
SwipeOp,
)
from examples.andriod_assistant.utils.utils import draw_bbox_multi, traverse_xml_tree
from examples.andriod_assistant.utils.utils import (
draw_bbox_multi,
elem_list_from_xml_tree,
)
from metagpt.actions.action import Action
from metagpt.config2 import config
from metagpt.environment.android_env.android_env import AndroidEnv
@ -38,7 +41,6 @@ class ManualRecord(Action):
screenshot_after_path: Path = ""
xml_path: Path = ""
# async def run(self, demo_name: str, task_desc: str,task_dir: Path, env: AndroidEnv):
async def run(self, task_desc: str, task_dir: Path, env: AndroidEnv):
self.record_path = Path(task_dir) / "record.txt"
self.task_desc_path = Path(task_dir) / "task_desc.txt"
@ -50,11 +52,10 @@ class ManualRecord(Action):
if not path.exists():
path.mkdir(parents=True, exist_ok=True)
with open(self.record_path, "w") as file:
file.write("")
self.record_path.write_text("")
record_file = open(self.record_path, "w")
with open(self.task_desc_path, "w") as f:
f.write(task_desc)
self.task_desc_path.write_text(task_desc)
step = 0
while True:
step += 1
@ -68,32 +69,10 @@ class ManualRecord(Action):
)
if not screenshot_path.exists() or not xml_path.exists():
return AndroidActionOutput(action_state=RunState.FAIL)
clickable_list = []
focusable_list = []
traverse_xml_tree(xml_path, clickable_list, "clickable", True)
traverse_xml_tree(xml_path, focusable_list, "focusable", True)
elem_list = []
for elem in clickable_list:
if elem.uid in self.useless_list:
continue
elem_list.append(elem)
for elem in focusable_list:
if elem.uid in self.useless_list:
continue
bbox = elem.bbox
center = (bbox[0][0] + bbox[1][0]) // 2, (bbox[0][1] + bbox[1][1]) // 2
close = False
for e in clickable_list:
bbox = e.bbox
center_ = (bbox[0][0] + bbox[1][0]) // 2, (bbox[0][1] + bbox[1][1]) // 2
dist = (abs(center[0] - center_[0]) ** 2 + abs(center[1] - center_[1]) ** 2) ** 0.5
if dist <= config.get_other("min_dist"):
close = True
break
if not close:
elem_list.append(elem)
elem_list = elem_list_from_xml_tree(xml_path, self.useless_list, config.get_other("min_dist"))
screenshot_labeled_path = Path(self.screenshot_after_path).joinpath(f"{step}_labeled.png")
# screenshot_labeled_path = Path(self.screenshot_after_path).joinpath(f"{demo_name}_{step}_labeled.png")
labeled_img = draw_bbox_multi(screenshot_path, screenshot_labeled_path, elem_list)
cv2.imshow("image", labeled_img)

View file

@ -4,7 +4,6 @@
# LIKE scripts/document_generation.py
import ast
import json
import re
from pathlib import Path
@ -25,7 +24,6 @@ from examples.andriod_assistant.utils.schema import (
)
from metagpt.actions.action import Action
from metagpt.config2 import config
from metagpt.environment.android_env.android_env import AndroidEnv
from metagpt.logs import logger
from metagpt.utils.common import encode_image
@ -37,8 +35,7 @@ class ParseRecord(Action):
screenshot_before_path: Path = ""
screenshot_after_path: Path = ""
# async def run(self, app_name: str, demo_name: str, task_dir: Path, docs_dir: Path, env: AndroidEnv):
async def run(self, app_name: str, task_dir: Path, docs_dir: Path, env: AndroidEnv):
async def run(self, app_name: str, task_dir: Path, docs_dir: Path):
docs_dir.mkdir(parents=True, exist_ok=True)
doc_count = 0
self.record_path = Path(task_dir) / "record.txt"
@ -46,12 +43,12 @@ class ParseRecord(Action):
self.screenshot_before_path = Path(task_dir) / "raw_screenshots"
self.screenshot_after_path = Path(task_dir) / "labeled_screenshots"
task_desc = self.task_desc_path.read_text()
with open(self.record_path, "r") as record_file:
record_step_count = len(record_file.readlines()) - 1
record_file.seek(0)
for step in range(1, record_step_count + 1):
# img_before_base64 = encode_image(self.screenshot_after_path.joinpath(f"{demo_name}_{step}_labeled.png"))
# img_after_base64 = encode_image(self.screenshot_after_path.joinpath(f"{demo_name}_{step + 1}_labeled.png"))
img_before_base64 = encode_image(self.screenshot_after_path.joinpath(f"{step}_labeled.png"))
img_after_base64 = encode_image(self.screenshot_after_path.joinpath(f"{step + 1}_labeled.png"))
rec = record_file.readline().strip()
@ -79,15 +76,18 @@ class ParseRecord(Action):
context = prompt_template.format(swipe_dir=swipe_dir, ui_element=swipe_area)
else:
break
task_desc_path = task_dir.joinpath("task_desc.txt")
task_desc = open(task_desc_path, "r").read()
context = context.format(task_desc=task_desc)
doc_name = resource_id + ".txt"
doc_path = docs_dir.joinpath(doc_name)
if doc_path.exists():
doc_content = ast.literal_eval(open(doc_path).read())
try:
doc_content = ast.literal_eval(doc_path.read_text())
except Exception as exp:
logger.error(f"ast parse doc: {doc_path} failed, exp: {exp}")
continue
if doc_content[action_type]:
if config.get_other("doc_refine"):
refine_context = refine_doc_suffix.format(old_doc=doc_content[action_type])
@ -111,7 +111,6 @@ class ParseRecord(Action):
)
if "error" in node.content:
return AndroidActionOutput(action_state=RunState.FAIL)
# log_path = task_dir.joinpath(f"log_{app_name}_{demo_name}.txt")
log_path = task_dir.joinpath(f"log_{app_name}.txt")
prompt = node.compile(context=context, schema="json", mode="auto")
msg = node.content
@ -125,17 +124,10 @@ class ParseRecord(Action):
image_after=img_after_base64,
response=node.content,
)
logfile.write(json.dumps(log_item.model_dump()) + "\n")
logfile.write(log_item.model_dump_json() + "\n")
with open(doc_path, "w") as outfile:
outfile.write(str(doc_content))
doc_count += 1
logger.info(f"Documentation generated and saved to {doc_path}")
# TODO MetaGPT 里面的Config 需要看一下
# time.sleep(config.get_other("request_interval"))
logger.info(f"Documentation generation phase completed. {doc_count} docs generated.")
# TODO
# 1. LOG中记录方式有问题需要把IMG的部分拿出去丢掉

View file

@ -42,6 +42,7 @@ from metagpt.environment.android_env.env_space import (
EnvObsParams,
EnvObsType,
)
from metagpt.logs import logger
from metagpt.utils.common import encode_image
@ -60,8 +61,13 @@ class ScreenshotParse(Action):
doc_path = docs_idr.joinpath(f"{elem.uid}.txt")
if not doc_path.exists():
continue
try:
doc_content = ast.literal_eval(doc_path.read_text())
except Exception as exp:
logger.error(f"ast parse doc: {doc_path} failed, exp: {exp}")
continue
ui_doc += f"Documentation of UI element labeled with the numeric tag '{i + 1}':\n"
doc_content = ast.literal_eval(open(doc_path, "r").read())
if doc_content["tap"]:
ui_doc += f"This UI element is clickable. {doc_content['tap']}\n\n"
if doc_content["text"]:

View file

@ -34,9 +34,9 @@ from examples.andriod_assistant.utils.schema import (
from examples.andriod_assistant.utils.utils import (
draw_bbox_multi,
elem_bbox_to_xy,
elem_list_from_xml_tree,
reflect_parse_extarct,
screenshot_parse_extract,
traverse_xml_tree,
)
from metagpt.actions.action import Action
from metagpt.config2 import config
@ -67,8 +67,7 @@ class SelfLearnAndReflect(Action):
self, round_count: int, task_desc: str, last_act: str, task_dir: Path, docs_dir: Path, env: AndroidEnv
) -> AndroidActionOutput:
for path in [task_dir, docs_dir]:
if not path.exists():
path.mkdir(parents=True, exist_ok=True)
path.mkdir(parents=True, exist_ok=True)
resp = await self.run_self_learn(round_count, task_desc, last_act, task_dir, env)
resp = await self.run_reflect(round_count, task_desc, last_act, task_dir, docs_dir, env)
return resp
@ -85,30 +84,8 @@ class SelfLearnAndReflect(Action):
if not screenshot_path.exists() or not xml_path.exists():
return AndroidActionOutput(action_state=RunState.FAIL)
clickable_list = []
focusable_list = []
traverse_xml_tree(xml_path, clickable_list, "clickable", True)
traverse_xml_tree(xml_path, focusable_list, "focusable", True)
elem_list = []
for elem in clickable_list:
if elem.uid in self.useless_list:
continue
elem_list.append(elem)
for elem in focusable_list:
if elem.uid in self.useless_list:
continue
bbox = elem.bbox
center = (bbox[0][0] + bbox[1][0]) // 2, (bbox[0][1] + bbox[1][1]) // 2
close = False
for e in clickable_list:
bbox = e.bbox
center_ = (bbox[0][0] + bbox[1][0]) // 2, (bbox[0][1] + bbox[1][1]) // 2
dist = (abs(center[0] - center_[0]) ** 2 + abs(center[1] - center_[1]) ** 2) ** 0.5
if dist <= config.get_other("min_dist"):
close = True
break
if not close:
elem_list.append(elem)
elem_list = elem_list_from_xml_tree(xml_path, self.useless_list, config.get_other("min_dist"))
screenshot_before_labeled_path = task_dir.joinpath(f"{round_count}_before_labeled.png")
draw_bbox_multi(screenshot_path, screenshot_before_labeled_path, elem_list)
img_base64 = encode_image(screenshot_before_labeled_path)
@ -210,8 +187,13 @@ class SelfLearnAndReflect(Action):
return AndroidActionOutput(action_state=RunState.FINISH)
if op_param.param_state == RunState.FAIL:
return AndroidActionOutput(action_state=RunState.FAIL)
# TODO 这里经常出现错误
logger.info(f"Error 高发地区, 长度为{len(self.elem_list)}ui_erea为{self.ui_area}")
logger.info(
f"reflect_parse_extarct decision: {op_param.decision}, "
f"elem_list size: {len(self.elem_list)}, ui_area: {self.ui_area}"
)
# TODO here will cause `IndexError: list index out of range`.
# Maybe you should clink back to the desktop in the simulator
resource_id = self.elem_list[int(self.ui_area) - 1].uid
if op_param.decision == Decision.INEFFECTIVE.value:
self.useless_list.append(resource_id)
@ -228,7 +210,12 @@ class SelfLearnAndReflect(Action):
doc = op_param.documentation
doc_path = docs_dir.joinpath(f"{resource_id}.txt")
if doc_path.exists():
doc_content = ast.literal_eval(open(doc_path).read())
try:
doc_content = ast.literal_eval(doc_path.read_text())
except Exception as exp:
logger.error(f"ast parse doc: {doc_path} failed, exp: {exp}")
return AndroidActionOutput(action_state=RunState.FAIL)
if doc_content[self.act_name]:
logger.info(f"Documentation for the element {resource_id} already exists.")
return AndroidActionOutput(action_state=RunState.FAIL)
@ -237,6 +224,3 @@ class SelfLearnAndReflect(Action):
setattr(doc_content, self.act_name, doc)
doc_path.write_text(str(doc_content))
return AndroidActionOutput(data={"last_act": last_act})
# TODO 如何处理 FINISH 状态这一点应该需要与role 联动才能解决

View file

@ -80,6 +80,14 @@ class AndroidAssistant(Role):
logger.debug(f"react result {result}")
return result
async def _observe(self, ignore_memory=True) -> int:
"""Ignore old memory so the role can run multiple rounds.

Inspects the newest memory message (whose content embeds the last
``action_state`` set in ``_act``); when SUCCESS is absent — i.e. the
previous action ended in FINISH or FAIL — stop ignoring old memory so
the role does not react in the remaining rounds.
"""
newest_msg = self.rc.memory.get(k=1)[0]
if RunState.SUCCESS not in newest_msg.content:
# NOTE(review): setting ignore_memory=False here appears intended to make
# super()._observe() treat the old message as already seen (no news) —
# confirm against Role._observe's semantics.
ignore_memory = False
# NOTE(review): the trailing "red" positional arg looks like a leftover
# colorization parameter; verify this logger accepts it.
logger.error("Latest action_state is FINISH or FAIL, won't react in remainder rounds", "red")
return await super()._observe(ignore_memory)
async def _act(self) -> Message:
logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})")
todo = self.rc.todo
@ -90,7 +98,6 @@ class AndroidAssistant(Role):
app_name=config.get_other("app_name", "demo"),
task_dir=self.task_dir,
docs_dir=self.docs_dir,
env=self.rc.env,
)
elif isinstance(todo, SelfLearnAndReflect):
resp = await todo.run(
@ -117,12 +124,12 @@ class AndroidAssistant(Role):
logger.info(f"grid_on: {resp.data.get('grid_on')}")
self.grid_on = resp.data.get("grid_on")
msg = Message(
content=f"RoundCount: {self.round_count}",
content=f"RoundCount: {self.round_count}, action_state: {resp.action_state}",
role=self.profile,
cause_by=type(resp),
send_from=self.name,
send_to=self.name,
)
# self.publish_message(msg)
self.rc.memory.add(msg)
return msg

View file

@ -1,106 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : test case (imgs from appagent's)
import ast
import asyncio
import re
from examples.andriod_assistant.actions.parse_record_an import RECORD_PARSE_NODE
from examples.andriod_assistant.prompts.operation_prompt import (
long_press_doc_template,
refine_doc_suffix,
swipe_doc_template,
tap_doc_template,
text_doc_template,
)
from examples.andriod_assistant.utils.const import ROOT_PATH
from examples.andriod_assistant.utils.schema import ActionOp, SwipeOp
from metagpt.actions.action import Action
from metagpt.config2 import config
from metagpt.logs import logger
from metagpt.utils.common import encode_image
TASK_PATH = ROOT_PATH.parent.joinpath("data/demo_Contacts")
TEST_BEFORE_PATH = TASK_PATH.joinpath("labeled_screenshots/demo_Contacts_2024-01-24_12-07-55_3.png")
TEST_AFTER_PATH = TASK_PATH.joinpath("labeled_screenshots/demo_Contacts_2024-01-24_12-07-55_4.png")
RECORD_PATH = TASK_PATH.joinpath("record.txt")
TASK_DESC_PATH = TASK_PATH.joinpath("task_desc.txt")
DOCS_DIR = TASK_PATH.joinpath("storage")
test_action = Action(name="test")
async def manual_test():
"""Parse one recorded UI action into an element documentation file.

Reads the first line of the demo record file, builds the matching
doc-generation prompt for the action type, asks the multimodal LLM
(via RECORD_PARSE_NODE) with before/after screenshots, and writes the
resulting documentation dict to DOCS_DIR/<resource_id>.txt.
"""
img_before_base64 = encode_image(TEST_BEFORE_PATH)
img_after_base64 = encode_image(TEST_AFTER_PATH)
with open(RECORD_PATH, "r") as record_file:
# Record line format: "<action>(<param>):::<resource_id>"
rec = record_file.readline().strip()
action, resource_id = rec.split(":::")
action_type = action.split("(")[0]
action_param = re.findall(r"\((.*?)\)", action)[0]
# Select the prompt template for this action type.
if action_type == ActionOp.TAP.value:
prompt_template = tap_doc_template
context = prompt_template.format(ui_element=action_param)
elif action_type == ActionOp.TEXT.value:
# NOTE(review): input_text is parsed but never used — confirm intended.
input_area, input_text = action_param.split(":sep:")
prompt_template = text_doc_template
context = prompt_template.format(ui_element=input_area)
elif action_type == ActionOp.LONG_PRESS.value:
prompt_template = long_press_doc_template
context = prompt_template.format(ui_element=action_param)
elif action_type == ActionOp.SWIPE.value:
swipe_area, swipe_dir = action_param.split(":sep:")
# Re-label the action as vertical/horizontal swipe for the doc dict key.
if swipe_dir == SwipeOp.UP.value or swipe_dir == SwipeOp.DOWN.value:
action_type = ActionOp.VERTICAL_SWIPE.value
elif swipe_dir == SwipeOp.LEFT.value or swipe_dir == SwipeOp.RIGHT.value:
action_type = ActionOp.HORIZONTAL_SWIPE.value
prompt_template = swipe_doc_template
context = prompt_template.format(swipe_dir=swipe_dir, ui_element=swipe_area)
else:
# NOTE(review): on an unknown action type, `context` is never assigned,
# so the format call below would raise NameError — confirm whether this
# branch should return/raise instead.
logger.error("Error occurs")
task_desc_path = TASK_DESC_PATH
task_desc = open(task_desc_path, "r").read()
context = context.format(task_desc=task_desc)
doc_name = resource_id + ".txt"
doc_path = DOCS_DIR.joinpath(doc_name)
if doc_path.exists():
# Doc file stores a Python-literal dict keyed by action type.
doc_content = ast.literal_eval(open(doc_path).read())
if doc_content[action_type]:
if config.get_other("doc_refine"):
# Append the old doc so the LLM refines rather than rewrites it.
refine_context = refine_doc_suffix.format(old_doc=doc_content[action_type])
context += refine_context
logger.info(
f"Documentation for the element {resource_id} already exists. The doc will be "
f"refined based on the latest demo."
)
else:
logger.info(
f"Documentation for the element {resource_id} already exists. Turn on DOC_REFINE "
f"in the config file if needed."
)
else:
# Fresh doc skeleton: one slot per supported action type.
doc_content = {"tap": "", "text": "", "v_swipe": "", "h_swipe": "", "long_press": ""}
logger.info(f"Waiting for GPT-4V to generate documentation for the element {resource_id}")
node = await RECORD_PARSE_NODE.fill(
context=context, llm=test_action.llm, images=[img_before_base64, img_after_base64]
)
node.compile(context=context, schema="json", mode="auto")
msg = node.content
doc_content[action_type] = msg
with open(doc_path, "w") as outfile:
# Persisted as str(dict); read back above with ast.literal_eval.
outfile.write(str(doc_content))
logger.info(f"Documentation generated and saved to {doc_path}")
if __name__ == "__main__":
    # asyncio.run() supersedes the deprecated
    # get_event_loop()/run_until_complete()/close() sequence and guarantees
    # the loop is closed even if manual_test() raises.
    asyncio.run(manual_test())

View file

@ -0,0 +1,29 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Desc : test case (imgs from appagent's)
import asyncio
from examples.andriod_assistant.actions.parse_record import ParseRecord
from examples.andriod_assistant.utils.const import ROOT_PATH
from metagpt.actions.action import Action
TASK_PATH = ROOT_PATH.parent.joinpath("data/demo_Contacts")
TEST_BEFORE_PATH = TASK_PATH.joinpath("labeled_screenshots/0_labeled.png")
TEST_AFTER_PATH = TASK_PATH.joinpath("labeled_screenshots/1_labeled.png")
RECORD_PATH = TASK_PATH.joinpath("record.txt")
TASK_DESC_PATH = TASK_PATH.joinpath("task_desc.txt")
DOCS_DIR = TASK_PATH.joinpath("storage")
test_action = Action(name="test")
async def manual_learn_test():
    """Run the record-parsing action against the demo Contacts task directory."""
    await ParseRecord().run(app_name="demo_Contacts", task_dir=TASK_PATH, docs_dir=DOCS_DIR)
if __name__ == "__main__":
    # asyncio.run() supersedes the deprecated
    # get_event_loop()/run_until_complete()/close() sequence and guarantees
    # the loop is closed even if manual_learn_test() raises.
    asyncio.run(manual_learn_test())

View file

@ -80,6 +80,34 @@ def traverse_xml_tree(xml_path: Path, elem_list: list[AndroidElement], attrib: s
path.pop()
def elem_list_from_xml_tree(xml_path: Path, useless_list: list[str], min_dist: int) -> list[AndroidElement]:
    """Collect interactable UI elements from a view-hierarchy XML dump.

    Returns every clickable element plus each focusable element whose center
    lies farther than ``min_dist`` pixels from every clickable element's
    center; elements whose uid appears in ``useless_list`` are skipped.
    """
    clickable, focusable = [], []
    traverse_xml_tree(xml_path, clickable, "clickable", True)
    traverse_xml_tree(xml_path, focusable, "focusable", True)

    def _center(element):
        (x0, y0), (x1, y1) = element.bbox
        return (x0 + x1) // 2, (y0 + y1) // 2

    # Start with all clickable elements that are not blacklisted.
    result = [elem for elem in clickable if elem.uid not in useless_list]

    # Add focusable elements only when they are not effectively duplicates
    # of a nearby clickable element.
    clickable_centers = [_center(elem) for elem in clickable]
    for candidate in focusable:
        if candidate.uid in useless_list:
            continue
        cx, cy = _center(candidate)
        near_clickable = any(
            ((cx - px) ** 2 + (cy - py) ** 2) ** 0.5 <= min_dist
            for px, py in clickable_centers
        )
        if not near_clickable:
            result.append(candidate)
    return result
def draw_bbox_multi(
img_path: Path,
output_path: Path,

View file

@ -0,0 +1 @@
!*.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 611 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 840 KiB

View file

@ -0,0 +1,2 @@
tap(9):::android.view.ViewGroup_1067_236_android.widget.TextView_183_204_Apps_2
stop

View file

@ -0,0 +1 @@
Create a contact in Contacts App named zjy with a phone number +86 18831933368