diff --git a/examples/di/crawl_webpage.py b/examples/di/crawl_webpage.py index 92e3c32b0..c4e1b6599 100644 --- a/examples/di/crawl_webpage.py +++ b/examples/di/crawl_webpage.py @@ -6,9 +6,7 @@ """ from metagpt.roles.di.data_interpreter import DataInterpreter - -__import__("metagpt.tools.libs.browser", fromlist=["Browser"]) # To skip pre-commit check - +from metagpt.tools.libs.web_scraping import view_page_element_to_scrape PAPER_LIST_REQ = """" Get data from `paperlist` table in https://papercopilot.com/statistics/iclr-statistics/iclr-2024-statistics/, @@ -34,7 +32,7 @@ NEWS_36KR_REQ = """从36kr创投平台https://pitchhub.36kr.com/financing-flash async def main(): - di = DataInterpreter(tools=["Browser"]) + di = DataInterpreter(tools=[view_page_element_to_scrape.__name__]) await di.run(ECOMMERCE_REQ) diff --git a/metagpt/configs/llm_config.py b/metagpt/configs/llm_config.py index af8f56372..39f6e61f1 100644 --- a/metagpt/configs/llm_config.py +++ b/metagpt/configs/llm_config.py @@ -31,6 +31,7 @@ class LLMType(Enum): MOONSHOT = "moonshot" MISTRAL = "mistral" YI = "yi" # lingyiwanwu + OPEN_ROUTER = "open_router" def __missing__(self, key): return self.OPENAI diff --git a/metagpt/prompts/di/role_zero.py b/metagpt/prompts/di/role_zero.py index 4d52476aa..d8dd2cb60 100644 --- a/metagpt/prompts/di/role_zero.py +++ b/metagpt/prompts/di/role_zero.py @@ -50,3 +50,14 @@ Some text indicating your thoughts, such as how you should update the plan statu ] ``` """ + +JSON_REPAIR_PROMPT = """ +## json data +{json_data} + +## Output Format +```json +Formatted JSON data +``` +Help check if there are any formatting issues with the JSON data? 
If so, please help format it +""" diff --git a/metagpt/provider/openai_api.py b/metagpt/provider/openai_api.py index 120c1d3cb..0263da989 100644 --- a/metagpt/provider/openai_api.py +++ b/metagpt/provider/openai_api.py @@ -40,7 +40,8 @@ from metagpt.utils.token_counter import ( ) -@register_provider([LLMType.OPENAI, LLMType.FIREWORKS, LLMType.OPEN_LLM, LLMType.MOONSHOT, LLMType.MISTRAL, LLMType.YI]) +@register_provider([LLMType.OPENAI, LLMType.FIREWORKS, LLMType.OPEN_LLM, LLMType.MOONSHOT, LLMType.MISTRAL, LLMType.YI, + LLMType.OPEN_ROUTER]) class OpenAILLM(BaseLLM): """Check https://platform.openai.com/examples for examples""" diff --git a/metagpt/rag/engines/simple.py b/metagpt/rag/engines/simple.py index c237dcf69..8a9ccaffd 100644 --- a/metagpt/rag/engines/simple.py +++ b/metagpt/rag/engines/simple.py @@ -4,6 +4,7 @@ import json import os from typing import Any, Optional, Union +import fsspec from llama_index.core import SimpleDirectoryReader from llama_index.core.callbacks.base import CallbackManager from llama_index.core.embeddings import BaseEmbedding @@ -83,6 +84,7 @@ class SimpleEngine(RetrieverQueryEngine): llm: LLM = None, retriever_configs: list[BaseRetrieverConfig] = None, ranker_configs: list[BaseRankerConfig] = None, + fs: Optional[fsspec.AbstractFileSystem] = None, ) -> "SimpleEngine": """From docs. @@ -96,11 +98,12 @@ class SimpleEngine(RetrieverQueryEngine): llm: Must supported by llama index. Default OpenAI. retriever_configs: Configuration for retrievers. If more than one config, will use SimpleHybridRetriever. ranker_configs: Configuration for rankers. + fs: File system to use. 
""" if not input_dir and not input_files: raise ValueError("Must provide either `input_dir` or `input_files`.") - documents = SimpleDirectoryReader(input_dir=input_dir, input_files=input_files).load_data() + documents = SimpleDirectoryReader(input_dir=input_dir, input_files=input_files, fs=fs).load_data() cls._fix_document_metadata(documents) transformations = transformations or cls._default_transformations() diff --git a/metagpt/roles/di/role_zero.py b/metagpt/roles/di/role_zero.py index b5342409f..906c5583c 100644 --- a/metagpt/roles/di/role_zero.py +++ b/metagpt/roles/di/role_zero.py @@ -2,6 +2,7 @@ from __future__ import annotations import inspect import json +import re import traceback from typing import Callable, Literal, Tuple @@ -10,7 +11,7 @@ from pydantic import model_validator from metagpt.actions import Action from metagpt.actions.di.run_command import RunCommand from metagpt.logs import logger -from metagpt.prompts.di.role_zero import CMD_PROMPT, ROLE_INSTRUCTION +from metagpt.prompts.di.role_zero import CMD_PROMPT, ROLE_INSTRUCTION, JSON_REPAIR_PROMPT from metagpt.roles import Role from metagpt.schema import AIMessage, Message, UserMessage from metagpt.strategy.experience_retriever import DummyExpRetriever, ExpRetriever @@ -21,6 +22,7 @@ from metagpt.tools.tool_recommend import BM25ToolRecommender, ToolRecommender from metagpt.tools.tool_registry import register_tool from metagpt.utils.common import CodeParser from metagpt.utils.report import ThoughtReporter +from metagpt.utils.repair_llm_raw_output import repair_llm_raw_output, RepairType @register_tool(include_functions=["ask_human", "reply_to_human"]) @@ -87,6 +89,23 @@ class RoleZero(Role): "RoleZero.ask_human": self.ask_human, "RoleZero.reply_to_human": self.reply_to_human, } + self.tool_execution_map.update( + { + f"Browser.{i}": getattr(self.browser, i) + for i in [ + "click", + "close_tab", + "go_back", + "go_forward", + "goto", + "hover", + "press", + "scroll", + "tab_focus", + "type", + ] + 
} + ) # can be updated by subclass self._update_tool_execution() return self @@ -125,7 +144,14 @@ class RoleZero(Role): available_commands=tool_info, instruction=self.instruction.strip(), ) - context = self.llm.format_msg(self.rc.memory.get(self.memory_k) + [UserMessage(content=prompt)]) + memory = self.rc.memory.get(self.memory_k) + if not self.browser.is_empty_page: + pattern = re.compile(r"Command Browser\.(\w+) executed") + for index, msg in zip(range(len(memory), 0, -1), memory[::-1]): + if pattern.match(msg.content): + memory.insert(index, UserMessage(cause_by="browser", content=await self.browser.view())) + break + context = self.llm.format_msg(memory + [UserMessage(content=prompt)]) # print(*context, sep="\n" + "*" * 5 + "\n") async with ThoughtReporter(enable_llm_stream=True): self.command_rsp = await self.llm.aask(context, system_msgs=self.system_msg) @@ -138,13 +164,22 @@ class RoleZero(Role): return await super()._act() try: - commands = json.loads(CodeParser.parse_code(block=None, lang="json", text=self.command_rsp)) + commands = CodeParser.parse_code(block=None, lang="json", text=self.command_rsp) + commands = json.loads(repair_llm_raw_output(output=commands, req_keys=[None], repair_type=RepairType.JSON)) + except json.JSONDecodeError as e: + commands = await self.llm.aask(msg=JSON_REPAIR_PROMPT.format(json_data=self.command_rsp)) + commands = json.loads(CodeParser.parse_code(block=None, lang="json", text=commands)) except Exception as e: tb = traceback.format_exc() print(tb) error_msg = UserMessage(content=str(e)) self.rc.memory.add(error_msg) return error_msg + + # Tolerate LLM responses that deviate from the expected output format + if isinstance(commands, dict): + commands = commands["commands"] if "commands" in commands else [commands] + outputs = await self._run_commands(commands) self.rc.memory.add(UserMessage(content=outputs)) return AIMessage( diff --git a/metagpt/tools/libs/browser.py b/metagpt/tools/libs/browser.py index 1b1b3d82d..c6ea71bd5 100644 --- a/metagpt/tools/libs/browser.py +++ 
b/metagpt/tools/libs/browser.py @@ -1,261 +1,206 @@ from __future__ import annotations -import contextlib -from uuid import uuid4 +import time +from typing import Literal, Optional -from playwright.async_api import async_playwright +from playwright.async_api import Browser as Browser_ +from playwright.async_api import ( + BrowserContext, + Frame, + Page, + Playwright, + Request, + async_playwright, +) -from metagpt.const import DEFAULT_WORKSPACE_ROOT from metagpt.tools.tool_registry import register_tool -from metagpt.utils.file import MemoryFileSystem -from metagpt.utils.parse_html import simplify_html +from metagpt.utils.a11y_tree import ( + click_element, + get_accessibility_tree, + get_backend_node_id, + hover_element, + key_press, + parse_accessibility_tree, + scroll_page, + type_text, +) from metagpt.utils.report import BrowserReporter -@register_tool(tags=["web", "browse", "scrape"]) +@register_tool( + tags=["web", "browse"], + include_functions=[ + "click", + "close_tab", + "go_back", + "go_forward", + "goto", + "hover", + "press", + "scroll", + "tab_focus", + "type", + ], +) class Browser: - """ - A tool for browsing the web and scraping. Don't initialize a new instance of this class if one already exists. - Note: Combine searching and scrolling together to achieve most effective browsing. DON'T stick to one method. + """A tool for browsing the web. Don't initialize a new instance of this class if one already exists. + + Note: If you plan to use the browser to assist you in completing tasks, then using the browser should be a standalone + task, executing actions each time based on the content seen on the webpage before proceeding to the next step. + + ## Example + Issue: The details of the latest issue in the geekan/MetaGPT repository. + Plan: Use a browser to view the details of the latest issue in the geekan/MetaGPT repository. 
+ Solution: + Let's first open the issue page of the MetaGPT repository with the `Browser.goto` command + + >>> await browser.goto("https://github.com/geekan/MetaGPT/issues") + + From the output webpage, we've identified that the latest issue can be accessed by clicking on the element with ID "1141". + + >>> await browser.click(1141) + + Finally, we have found the webpage for the latest issue, we can close the tab and finish current task. + + >>> await browser.close_tab() """ def __init__(self): - """initiate the browser, create pages placeholder later to be managed as {page_url: page object}""" - self.browser = None - - # browser status management - self.pages = {} - self.current_page_url = None - self.current_page = None + self.playwright: Optional[Playwright] = None + self.browser_instance: Optional[Browser_] = None + self.browser_ctx: Optional[BrowserContext] = None + self.page: Optional[Page] = None + self.accessibility_tree: list = [] + self.headless: bool = True + self.proxy = None + self.is_empty_page = True self.reporter = BrowserReporter() - async def start(self): + async def start(self) -> None: """Starts Playwright and launches a browser""" - self.playwright = await async_playwright().start() - self.browser = await self.playwright.chromium.launch() + if self.playwright is None: + self.playwright = playwright = await async_playwright().start() + browser = self.browser_instance = await playwright.chromium.launch(headless=self.headless, proxy=self.proxy) + browser_ctx = self.browser_ctx = await browser.new_context() + self.page = await browser_ctx.new_page() - async def _set_current_page(self, page, url): - self.current_page = page - self.current_page_url = url - print("Now on page ", url) - await self._view() + async def stop(self): + if self.playwright: + playwright = self.playwright + self.playwright = None + self.browser_instance = None + self.browser_ctx = None + await playwright.stop() - async def open_new_page(self, url: str, timeout: float = 
30000): - """open a new page in the browser and view the page""" + async def click(self, element_id: int): + """clicks on an element with a specific id on the webpage.""" + await click_element(self.page, get_backend_node_id(element_id, self.accessibility_tree)) + return await self._wait_page() + + async def type(self, element_id: int, content: str, press_enter_after: bool = False): + """Use this to type the content into the field with id.""" + if press_enter_after: + content += "\n" + await click_element(self.page, get_backend_node_id(element_id, self.accessibility_tree)) + await type_text(self.page, content) + return await self._wait_page() + + async def hover(self, element_id: int): + """Hover over an element with id.""" + await hover_element(self.page, get_backend_node_id(element_id, self.accessibility_tree)) + return await self._wait_page() + + async def press(self, key_comb: str): + """Simulates the pressing of a key combination on the keyboard (e.g., Ctrl+v).""" + await key_press(self.page, key_comb) + return await self._wait_page() + + async def scroll(self, direction: Literal["down", "up"]): + """Scroll the page up or down.""" + await scroll_page(self.page, direction) + return await self._wait_page() + + async def goto(self, url: str, timeout: float = 30000): + """Navigate to a specific URL.""" async with self.reporter as reporter: - page = await self.browser.new_page() await reporter.async_report(url, "url") - await page.goto(url, timeout=timeout) - self.pages[url] = page - await self._set_current_page(page, url) - await reporter.async_report(page, "page") + await self.page.goto(url, timeout=timeout) + self.is_empty_page = False + return await self._wait_page() - async def view_page_element_to_scrape(self, requirement: str, keep_links: bool = False) -> None: - """view the HTML content of current page to understand the structure. 
When executed, the content will be printed out + async def go_back(self): + """Navigate to the previously viewed page.""" + await self.page.go_back() + return await self._wait_page() - Args: - requirement (str): Providing a clear and detailed requirement helps in focusing the inspection on the desired elements. - keep_links (bool): Whether to keep the hyperlinks in the HTML content. Set to True if links are required - """ - html = await self.current_page.content() - html = simplify_html(html, url=self.current_page.url, keep_links=keep_links) - mem_fs = MemoryFileSystem() - filename = f"{uuid4().hex}.html" - with mem_fs.open(filename, "w") as f: - f.write(html) + async def go_forward(self): + """Navigate to the next page (if a previous 'go_back' action was performed).""" + await self.page.go_forward() + return await self._wait_page() - # Since RAG is an optional optimization, if it fails, the simplified HTML can be used as a fallback. - with contextlib.suppress(Exception): - from metagpt.rag.engines import SimpleEngine # avoid circular import + async def tab_focus(self, page_number: int): + """Switch the browser's focus to the tab at the given index.""" + page = self.browser_ctx.pages[page_number] + await page.bring_to_front() + return await self._wait_page() - # TODO make `from_docs` asynchronous - engine = SimpleEngine.from_docs(input_files=[filename], fs=mem_fs) - nodes = await engine.aretrieve(requirement) - html = "\n".join(i.text for i in nodes) - - mem_fs.rm_file(filename) - print(html) - - async def get_page_content(self) -> str: - """Get the HTML content of current page.""" - html = await self.current_page.content() - html_content = html.strip() - return html_content - - async def switch_page(self, url: str): - """switch to an opened page in the browser and view the page""" - if url in self.pages: - await self._set_current_page(self.pages[url], url) - await self.reporter.async_report(self.current_page, "page") + async def close_tab(self): + """Close the currently active tab.""" + 
await self.page.close() + if len(self.browser_ctx.pages) > 0: + self.page = self.browser_ctx.pages[-1] else: - print(f"Page not found: {url}") + self.page = await self.browser_ctx.new_page() + self.is_empty_page = True + return await self._wait_page() - async def _view_page_html(self, keep_len: int = 5000) -> str: - """view the HTML content of current page, return the HTML content as a string. When executed, the content will be printed out""" - html = await self.current_page.content() - html_content = html.strip()[:keep_len] - return html_content + async def _wait_page(self): + page = self.page + await self._wait_until_page_idle(page) + self.accessibility_tree = await get_accessibility_tree(page) + await self.reporter.async_report(page, "page") + return f"SUCCESS, URL: {page.url}" - async def search_content_all(self, search_term: str) -> list[dict]: - """search all occurences of search term in the current page and return the search results with their position. - Useful if you have a keyword or sentence in mind and want to quickly narrow down the content relevant to it. + def _register_page_event(self, page: Page): + page.last_busy_time = time.time() + page.requests = set() + page.on("domcontentloaded", self._update_page_last_busy_time) + page.on("load", self._update_page_last_busy_time) + page.on("request", self._on_page_request) + page.on("requestfailed", self._on_page_requestfinished) + page.on("requestfinished", self._on_page_requestfinished) + page.on("frameattached", self._on_frame_change) + page.on("framenavigated", self._on_frame_change) - Args: - search_term (str): the search term + async def _wait_until_page_idle(self, page) -> None: + if not hasattr(page, "last_busy_time"): + self._register_page_event(page) + else: + page.last_busy_time = time.time() + while time.time() - page.last_busy_time < 0.5: + await page.wait_for_timeout(100) - Returns: - list[dict]: a list of dictionaries containing the elements and their positions, e.g. 
- [ - { - "index": ..., - "content": { - "text_block": ..., - "links": [ - {"text": ..., "href": ...}, - ... - ] - }, - "position": {from_top: ..., from_left: ...}, - }, - ... - ] - """ - locator = self.current_page.locator(f"text={search_term}") - count = await locator.count() - search_results = [] - for i in range(count): - element = locator.nth(i) - if await element.is_visible(): - position = await element.evaluate("e => ({ from_top: e.offsetTop, from_left: e.offsetLeft })") + async def _update_page_last_busy_time(self, page: Page): + page.last_busy_time = time.time() - # Retrieve the surrounding block of text and links with their text - content = await element.evaluate(SEARCH_CONTENT_JS) + async def _on_page_request(self, request: Request): + page = request.frame.page + page.requests.add(request) + await self._update_page_last_busy_time(page) - search_results.append( - {"index": len(search_results), "content": content, "position": position, "element_obj": element} - ) + async def _on_page_requestfinished(self, request: Request): + request.frame.page.requests.discard(request) - print(f"Found {len(search_results)} instances of the term '{search_term}':\n\n{search_results}") + async def _on_frame_change(self, frame: Frame): + await self._update_page_last_busy_time(frame.page) - return search_results + async def view(self): + observation = parse_accessibility_tree(self.accessibility_tree) + return f"Current Browser Viewer\n URL: {self.page.url}\nOBSERVATION:\n{observation[0]}\n" - async def scroll_to_search_result(self, search_results: list[dict], index: int = 0): - """Scroll to the index-th search result, potentially for subsequent perception. - Useful if you have located a search result, the search result does not fulfill your requirement, and you need more information around that search result. Can only be used after search_all_content. 
+ async def __aenter__(self): + await self.start() + return self - Args: - search_results (list[dict]): search_results from search_content_all - index (int, optional): the index of the search result to scroll to. Index starts from 0. Defaults to 0. - """ - if not search_results: - return {} - if index >= len(search_results): - print(f"Index {index} is out of range. Scrolling to the last instance.") - index = len(search_results) - 1 - element = search_results[index]["element_obj"] - await element.scroll_into_view_if_needed() - await self.reporter.async_report(self.current_page, "page") - - print(f"Successfully scrolled to the {index}-th search result") - print(await self._view()) - - # async def find_links(self) -> list: - # """Finds all links in the current page and returns a list of dictionaries with link text and the URL. - # Useful for navigating to more pages and exploring more resources. - - # Returns: - # list: A list of dictionaries, each containing 'text' and 'href' keys. - # """ - # # Use a CSS selector to find all elements in the page. - # links = await self.current_page.query_selector_all("a") - - # # Prepare an empty list to hold link information. - # link_info = [] - - # # Iterate over each link element to extract its text and href attributes. 
- # for link in links: - # text = await link.text_content() - # href = await link.get_attribute("href") - # link_info.append({"text": text, "href": href}) - - # print(f"Found {len(link_info)} links:\n\n{link_info}") - - # return link_info - - async def screenshot(self, path: str = DEFAULT_WORKSPACE_ROOT / "screenshot_temp.png"): - """Take a screenshot of the current page and save it to the specified path.""" - await self.current_page.screenshot(path=path) - print(f"Screenshot saved to: {path}") - - async def _view(self, keep_len: int = 5000) -> str: - """simulate human viewing the current page, return the visible text with links""" - visible_text_with_links = await self.current_page.evaluate(VIEW_CONTENT_JS) - print("The visible text and their links (if any): ", visible_text_with_links[:keep_len]) - # html_content = await self._view_page_html(keep_len=keep_len) - # print("The html content: ", html_content) - - async def scroll_current_page(self, offset: int = 500): - """scroll the current page by offset pixels, negative value means scrolling up, will print out observed content after scrolling""" - await self.current_page.evaluate(f"window.scrollBy(0, {offset})") - await self.reporter.async_report(self.current_page, "page") - - print(f"Scrolled current page by {offset} pixels.") - print(await self._view()) - - def check_all_pages(self) -> dict: - """return all pages opened in the browser, a dictionary with {page_url: page_title}, useful for understanding the current browser state""" - pages_info = {url: page.title() for url, page in self.pages.items()} - return pages_info - - async def close(self): - """close the browser and all pages""" - await self.browser.close() - await self.playwright.stop() - - -async def get_scroll_position(page): - return await page.evaluate("() => ({ x: window.scrollX, y: window.scrollY })") - - -SEARCH_CONTENT_JS = """ -(element) => { - // const block = element.closest('p, div, section, article'); - const block = element.parentElement; - 
return { - text_block: block.innerText, - // Create an array of objects, each containing the text and href of a link - links: Array.from(block.querySelectorAll('a')).map(a => ({ - text: a.innerText, - href: a.href - })) - }; -} -""" - - -VIEW_CONTENT_JS = """ -() => { - return Array.from(document.querySelectorAll('body *')).filter(el => { - if (!(el.offsetWidth || el.offsetHeight || el.getClientRects().length)) return false; - const style = window.getComputedStyle(el); - if (style.display === 'none' || style.visibility !== 'visible' || style.opacity === '0') return false; - const rect = el.getBoundingClientRect(); - const elemCenter = { - x: rect.left + rect.width / 2, - y: rect.top + rect.height / 2 - }; - if (elemCenter.x < 0 || elemCenter.y < 0 || elemCenter.x > window.innerWidth || elemCenter.y > window.innerHeight) return false; - if (document.elementFromPoint(elemCenter.x, elemCenter.y) !== el) return false; - return true; - }).map(el => { - let text = el.innerText || ''; - text = text.trim(); - if (!text.length) return ''; - const parentAnchor = el.closest('a'); - if (parentAnchor && parentAnchor.href) { - return `${text} (${parentAnchor.href})`; - } - return text; - }).filter(text => text.length > 0).join("\\n"); -} -""" + async def __aexit__(self, *args, **kwargs): + await self.stop() diff --git a/metagpt/tools/libs/web_scraping.py b/metagpt/tools/libs/web_scraping.py index bc34b1306..489c3a472 100644 --- a/metagpt/tools/libs/web_scraping.py +++ b/metagpt/tools/libs/web_scraping.py @@ -1,20 +1,50 @@ +import contextlib +from uuid import uuid4 + +from metagpt.tools.libs.browser import Browser from metagpt.tools.tool_registry import register_tool -from metagpt.tools.web_browser_engine_playwright import PlaywrightWrapper +from metagpt.utils.file import MemoryFileSystem +from metagpt.utils.parse_html import simplify_html -@register_tool(tags=["web scraping", "web"]) -async def scrape_web_playwright(url): - """ - Asynchronously Scrape and save the HTML structure 
and inner text content of a web page using Playwright. +@register_tool(tags=["web scraping"]) +async def view_page_element_to_scrape(url: str, requirement: str, keep_links: bool = False) -> None: + """view the HTML content of current page to understand the structure. When executed, the content will be printed out Args: - url (str): The main URL to fetch inner text from. - - Returns: - dict: The inner text content and html structure of the web page, keys are 'inner_text', 'html'. + url (str): The URL of the web page to scrape. + requirement (str): Providing a clear and detailed requirement helps in focusing the inspection on the desired elements. + keep_links (bool): Whether to keep the hyperlinks in the HTML content. Set to True if links are required """ - # Create a PlaywrightWrapper instance for the Chromium browser - web = await PlaywrightWrapper().run(url) + async with Browser() as browser: + await browser.goto(url) + page = browser.page + html = await page.content() + html = simplify_html(html, url=page.url, keep_links=keep_links) + mem_fs = MemoryFileSystem() + filename = f"{uuid4().hex}.html" + with mem_fs.open(filename, "w") as f: + f.write(html) - # Return the inner text content of the web page - return {"inner_text": web.inner_text.strip(), "html": web.html.strip()} + # Since RAG is an optional optimization, if it fails, the simplified HTML can be used as a fallback. + with contextlib.suppress(Exception): + from metagpt.rag.engines import SimpleEngine # avoid circular import + + # TODO make `from_docs` asynchronous + engine = SimpleEngine.from_docs(input_files=[filename], fs=mem_fs) + nodes = await engine.aretrieve(requirement) + html = "\n".join(i.text for i in nodes) + + mem_fs.rm_file(filename) + print(html) + + +# async def get_elements_outerhtml(self, element_ids: list[int]): +# """Inspect the outer HTML of the elements in Current Browser Viewer. 
+# """ +# page = self.page +# data = [] +# for element_id in element_ids: +# html = await get_element_outer_html(page, get_backend_node_id(element_id, self.accessibility_tree)) +# data.append(html) +# return "\n".join(f"[{element_id}]. {html}" for element_id, html in zip(element_ids, data)) diff --git a/metagpt/utils/a11y_tree.py b/metagpt/utils/a11y_tree.py new file mode 100644 index 000000000..59acbc6dc --- /dev/null +++ b/metagpt/utils/a11y_tree.py @@ -0,0 +1,306 @@ +"""See https://github.com/web-arena-x/webarena +""" +from __future__ import annotations + +import re + +from playwright.async_api import BrowserContext, Page + + +async def get_accessibility_tree(page: Page): + cdp_session = await get_page_cdp_session(page) + resp = await cdp_session.send("Accessibility.getFullAXTree") + + seen_ids = set() + accessibility_tree = [] + for node in resp["nodes"]: + if node["nodeId"] not in seen_ids: + accessibility_tree.append(node) + seen_ids.add(node["nodeId"]) + return accessibility_tree + + +async def execute_step(step: str, page: Page, browser_ctx: BrowserContext, accessibility_tree: list): + step = step.strip() + func = step.split("[")[0].strip() if "[" in step else step.split()[0].strip() + if func == "None": + return "" + elif func == "click": + match = re.search(r"click ?\[(\d+)\]", step) + if not match: + raise ValueError(f"Invalid click action {step}") + element_id = match.group(1) + await click_element(page, get_backend_node_id(element_id, accessibility_tree)) + elif func == "hover": + match = re.search(r"hover ?\[(\d+)\]", step) + if not match: + raise ValueError(f"Invalid hover action {step}") + element_id = match.group(1) + await hover_element(page, get_backend_node_id(element_id, accessibility_tree)) + elif func == "type": + # add default enter flag + if not (step.endswith("[0]") or step.endswith("[1]")): + step += " [1]" + + match = re.search(r"type ?\[(\d+)\] ?\[(.+)\] ?\[(\d+)\]", step) + if not match: + raise ValueError(f"Invalid type action 
{step}") + element_id, text, enter_flag = ( + match.group(1), + match.group(2), + match.group(3), + ) + if enter_flag == "1": + text += "\n" + await click_element(page, get_backend_node_id(element_id, accessibility_tree)) + await type_text(page, text) + elif func == "press": + match = re.search(r"press ?\[(.+)\]", step) + if not match: + raise ValueError(f"Invalid press action {step}") + key = match.group(1) + await key_press(page, key) + elif func == "scroll": + # up or down + match = re.search(r"scroll ?\[?(up|down)\]?", step) + if not match: + raise ValueError(f"Invalid scroll action {step}") + direction = match.group(1) + await scroll_page(page, direction) + elif func == "goto": + match = re.search(r"goto ?\[(.+)\]", step) + if not match: + raise ValueError(f"Invalid goto action {step}") + url = match.group(1) + await page.goto(url) + elif func == "new_tab": + page = await browser_ctx.new_page() + elif func == "go_back": + await page.go_back() + elif func == "go_forward": + await page.go_forward() + elif func == "tab_focus": + match = re.search(r"tab_focus ?\[(\d+)\]", step) + if not match: + raise ValueError(f"Invalid tab_focus action {step}") + page_number = int(match.group(1)) + page = browser_ctx.pages[page_number] + await page.bring_to_front() + elif func == "close_tab": + await page.close() + if len(browser_ctx.pages) > 0: + page = browser_ctx.pages[-1] + else: + page = await browser_ctx.new_page() + elif func == "stop": + match = re.search(r'stop\(?"(.+)?"\)', step) + answer = match.group(1) if match else "" + return answer + else: + raise ValueError + await page.wait_for_load_state("domcontentloaded") + return page + + +async def type_text(page: Page, text: str): + await page.keyboard.type(text) + + +async def click_element(page: Page, backend_node_id: int): + cdp_session = await get_page_cdp_session(page) + resp = await get_bounding_rect(cdp_session, backend_node_id) + node_info = resp["result"]["value"] + x, y = await get_element_center(node_info) + 
await page.mouse.click(x, y) + + +async def hover_element(page: Page, backend_node_id: int) -> None: + cdp_session = await get_page_cdp_session(page) + resp = await get_bounding_rect(cdp_session, backend_node_id) + node_info = resp["result"]["value"] + x, y = await get_element_center(node_info) + await page.mouse.move(x, y) + + +async def scroll_page(page: Page, direction: str) -> None: + # perform the action + # code from natbot + if direction == "up": + await page.evaluate( + "(document.scrollingElement || document.body).scrollTop = (document.scrollingElement || document.body).scrollTop - window.innerHeight;" + ) + elif direction == "down": + await page.evaluate( + "(document.scrollingElement || document.body).scrollTop = (document.scrollingElement || document.body).scrollTop + window.innerHeight;" + ) + + +async def key_press(page: Page, key: str) -> None: + """Press a key.""" + if "Meta" in key and "Mac" not in await page.evaluate("navigator.platform"): + key = key.replace("Meta", "Control") + await page.keyboard.press(key) + + +async def get_element_outer_html(page: Page, backend_node_id: int): + cdp_session = await get_page_cdp_session(page) + try: + outer_html = await cdp_session.send("DOM.getOuterHTML", {"backendNodeId": int(backend_node_id)}) + return outer_html["outerHTML"] + except Exception as e: + raise ValueError("Element not found") from e + + +async def get_element_center(node_info): + x, y, width, height = node_info["x"], node_info["y"], node_info["width"], node_info["height"] + center_x = x + width / 2 + center_y = y + height / 2 + return center_x, center_y + + +def extract_step(response: str, action_splitter: str = "```") -> str: + # find the first occurence of action + pattern = rf"{action_splitter}((.|\n)*?){action_splitter}" + match = re.search(pattern, response) + if match: + return match.group(1).strip() + else: + raise ValueError(f'Cannot find the answer phrase "{response}"') + + +async def get_bounding_rect(cdp_session, backend_node_id: 
str): + try: + remote_object = await cdp_session.send("DOM.resolveNode", {"backendNodeId": int(backend_node_id)}) + remote_object_id = remote_object["object"]["objectId"] + response = await cdp_session.send( + "Runtime.callFunctionOn", + { + "objectId": remote_object_id, + "functionDeclaration": """ + function() { + if (this.nodeType == 3) { + var range = document.createRange(); + range.selectNode(this); + var rect = range.getBoundingClientRect().toJSON(); + range.detach(); + return rect; + } else { + return this.getBoundingClientRect().toJSON(); + } + } + """, + "returnByValue": True, + }, + ) + return response + except Exception as e: + raise ValueError("Element not found") from e + + +IGNORED_ACTREE_PROPERTIES = ( + "focusable", + "editable", + "readonly", + "level", + "settable", + "multiline", + "invalid", +) + + +def parse_accessibility_tree(accessibility_tree): + """Parse the accessibility tree into a string text""" + node_id_to_idx = {} + for idx, node in enumerate(accessibility_tree): + node_id_to_idx[node["nodeId"]] = idx + + obs_nodes_info = {} + + def dfs(idx: int, obs_node_id: str, depth: int) -> str: + tree_str = "" + node = accessibility_tree[idx] + indent = "\t" * depth + valid_node = True + try: + role = node["role"]["value"] + name = node["name"]["value"] + node_str = f"[{obs_node_id}] {role} {repr(name)}" + properties = [] + for property in node.get("properties", []): + try: + if property["name"] in IGNORED_ACTREE_PROPERTIES: + continue + properties.append(f'{property["name"]}: {property["value"]["value"]}') + except KeyError: + pass + + if properties: + node_str += " " + " ".join(properties) + + # check valid + if not node_str.strip(): + valid_node = False + + # empty generic node + if not name.strip(): + if not properties: + if role in [ + "generic", + "img", + "list", + "strong", + "paragraph", + "banner", + "navigation", + "Section", + "LabelText", + "Legend", + "listitem", + ]: + valid_node = False + elif role in ["listitem"]: + valid_node = 
False + + if valid_node: + tree_str += f"{indent}{node_str}" + obs_nodes_info[obs_node_id] = { + "backend_id": node["backendDOMNodeId"], + "union_bound": node["union_bound"], + "text": node_str, + } + + except Exception: + valid_node = False + + for _, child_node_id in enumerate(node["childIds"]): + if child_node_id not in node_id_to_idx: + continue + # mark this to save some tokens + child_depth = depth + 1 if valid_node else depth + child_str = dfs(node_id_to_idx[child_node_id], child_node_id, child_depth) + if child_str.strip(): + if tree_str.strip(): + tree_str += "\n" + tree_str += child_str + + return tree_str + + tree_str = dfs(0, accessibility_tree[0]["nodeId"], 0) + return tree_str, obs_nodes_info + + +async def get_page_cdp_session(page): + if hasattr(page, "cdp_session"): + return page.cdp_session + + cdp_session = await page.context.new_cdp_session(page) + page.cdp_session = cdp_session + return cdp_session + + +def get_backend_node_id(element_id, accessibility_tree): + element_id = str(element_id) + for i in accessibility_tree: + if i["nodeId"] == element_id: + return i.get("backendDOMNodeId") + raise ValueError(f"Element {element_id} not found")