Unified WebPage object return for the WebBrowserEngine API

This commit is contained in:
shenchucheng 2023-08-07 16:35:08 +08:00
parent c62c870ab9
commit ede23b2fe9
7 changed files with 228 additions and 69 deletions

View file

@ -1,22 +1,20 @@
#!/usr/bin/env python
from __future__ import annotations
import asyncio
import importlib
from typing import Any, Callable, Coroutine, overload
import importlib
from typing import Any, Callable, Coroutine, Literal, overload
from metagpt.config import CONFIG
from metagpt.tools import WebBrowserEngineType
from bs4 import BeautifulSoup
from metagpt.utils.parse_html import WebPage
class WebBrowserEngine:
def __init__(
self,
engine: WebBrowserEngineType | None = None,
run_func: Callable[..., Coroutine[Any, Any, str | list[str]]] | None = None,
parse_func: Callable[[str], str] | None = None,
run_func: Callable[..., Coroutine[Any, Any, WebPage | list[WebPage]]] | None = None,
):
engine = engine or CONFIG.web_browser_engine
@ -30,30 +28,25 @@ class WebBrowserEngine:
run_func = run_func
else:
raise NotImplementedError
self.parse_func = parse_func or get_page_content
self.run_func = run_func
self.engine = engine
@overload
async def run(self, url: str) -> str:
async def run(self, url: str) -> WebPage:
...
@overload
async def run(self, url: str, *urls: str) -> list[str]:
async def run(self, url: str, *urls: str) -> list[WebPage]:
...
async def run(self, url: str, *urls: str) -> str | list[str]:
page = await self.run_func(url, *urls)
if isinstance(page, str):
return self.parse_func(page)
return [self.parse_func(i) for i in page]
def get_page_content(page: str):
soup = BeautifulSoup(page, "html.parser")
return "\n".join(i.text.strip() for i in soup.find_all(["h1", "h2", "h3", "h4", "h5", "p", "pre"]))
async def run(self, url: str, *urls: str) -> WebPage | list[WebPage]:
return await self.run_func(url, *urls)
if __name__ == "__main__":
text = asyncio.run(WebBrowserEngine().run("https://fuzhi.ai/"))
print(text)
import fire
async def main(url: str, *urls: str, engine_type: Literal["playwright", "selenium"] = "playwright", **kwargs):
return await WebBrowserEngine(WebBrowserEngineType(engine_type), **kwargs).run(url, *urls)
fire.Fire(main)

View file

