diff --git a/examples/di/use_browser.py b/examples/di/use_browser.py
new file mode 100644
index 000000000..6dfc8de24
--- /dev/null
+++ b/examples/di/use_browser.py
@@ -0,0 +1,26 @@
+import asyncio
+
+from metagpt.roles.di.data_interpreter import DataInterpreter
+
+# an example to showcase navigation
+MG_LLM_CONFIG_REQ = """
+This is a link to the doc site of MetaGPT project: https://docs.deepwisdom.ai/main/en/
+Check where you can go to on the site and try to find out the list of LLM APIs supported by MetaGPT.
+Don't write all codes in one response, each time, just write code for one step.
+"""
+
+# an example to showcase searching
+PAPER_LIST_REQ = """
+At https://papercopilot.com/statistics/iclr-statistics/iclr-2024-statistics/,
+find the first paper whose title includes `multiagent`, open it and summarize its abstract.
+Don't write all codes in one response, each time, just write code for one step.
+"""
+
+
+async def main():
+    di = DataInterpreter(tools=["Browser"], react_mode="react")
+    await di.run(MG_LLM_CONFIG_REQ)
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/metagpt/tools/libs/__init__.py b/metagpt/tools/libs/__init__.py
index fb96db735..92f73ea54 100644
--- a/metagpt/tools/libs/__init__.py
+++ b/metagpt/tools/libs/__init__.py
@@ -13,6 +13,7 @@ from metagpt.tools.libs import (
     email_login,
     terminal,
     file_manager,
+    browser,
 )
 from metagpt.tools.libs.software_development import (
     write_prd,
@@ -40,4 +41,5 @@ _ = (
     git_archive,
     terminal,
     file_manager,
+    browser,
 )  # Avoid pre-commit error
diff --git a/metagpt/tools/libs/browser.py b/metagpt/tools/libs/browser.py
new file mode 100644
index 000000000..0a73e9fc7
--- /dev/null
+++ b/metagpt/tools/libs/browser.py
@@ -0,0 +1,183 @@
+from playwright.async_api import async_playwright
+
+from metagpt.const import DEFAULT_WORKSPACE_ROOT
+from metagpt.tools.tool_registry import register_tool
+from metagpt.utils.common import encode_image
+
+
+@register_tool()
+class Browser:
+    """
+    A tool for browsing the web. Don't initialize a new instance of this class if one already exists.
+    Note: Combine searching, scrolling, extraction, and link finding together to achieve most effective browsing. DON'T stick to one method.
+    """
+
+    def __init__(self):
+        """initiate the browser, create pages placeholder later to be managed as {page_url: page object}"""
+        self.browser = None
+
+        from metagpt.config2 import config
+        from metagpt.llm import LLM
+
+        self.llm = LLM(llm_config=config.get_openai_llm())
+        self.llm.model = "gpt-4-vision-preview"
+
+        # browser status management
+        self.pages = {}
+        self.current_page_url = None
+        self.current_page = None
+
+    async def start(self):
+        """Starts Playwright and launches a browser"""
+        self.playwright = await async_playwright().start()
+        self.browser = await self.playwright.chromium.launch()
+
+    async def open_new_page(self, url: str):
+        """open a new page in the browser, set it as the current page"""
+        page = await self.browser.new_page()
+        await page.goto(url)
+        self.pages[url] = page
+        self.current_page = page
+        self.current_page_url = url
+        print(f"Opened new page: {url}")
+
+    async def switch_page(self, url: str):
+        """switch to an opened page in the browser, set it as the current page"""
+        if url in self.pages:
+            self.current_page = self.pages[url]
+            self.current_page_url = url
+            print(f"Switched to page: {url}")
+        else:
+            print(f"Page not found: {url}")
+
+    async def search_content_all(self, search_term: str) -> list[dict]:
+        """search all occurrences of search term in the current page and return the search results with their position.
+        Useful if you have a keyword or sentence in mind and want to quickly narrow down the content relevant to it.
+
+        Args:
+            search_term (str): the search term
+
+        Returns:
+            list[dict]: a list of dictionaries containing the elements and their positions, e.g.
+            [
+                {
+                    "index": ...,
+                    "content": {
+                        "text_block": ...,
+                        "links": [
+                            {"text": ..., "href": ...},
+                            ...
+                        ]
+                    },
+                    "position": {from_top: ..., from_left: ...},
+                },
+                ...
+            ]
+        """
+        locator = self.current_page.locator(f"text={search_term}")
+        count = await locator.count()
+        search_results = []
+        for i in range(count):
+            element = locator.nth(i)
+            if await element.is_visible():
+                position = await element.evaluate("e => ({ from_top: e.offsetTop, from_left: e.offsetLeft })")
+
+                # Retrieve the surrounding block of text and links with their text
+                content = await element.evaluate(
+                    """
+                    (element) => {
+                        // const block = element.closest('p, div, section, article');
+                        const block = element.parentElement;
+                        return {
+                            text_block: block.innerText,
+                            // Create an array of objects, each containing the text and href of a link
+                            links: Array.from(block.querySelectorAll('a')).map(a => ({
+                                text: a.innerText,
+                                href: a.href
+                            }))
+                        };
+                    }
+                    """
+                )
+
+                search_results.append(
+                    {"index": len(search_results), "content": content, "position": position, "element_obj": element}
+                )
+
+        print(f"Found {len(search_results)} instances of the term '{search_term}':\n\n{search_results}")
+
+        return search_results
+
+    async def scroll_to_search_result(self, search_results: list[dict], index: int = 0):
+        """Scroll to the index-th search result, potentially for subsequent perception.
+        Useful if you have located a search result, the search result does not fulfill your requirement, and you need more information around that search result. Can only be used after search_content_all.
+
+        Args:
+            search_results (list[dict]): search_results from search_content_all
+            index (int, optional): the index of the search result to scroll to. Index starts from 0. Defaults to 0.
+        """
+        if not search_results:
+            return
+        if index >= len(search_results):
+            print(f"Index {index} is out of range. Scrolling to the last instance.")
+            index = len(search_results) - 1
+        element = search_results[index]["element_obj"]
+        await element.scroll_into_view_if_needed()
+        print(f"Successfully scrolled to the {index}-th search result, consider extract more info around it.")
+
+    async def find_links(self) -> list:
+        """Finds all links in the current page and returns a list of dictionaries with link text and the URL.
+        Useful for navigating to more pages and exploring more resources.
+
+        Returns:
+            list: A list of dictionaries, each containing 'text' and 'href' keys.
+        """
+        # Use a CSS selector to find all elements in the page.
+        links = await self.current_page.query_selector_all("a")
+
+        # Prepare an empty list to hold link information.
+        link_info = []
+
+        # Iterate over each link element to extract its text and href attributes.
+        for link in links:
+            text = await link.text_content()
+            href = await link.get_attribute("href")
+            link_info.append({"text": text, "href": href})
+
+        print(f"Found {len(link_info)} links:\n\n{link_info}")
+
+        return link_info
+
+    async def extract_info_from_view(self, instruction: str) -> str:
+        """
+        Extract useful info from the current page view.
+
+        Args:
+            instruction (str): explain what info needs to be extracted
+
+        Returns:
+            str: extracted info from current view
+        """
+        img_path = DEFAULT_WORKSPACE_ROOT / "screenshot_temp.png"
+        await self.current_page.screenshot(path=img_path)
+        rsp = await self.llm.aask(msg=instruction, images=[encode_image(img_path)])
+        return rsp
+
+    async def scroll_current_page(self, offset: int = 500):
+        """scroll the current page by offset pixels, negative value means scrolling up, returning the content observed after scrolling"""
+        await self.current_page.evaluate(f"window.scrollBy(0, {offset})")
+        print(f"Scrolled current page by {offset} pixels. Perceive the scrolled view if needed")
+
+    async def check_all_pages(self) -> dict:
+        """return all pages opened in the browser, a dictionary with {page_url: page_title}, useful for understanding the current browser state"""
+        pages_info = {url: await page.title() for url, page in self.pages.items()}
+        return pages_info
+
+    async def close(self):
+        """close the browser and all pages"""
+        await self.browser.close()
+        await self.playwright.stop()
+
+
+async def get_scroll_position(page):
+    return await page.evaluate("() => ({ x: window.scrollX, y: window.scrollY })")
diff --git a/metagpt/utils/common.py b/metagpt/utils/common.py
index 1340f32cb..ffc25ac05 100644
--- a/metagpt/utils/common.py
+++ b/metagpt/utils/common.py
@@ -783,13 +783,15 @@ def load_mc_skills_code(skill_names: list[str] = None, skills_dir: Path = None)
     return skills
 
 
-def encode_image(image_path_or_pil: Union[Path, Image], encoding: str = "utf-8") -> str:
+def encode_image(image_path_or_pil: Union[Path, Image, str], encoding: str = "utf-8") -> str:
     """encode image from file or PIL.Image into base64"""
     if isinstance(image_path_or_pil, Image.Image):
         buffer = BytesIO()
         image_path_or_pil.save(buffer, format="JPEG")
         bytes_data = buffer.getvalue()
     else:
+        if isinstance(image_path_or_pil, str):
+            image_path_or_pil = Path(image_path_or_pil)
         if not image_path_or_pil.exists():
             raise FileNotFoundError(f"{image_path_or_pil} not exists")
         with open(str(image_path_or_pil), "rb") as image_file:
diff --git a/tests/metagpt/tools/libs/test_browser.py b/tests/metagpt/tools/libs/test_browser.py
new file mode 100644
index 000000000..0c3009fef
--- /dev/null
+++ b/tests/metagpt/tools/libs/test_browser.py
@@ -0,0 +1,90 @@
+import pytest
+
+from metagpt.const import TEST_DATA_PATH
+from metagpt.tools.libs.browser import Browser, get_scroll_position
+
+TEST_URL = "https://docs.deepwisdom.ai/main/en/guide/get_started/quickstart.html"
+
+TEST_SCREENSHOT_PATH = TEST_DATA_PATH / "screenshot.png"
+
+
+@pytest.fixture(autouse=True)
+def llm_mock(rsp_cache, mocker, request):
+    # An empty fixture to overwrite the global llm_mock fixture
+    # because these tests need to call the real LLM for vision-based extraction
+    pass
+
+
+@pytest.fixture
+def browser():
+    browser_instance = Browser()
+    yield browser_instance
+
+
+@pytest.mark.asyncio
+async def test_open_and_switch_page(browser):
+    await browser.start()
+
+    await browser.open_new_page("https://baidu.com")
+    await browser.open_new_page("https://tencent.com")
+    assert browser.current_page_url == "https://tencent.com"
+    await browser.switch_page("https://baidu.com")
+    assert browser.current_page_url == "https://baidu.com"
+
+    await browser.close()
+
+
+@pytest.mark.asyncio
+async def test_search(browser):
+    await browser.start()
+
+    # search all
+    await browser.open_new_page(TEST_URL)
+    search_term = "startup example"
+    search_results = await browser.search_content_all(search_term)
+    print(search_results)
+    # expected search result as of 20240410:
+    # [{'index': 0, 'content': {'text_block': 'Below is a breakdown of the software startup example. If you install MetaGPT with the git clone approach, simply run', 'links': [{'text': 'software startup example', 'href': 'https://github.com/geekan/MetaGPT/blob/main/metagpt/software_company.py'}]}, 'position': {'from_top': 640, 'from_left': 225}, 'element_obj': selector='text=startup example >> nth=0'>}]
+    first_result = search_results[0]["content"]
+    assert "software startup example" in first_result["text_block"]
+    assert first_result["links"]
+    assert first_result["links"][0]["href"] == "https://github.com/geekan/MetaGPT/blob/main/metagpt/software_company.py"
+    assert search_results[0]["position"]
+
+    # scroll to search result
+    await browser.scroll_to_search_result(search_results, index=0)
+
+    # perceive current view
+    rsp = await browser.extract_info_from_view("what is the command to run exactly?")
+    assert "metagpt" in rsp
+
+    await browser.close()
+
+
+@pytest.mark.asyncio
+async def test_find_links(browser):
+    await browser.start()
+
+    await browser.open_new_page(TEST_URL)
+    link_info = await browser.find_links()
+    assert link_info
+
+    await browser.close()
+
+
+@pytest.mark.asyncio
+async def test_scroll(browser):
+    await browser.start()
+
+    await browser.open_new_page(TEST_URL)
+
+    await browser.scroll_current_page(offset=-500)
+    assert await get_scroll_position(browser.current_page) == {"x": 0, "y": 0}  # no change if you scroll up from top
+
+    await browser.scroll_current_page(offset=500)  # scroll down
+    assert await get_scroll_position(browser.current_page) == {"x": 0, "y": 500}
+
+    await browser.scroll_current_page(offset=-200)  # scroll up
+    assert await get_scroll_position(browser.current_page) == {"x": 0, "y": 300}
+
+    await browser.close()