Merge branch 'di_mgx' into 'mgx_ops'

add browser

See merge request pub/MetaGPT!39
This commit is contained in:
洪思睿 2024-04-11 02:10:07 +00:00
commit cc8a86e806
5 changed files with 304 additions and 1 deletions

View file

@ -0,0 +1,26 @@
import asyncio

from metagpt.roles.di.data_interpreter import DataInterpreter

# an example to showcase navigation
MG_LLM_CONFIG_REQ = """
This is a link to the doc site of MetaGPT project: https://docs.deepwisdom.ai/main/en/
Check where you can go to on the site and try to find out the list of LLM APIs supported by MetaGPT.
Don't write all codes in one response, each time, just write code for one step.
"""

# an example to showcase searching
# BUGFIX: the string previously opened with four quotes (`""""`), which left a
# stray leading `"` character inside the request text sent to the interpreter.
PAPER_LIST_REQ = """
At https://papercopilot.com/statistics/iclr-statistics/iclr-2024-statistics/,
find the first paper whose title includes `multiagent`, open it and summarize its abstract.
Don't write all codes in one response, each time, just write code for one step.
"""


async def main():
    """Run a DataInterpreter equipped with the Browser tool on the navigation example."""
    di = DataInterpreter(tools=["Browser"], react_mode="react")
    await di.run(MG_LLM_CONFIG_REQ)


if __name__ == "__main__":
    asyncio.run(main())

View file