@ -2,12 +2,15 @@
from __future__ import annotations
import asyncio
from pathlib import Path
import sys
from pathlib import Path
from typing import Literal
from playwright.async_api import async_playwright
from metagpt.config import CONFIG
from metagpt.logs import logger
from metagpt.utils.parse_html import WebPage
class PlaywrightWrapper:
@ -16,7 +19,7 @@ class PlaywrightWrapper:
To use this module, you should have the `playwright` Python package installed and ensure that
the required browsers are also installed. You can install playwright by running the command
`pip install metagpt[playwright]` and download the necessary browser binaries by running the
command `playwright install` for the first time."
command `playwright install` for the first time.
"""
def __init__(
@ -40,27 +43,30 @@ class PlaywrightWrapper:
self._context_kwargs = context_kwargs
self._has_run_precheck = False
async def run(self, url: str, *urls: str) -> str | list[str]:
async def run(self, url: str, *urls: str) -> WebPage | list[WebPage]:
async with async_playwright() as ap:
browser_type = getattr(ap, self.browser_type)
await self._run_precheck(browser_type)
browser = await browser_type.launch(**self.launch_kwargs)
async def _scrape(url):
context = await browser.new_context(**self._context_kwargs)
page = await context.new_page()
async with page:
try:
await page.goto(url)
await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
content = await page.content()
return content
except Exception as e:
return f"Fail to load page content for {e}"
_scrape = self._scrape
if urls:
return await asyncio.gather(_scrape(url), *(_scrape(i) for i in urls))
return await _scrape(url)
return await asyncio.gather(_scrape(browser, url), *(_scrape(browser, i) for i in urls))
return await _scrape(browser, url)
async def _scrape(self, browser, url):
context = await browser.new_context(**self._context_kwargs)
page = await context.new_page()
async with page:
try:
await page.goto(url)
await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
html = await page.content()
inner_text = await page.evaluate("() => document.body.innerText")
except Exception as e:
inner_text = f"Fail to load page content for {e}"
html = ""
return WebPage(inner_text=inner_text, html=html, url=url)
async def _run_precheck(self, browser_type):
if self._has_run_precheck:
@ -72,6 +78,10 @@ class PlaywrightWrapper:
if CONFIG.global_proxy:
kwargs["env"] = {"ALL_PROXY": CONFIG.global_proxy}
await _install_browsers(self.browser_type, **kwargs)
if self._has_run_precheck:
return
if not executable_path.exists():
parts = executable_path.parts
available_paths = list(Path(*parts[:-3]).glob(f"{self.browser_type}-*"))
@ -85,25 +95,37 @@ class PlaywrightWrapper:
self._has_run_precheck = True
def _get_install_lock():
global _install_lock
if _install_lock is None:
_install_lock = asyncio.Lock()
return _install_lock
async def _install_browsers(*browsers, **kwargs) -> None:
process = await asyncio.create_subprocess_exec(
sys.executable,
"-m",
"playwright",
"install",
*browsers,
"--with-deps",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
**kwargs,
)
async with _get_install_lock():
browsers = [i for i in browsers if i not in _install_cache]
if not browsers:
return
process = await asyncio.create_subprocess_exec(
sys.executable,
"-m",
"playwright",
"install",
*browsers,
# "--with-deps",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
**kwargs,
)
await asyncio.gather(_log_stream(process.stdout, logger.info), _log_stream(process.stderr, logger.warning))
await asyncio.gather(_log_stream(process.stdout, logger.info), _log_stream(process.stderr, logger.warning))
if await process.wait() == 0:
logger.info(f"Install browser for playwright successfully.")
else:
logger.warning(f"Fail to install browser for playwright.")
if await process.wait() == 0:
logger.info("Install browser for playwright successfully.")
else:
logger.warning("Fail to install browser for playwright.")
_install_cache.update(browsers)
async def _log_stream(sr, log_func):
@ -114,8 +136,14 @@ async def _log_stream(sr, log_func):
log_func(f"[playwright install browser]: {line.decode().strip()}")
_install_lock: asyncio.Lock = None
_install_cache = set()
if __name__ == "__main__":
for i in ("chromium", "firefox", "webkit"):
text = asyncio.run(PlaywrightWrapper(i).run("https://httpbin.org/ip"))
print(text)
print(i)
import fire
async def main(url: str, *urls: str, browser_type: str = "chromium", **kwargs):
return await PlaywrightWrapper(browser_type, **kwargs).run(url, *urls)
fire.Fire(main)

View file

@ -2,16 +2,17 @@
from __future__ import annotations
import asyncio
from copy import deepcopy
import importlib
from concurrent import futures
from copy import deepcopy
from typing import Literal
from metagpt.config import CONFIG
import asyncio
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from concurrent import futures
from metagpt.config import CONFIG
from metagpt.utils.parse_html import WebPage
class SeleniumWrapper:
@ -48,7 +49,7 @@ class SeleniumWrapper:
self.loop = loop
self.executor = executor
async def run(self, url: str, *urls: str) -> str | list[str]:
async def run(self, url: str, *urls: str) -> WebPage | list[WebPage]:
await self._run_precheck()
_scrape = lambda url: self.loop.run_in_executor(self.executor, self._scrape_website, url)
@ -69,9 +70,15 @@ class SeleniumWrapper:
def _scrape_website(self, url):
with self._get_driver() as driver:
driver.get(url)
WebDriverWait(driver, 30).until(EC.presence_of_element_located((By.TAG_NAME, "body")))
return driver.page_source
try:
driver.get(url)
WebDriverWait(driver, 30).until(EC.presence_of_element_located((By.TAG_NAME, "body")))
inner_text = driver.execute_script("return document.body.innerText;")
html = driver.page_source
except Exception as e:
inner_text = f"Fail to load page content for {e}"
html = ""
return WebPage(inner_text=inner_text, html=html, url=url)
_webdriver_manager_types = {
@ -97,6 +104,7 @@ def _gen_get_driver_func(browser_type, *args, executable_path=None):
def _get_driver():
options = Options()
options.add_argument("--headless")
options.add_argument("--enable-javascript")
if browser_type == "chrome":
options.add_argument("--no-sandbox")
for i in args:
@ -107,5 +115,9 @@ def _gen_get_driver_func(browser_type, *args, executable_path=None):
if __name__ == "__main__":
text = asyncio.run(SeleniumWrapper("chrome").run("https://fuzhi.ai/"))
print(text)
import fire
async def main(url: str, *urls: str, browser_type: str = "chrome", **kwargs):
return await SeleniumWrapper(browser_type, **kwargs).run(url, *urls)
fire.Fire(main)