@ -13,6 +13,7 @@ from metagpt.tools.libs import (
email_login,
terminal,
file_manager,
browser,
)
from metagpt.tools.libs.software_development import (
write_prd,
@ -40,4 +41,5 @@ _ = (
git_archive,
terminal,
file_manager,
browser,
) # Avoid pre-commit error

View file

@ -0,0 +1,183 @@
from playwright.async_api import async_playwright
from metagpt.const import DEFAULT_WORKSPACE_ROOT
from metagpt.tools.tool_registry import register_tool
from metagpt.utils.common import encode_image
@register_tool()
class Browser:
    """
    A tool for browsing the web. Don't initialize a new instance of this class if one already exists.
    Note: Combine searching, scrolling, extraction, and link finding together to achieve most effective browsing. DON'T stick to one method.
    """

    def __init__(self):
        """initiate the browser, create pages placeholder later to be managed as {page_url: page object}"""
        self.browser = None
        self.playwright = None  # set in start(); guarded in close() so close() is safe before start()

        # Imported here (not at module top) — presumably to avoid an import cycle
        # between the tool library and the config/LLM modules; TODO confirm.
        from metagpt.config2 import config
        from metagpt.llm import LLM

        self.llm = LLM(llm_config=config.get_openai_llm())
        # extract_info_from_view sends screenshots, so a vision-capable model is required.
        self.llm.model = "gpt-4-vision-preview"

        # browser status management
        self.pages = {}  # {page_url: Page}
        self.current_page_url = None
        self.current_page = None

    async def start(self):
        """Starts Playwright and launches a browser"""
        self.playwright = await async_playwright().start()
        self.browser = await self.playwright.chromium.launch()

    async def open_new_page(self, url: str):
        """open a new page in the browser, set it as the current page"""
        page = await self.browser.new_page()
        await page.goto(url)
        self.pages[url] = page
        self.current_page = page
        self.current_page_url = url
        print(f"Opened new page: {url}")

    async def switch_page(self, url: str):
        """switch to an opened page in the browser, set it as the current page"""
        if url in self.pages:
            self.current_page = self.pages[url]
            self.current_page_url = url
            print(f"Switched to page: {url}")
        else:
            print(f"Page not found: {url}")

    async def search_content_all(self, search_term: str) -> list[dict]:
        """search all occurrences of search term in the current page and return the search results with their position.
        Useful if you have a keyword or sentence in mind and want to quickly narrow down the content relevant to it.

        Args:
            search_term (str): the search term

        Returns:
            list[dict]: a list of dictionaries containing the elements and their positions, e.g.
                [
                    {
                        "index": ...,
                        "content": {
                            "text_block": ...,
                            "links": [
                                {"text": ..., "href": ...},
                                ...
                            ]
                        },
                        "position": {from_top: ..., from_left: ...},
                    },
                    ...
                ]
        """
        locator = self.current_page.locator(f"text={search_term}")
        count = await locator.count()
        search_results = []
        for i in range(count):
            element = locator.nth(i)
            # Only visible matches are useful for scrolling/perception; skip hidden ones.
            if await element.is_visible():
                position = await element.evaluate("e => ({ from_top: e.offsetTop, from_left: e.offsetLeft })")
                # Retrieve the surrounding block of text and links with their text
                content = await element.evaluate(
                    """
                    (element) => {
                        // const block = element.closest('p, div, section, article');
                        const block = element.parentElement;
                        return {
                            text_block: block.innerText,
                            // Create an array of objects, each containing the text and href of a link
                            links: Array.from(block.querySelectorAll('a')).map(a => ({
                                text: a.innerText,
                                href: a.href
                            }))
                        };
                    }
                    """
                )
                # "element_obj" keeps the live Locator so scroll_to_search_result can reuse it.
                search_results.append(
                    {"index": len(search_results), "content": content, "position": position, "element_obj": element}
                )
        print(f"Found {len(search_results)} instances of the term '{search_term}':\n\n{search_results}")
        return search_results

    async def scroll_to_search_result(self, search_results: list[dict], index: int = 0):
        """Scroll to the index-th search result, potentially for subsequent perception.
        Useful if you have located a search result, the search result does not fulfill your requirement, and you need more information around that search result. Can only be used after search_content_all.

        Args:
            search_results (list[dict]): search_results from search_content_all
            index (int, optional): the index of the search result to scroll to. Index starts from 0. Defaults to 0.
        """
        if not search_results:
            return {}
        # Clamp out-of-range indices to the last result instead of raising.
        if index >= len(search_results):
            print(f"Index {index} is out of range. Scrolling to the last instance.")
            index = len(search_results) - 1
        element = search_results[index]["element_obj"]
        await element.scroll_into_view_if_needed()
        print(f"Successfully scrolled to the {index}-th search result, consider extract more info around it.")

    async def find_links(self) -> list:
        """Finds all links in the current page and returns a list of dictionaries with link text and the URL.
        Useful for navigating to more pages and exploring more resources.

        Returns:
            list: A list of dictionaries, each containing 'text' and 'href' keys.
        """
        # Use a CSS selector to find all <a> elements in the page.
        links = await self.current_page.query_selector_all("a")
        # Prepare an empty list to hold link information.
        link_info = []
        # Iterate over each link element to extract its text and href attributes.
        for link in links:
            text = await link.text_content()
            href = await link.get_attribute("href")
            link_info.append({"text": text, "href": href})
        print(f"Found {len(link_info)} links:\n\n{link_info}")
        return link_info

    async def extract_info_from_view(self, instruction: str) -> str:
        """
        Extract useful info from the current page view.

        Args:
            instruction (str): explain what info needs to be extracted

        Returns:
            str: extracted info from current view
        """
        # Screenshot the current viewport and let the vision LLM answer the instruction.
        img_path = DEFAULT_WORKSPACE_ROOT / "screenshot_temp.png"
        await self.current_page.screenshot(path=img_path)
        rsp = await self.llm.aask(msg=instruction, images=[encode_image(img_path)])
        return rsp

    async def scroll_current_page(self, offset: int = 500):
        """scroll the current page by offset pixels, negative value means scrolling up, returning the content observed after scrolling"""
        await self.current_page.evaluate(f"window.scrollBy(0, {offset})")
        print(f"Scrolled current page by {offset} pixels. Perceive the scrolled view if needed")

    async def check_all_pages(self) -> dict:
        """return all pages opened in the browser, a dictionary with {page_url: page_title}, useful for understanding the current browser state"""
        # BUGFIX: in the async Playwright API Page.title() is a coroutine, so the
        # previous sync implementation returned un-awaited coroutine objects
        # instead of titles. The method is now async and awaits each title.
        pages_info = {url: await page.title() for url, page in self.pages.items()}
        return pages_info

    async def close(self):
        """close the browser and all pages"""
        # Guard both handles so close() is a safe no-op if start() was never
        # called (previously this raised AttributeError on self.playwright).
        if self.browser is not None:
            await self.browser.close()
            self.browser = None
        if self.playwright is not None:
            await self.playwright.stop()
            self.playwright = None
async def get_scroll_position(page):
    """Return the page's current scroll offsets as a dict {"x": ..., "y": ...}."""
    position = await page.evaluate("() => ({ x: window.scrollX, y: window.scrollY })")
    return position

View file

@ -783,13 +783,15 @@ def load_mc_skills_code(skill_names: list[str] = None, skills_dir: Path = None)
return skills
def encode_image(image_path_or_pil: Union[Path, Image], encoding: str = "utf-8") -> str:
def encode_image(image_path_or_pil: Union[Path, Image, str], encoding: str = "utf-8") -> str:
"""encode image from file or PIL.Image into base64"""
if isinstance(image_path_or_pil, Image.Image):
buffer = BytesIO()
image_path_or_pil.save(buffer, format="JPEG")
bytes_data = buffer.getvalue()
else:
if isinstance(image_path_or_pil, str):
image_path_or_pil = Path(image_path_or_pil)
if not image_path_or_pil.exists():
raise FileNotFoundError(f"{image_path_or_pil} not exists")
with open(str(image_path_or_pil), "rb") as image_file:

View file

@ -0,0 +1,90 @@
import pytest
from metagpt.const import TEST_DATA_PATH
from metagpt.tools.libs.browser import Browser, get_scroll_position
TEST_URL = "https://docs.deepwisdom.ai/main/en/guide/get_started/quickstart.html"
TEST_SCREENSHOT_PATH = TEST_DATA_PATH / "screenshot.png"
@pytest.fixture(autouse=True)
def llm_mock(rsp_cache, mocker, request):
    # Deliberately empty: overrides the project-wide autouse `llm_mock` fixture
    # so the tests in this folder call the real LLM (e.g. `aask`) instead of
    # the cached/mocked responses.
    pass
@pytest.fixture
def browser():
    """Yield a fresh, not-yet-started Browser instance for each test."""
    instance = Browser()
    yield instance
@pytest.mark.asyncio
async def test_open_and_switch_page(browser):
    """Opening pages makes the newest one current; switch_page restores an earlier one."""
    url_a = "https://baidu.com"
    url_b = "https://tencent.com"
    await browser.start()
    await browser.open_new_page(url_a)
    await browser.open_new_page(url_b)
    # The most recently opened page becomes the current page.
    assert browser.current_page_url == url_b
    await browser.switch_page(url_a)
    assert browser.current_page_url == url_a
    await browser.close()
@pytest.mark.asyncio
async def test_search(browser):
    """End-to-end: search the docs page, scroll to a hit, extract info from the view."""
    await browser.start()
    await browser.open_new_page(TEST_URL)

    # search all occurrences of the term on the page
    search_term = "startup example"
    search_results = await browser.search_content_all(search_term)
    print(search_results)

    # Expected shape as of 2024-04-10: each hit carries content (text block + links),
    # a position, and the live element locator; the first hit links to software_company.py.
    top_hit = search_results[0]
    content = top_hit["content"]
    assert "software startup example" in content["text_block"]
    links = content["links"]
    assert links
    assert links[0]["href"] == "https://github.com/geekan/MetaGPT/blob/main/metagpt/software_company.py"
    assert top_hit["position"]

    # scroll to the first search result, then perceive the current view
    await browser.scroll_to_search_result(search_results, index=0)
    answer = await browser.extract_info_from_view("what is the command to run exactly?")
    assert "metagpt" in answer

    await browser.close()
@pytest.mark.asyncio
async def test_find_links(browser):
    """The docs page should expose at least one hyperlink."""
    await browser.start()
    await browser.open_new_page(TEST_URL)
    found = await browser.find_links()
    assert found
    await browser.close()
@pytest.mark.asyncio
async def test_scroll(browser):
    """Scrolling moves the viewport by the given offset and clamps at the top."""
    await browser.start()
    await browser.open_new_page(TEST_URL)
    # Scrolling up while already at the top is a no-op.
    await browser.scroll_current_page(offset=-500)
    assert await get_scroll_position(browser.current_page) == {"x": 0, "y": 0}
    # Scroll down, then partially back up; the y offset tracks the net movement.
    await browser.scroll_current_page(offset=500)
    assert await get_scroll_position(browser.current_page) == {"x": 0, "y": 500}
    await browser.scroll_current_page(offset=-200)
    assert await get_scroll_position(browser.current_page) == {"x": 0, "y": 300}
    await browser.close()