add web page scraping feature implemented by Playwright/Selenium

This commit is contained in:
shenchucheng 2023-07-24 00:18:36 +08:00
parent cfd5749456
commit ef279fdeb7
5 changed files with 326 additions and 24 deletions

View file

@ -4,6 +4,7 @@
提供配置单例
"""
import os
import openai
import yaml
@ -11,7 +12,7 @@ from metagpt.logs import logger
from metagpt.const import PROJECT_ROOT
from metagpt.utils.singleton import Singleton
from metagpt.tools import SearchEngineType
from metagpt.tools import SearchEngineType, WebBrowserEngineType
class NotConfiguredException(Exception):
@ -33,35 +34,42 @@ class Config(metaclass=Singleton):
secret_key = config.get_key("MY_SECRET_KEY")
print("Secret key:", secret_key)
"""
_instance = None
key_yaml_file = PROJECT_ROOT / 'config/key.yaml'
default_yaml_file = PROJECT_ROOT / 'config/config.yaml'
key_yaml_file = PROJECT_ROOT / "config/key.yaml"
default_yaml_file = PROJECT_ROOT / "config/config.yaml"
def __init__(self, yaml_file=default_yaml_file):
self._configs = {}
self._init_with_config_files_and_env(self._configs, yaml_file)
logger.info('Config loading done.')
self.openai_api_key = self._get('OPENAI_API_KEY')
if not self.openai_api_key or 'YOUR_API_KEY' == self.openai_api_key:
logger.info("Config loading done.")
self.global_proxy = self._get("GLOBAL_PROXY")
self.openai_api_key = self._get("OPENAI_API_KEY")
if not self.openai_api_key or "YOUR_API_KEY" == self.openai_api_key:
raise NotConfiguredException("Set OPENAI_API_KEY first")
self.openai_api_base = self._get('OPENAI_API_BASE')
if not self.openai_api_base or 'YOUR_API_BASE' == self.openai_api_base:
logger.info("Set OPENAI_API_BASE in case of network issues")
self.openai_api_type = self._get('OPENAI_API_TYPE')
self.openai_api_version = self._get('OPENAI_API_VERSION')
self.openai_api_rpm = self._get('RPM', 3)
self.openai_api_model = self._get('OPENAI_API_MODEL', "gpt-4")
self.max_tokens_rsp = self._get('MAX_TOKENS', 2048)
self.deployment_id = self._get('DEPLOYMENT_ID')
self.openai_api_base = self._get("OPENAI_API_BASE")
if not self.openai_api_base or "YOUR_API_BASE" == self.openai_api_base:
openai_proxy = self._get("OPENAI_PROXY") or self.global_proxy
if openai_proxy:
openai.proxy = openai_proxy
else:
logger.info("Set OPENAI_API_BASE in case of network issues")
self.openai_api_type = self._get("OPENAI_API_TYPE")
self.openai_api_version = self._get("OPENAI_API_VERSION")
self.openai_api_rpm = self._get("RPM", 3)
self.openai_api_model = self._get("OPENAI_API_MODEL", "gpt-4")
self.max_tokens_rsp = self._get("MAX_TOKENS", 2048)
self.deployment_id = self._get("DEPLOYMENT_ID")
self.claude_api_key = self._get('Anthropic_API_KEY')
self.serpapi_api_key = self._get('SERPAPI_API_KEY')
self.serper_api_key = self._get('SERPER_API_KEY')
self.google_api_key = self._get('GOOGLE_API_KEY')
self.google_cse_id = self._get('GOOGLE_CSE_ID')
self.search_engine = self._get('SEARCH_ENGINE', SearchEngineType.SERPAPI_GOOGLE)
self.max_budget = self._get('MAX_BUDGET', 10.0)
self.serpapi_api_key = self._get("SERPAPI_API_KEY")
self.serper_api_key = self._get("SERPER_API_KEY")
self.google_api_key = self._get("GOOGLE_API_KEY")
self.google_cse_id = self._get("GOOGLE_CSE_ID")
self.search_engine = self._get("SEARCH_ENGINE", SearchEngineType.SERPAPI_GOOGLE)
self.web_browser_engine = self._get("WEB_BROWSER_ENGINE", WebBrowserEngineType.PLAYWRIGHT)
self.playwright_browser_type = self._get("PLAYWRIGHT_BROWSER_TYPE", "chromium")
self.selenium_browser_type = self._get("selenium_browser_type", "chrome")
self.max_budget = self._get("MAX_BUDGET", 10.0)
self.total_cost = 0.0
def _init_with_config_files_and_env(self, configs: dict, yaml_file):
@ -73,7 +81,7 @@ class Config(metaclass=Singleton):
continue
# 加载本地 YAML 文件
with open(_yaml_file, 'r', encoding="utf-8") as file:
with open(_yaml_file, "r", encoding="utf-8") as file:
yaml_data = yaml.safe_load(file)
if not yaml_data:
continue

View file

@ -15,3 +15,9 @@ class SearchEngineType(Enum):
DIRECT_GOOGLE = auto()
SERPER_GOOGLE = auto()
CUSTOM_ENGINE = auto()
class WebBrowserEngineType(Enum):
PLAYWRIGHT = auto()
SELENIUM = auto()
CUSTOM_ENGINE = auto()

View file

@ -0,0 +1,58 @@
#!/usr/bin/env python
from __future__ import annotations
import asyncio
import importlib
from typing import Any, Callable, Coroutine, overload
from metagpt.config import Config
from metagpt.tools import WebBrowserEngineType
from bs4 import BeautifulSoup
class WebBrowserEngine:
def __init__(
self,
engine: WebBrowserEngineType | None = None,
run_func: Callable[..., Coroutine[Any, Any, str | list[str]]] | None = None,
):
self.config = Config()
engine = engine or self.config.web_browser_engine
if engine == WebBrowserEngineType.PLAYWRIGHT:
web_browser_engine = importlib.import_module("metagpt.tools.web_browser_engine_playwright")
run_func = web_browser_engine.PlaywrightWrapper().run
elif engine == WebBrowserEngineType.SELENIUM:
web_browser_engine = importlib.import_module("metagpt.tools.web_browser_engine_selenium")
run_func = web_browser_engine.SeleniumWrapper().run
elif engine == WebBrowserEngineType.CUSTOM_ENGINE:
run_func = run_func
else:
raise NotImplementedError
self.run_func = run_func
self.engine = engine
@overload
async def run(self, url: str) -> str:
...
@overload
async def run(self, url: str, *urls: str) -> list[str]:
...
async def run(self, url: str, *urls: str) -> str | list[str]:
page = await self.run_func(url, *urls)
if isinstance(page, str):
return get_page_content(page)
return [get_page_content(i) for i in page]
def get_page_content(page: str):
soup = BeautifulSoup(page, "html.parser")
return "\n".join(i.text.strip() for i in soup.find_all(["h1", "h2", "h3", "h4", "h5", "p", "pre"]))
if __name__ == "__main__":
text = asyncio.run(WebBrowserEngine().run("https://fuzhi.ai/"))
print(text)

View file

@ -0,0 +1,122 @@
#!/usr/bin/env python
from __future__ import annotations
import asyncio
from pathlib import Path
import sys
from typing import Literal
from playwright.async_api import async_playwright
from metagpt.config import Config
from metagpt.logs import logger
class PlaywrightWrapper:
"""Wrapper around Playwright.
To use this module, you should have the ``playwright`` Python package installed and ensure
that the required browsers are also installed. You can download the necessary browser binaries
by running the command `playwright install` for the first time.
"""
def __init__(
self,
browser_type: Literal["chromium", "firefox", "webkit"] | None = None,
launch_kwargs: dict | None = None,
**kwargs,
) -> None:
config = Config()
self.config = config
if browser_type is None:
browser_type = config.playwright_browser_type
self.browser_type = browser_type
launch_kwargs = launch_kwargs or {}
if config.global_proxy and "proxy" not in launch_kwargs:
args = launch_kwargs.get("args", [])
if not any(str.startswith(i, "--proxy-server=") for i in args):
launch_kwargs["proxy"] = {"server": config.global_proxy}
self.launch_kwargs = launch_kwargs
context_kwargs = {}
if "ignore_https_errors" in kwargs:
context_kwargs["ignore_https_errors"] = kwargs["ignore_https_errors"]
self._context_kwargs = context_kwargs
self._has_run_precheck = False
async def run(self, url: str, *urls: str) -> str | list[str]:
async with async_playwright() as ap:
browser_type = getattr(ap, self.browser_type)
await self._run_precheck(browser_type)
browser = await browser_type.launch(**self.launch_kwargs)
async def _scrape(url):
context = await browser.new_context(**self._context_kwargs)
page = await context.new_page()
async with page:
try:
await page.goto(url)
await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
content = await page.content()
return content
except Exception as e:
return f"Fail to load page content for {e}"
if urls:
return await asyncio.gather(_scrape(url), *(_scrape(i) for i in urls))
return await _scrape(url)
async def _run_precheck(self, browser_type):
if self._has_run_precheck:
return
executable_path = Path(browser_type.executable_path)
if not executable_path.exists() and "executable_path" not in self.launch_kwargs:
kwargs = {}
if self.config.global_proxy:
kwargs["env"] = {"ALL_PROXY": self.config.global_proxy}
await _install_browsers(self.browser_type, **kwargs)
if not executable_path.exists():
parts = executable_path.parts
available_paths = list(Path(*parts[:-3]).glob(f"{self.browser_type}-*"))
if available_paths:
logger.warning(
"It seems that your OS is not officially supported by Playwright. "
"Try to set executable_path to the fallback build version."
)
executable_path = available_paths[0].joinpath(*parts[-2:])
self.launch_kwargs["executable_path"] = str(executable_path)
self._has_run_precheck = True
async def _install_browsers(*browsers, **kwargs) -> None:
process = await asyncio.create_subprocess_exec(
sys.executable,
"-m",
"playwright",
"install",
*browsers,
"--with-deps",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
**kwargs,
)
await asyncio.gather(_log_stream(process.stdout, logger.info), _log_stream(process.stderr, logger.warning))
if await process.wait() == 0:
logger.info(f"Install browser for playwright successfully.")
else:
logger.warning(f"Fail to install browser for playwright.")
async def _log_stream(sr, log_func):
while True:
line = await sr.readline()
if not line:
return
log_func(f"[playwright install browser]: {line.decode().strip()}")
if __name__ == "__main__":
for i in ("chromium", "firefox", "webkit"):
text = asyncio.run(PlaywrightWrapper(i).run("https://httpbin.org/ip"))
print(text)
print(i)

View file

@ -0,0 +1,108 @@
#!/usr/bin/env python
from __future__ import annotations
import asyncio
from copy import deepcopy
import importlib
from typing import Literal
from metagpt.config import Config
import asyncio
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from concurrent import futures
class SeleniumWrapper:
"""Wrapper around Selenium.
To use this module, you should have the ``selenium`` Python package installed and ensure
that the required browsers are also installed.
"""
def __init__(
self,
browser_type: Literal["chrome", "firefox", "edge", "ie"] | None = None,
launch_kwargs: dict | None = None,
*,
loop: asyncio.AbstractEventLoop | None = None,
executor: futures.Executor | None = None,
) -> None:
config = Config()
self.config = config
if browser_type is None:
browser_type = config.selenium_browser_type
self.browser_type = browser_type
launch_kwargs = launch_kwargs or {}
if config.global_proxy and "proxy-server" not in launch_kwargs:
launch_kwargs["proxy-server"] = config.global_proxy
self.executable_path = launch_kwargs.pop("executable_path", None)
self.launch_args = [f"--{k}={v}" for k, v in launch_kwargs.items()]
self._has_run_precheck = False
self._get_driver = None
self.loop = loop
self.executor = executor
async def run(self, url: str, *urls: str) -> str | list[str]:
await self._run_precheck()
_scrape = lambda url: self.loop.run_in_executor(self.executor, self._scrape_website, url)
if urls:
return await asyncio.gather(_scrape(url), *(_scrape(i) for i in urls))
return await _scrape(url)
async def _run_precheck(self):
if self._has_run_precheck:
return
self.loop = self.loop or asyncio.get_event_loop()
self._get_driver = await self.loop.run_in_executor(
self.executor,
lambda: _gen_get_driver_func(self.browser_type, *self.launch_args, executable_path=self.executable_path),
)
self._has_run_precheck = True
def _scrape_website(self, url):
with self._get_driver() as driver:
driver.get(url)
WebDriverWait(driver, 30).until(EC.presence_of_element_located((By.TAG_NAME, "body")))
return driver.page_source
_webdriver_manager_types = {
"chrome": ("webdriver_manager.chrome", "ChromeDriverManager"),
"firefox": ("webdriver_manager.firefox", "GeckoDriverManager"),
"edge": ("webdriver_manager.microsoft", "EdgeChromiumDriverManager"),
"ie": ("webdriver_manager.microsoft", "IEDriverManager"),
}
def _gen_get_driver_func(browser_type, *args, executable_path=None):
WebDriver = getattr(importlib.import_module(f"selenium.webdriver.{browser_type}.webdriver"), "WebDriver")
Service = getattr(importlib.import_module(f"selenium.webdriver.{browser_type}.service"), "Service")
Options = getattr(importlib.import_module(f"selenium.webdriver.{browser_type}.options"), "Options")
if not executable_path:
module_name, type_name = _webdriver_manager_types[browser_type]
DriverManager = getattr(importlib.import_module(module_name), type_name)
driver_manager = DriverManager()
# driver_manager.driver_cache.find_driver(driver_manager.driver))
executable_path = driver_manager.install()
def _get_driver():
options = Options()
options.add_argument("--headless")
if browser_type == "chrome":
options.add_argument("--no-sandbox")
for i in args:
options.add_argument(i)
return WebDriver(options=deepcopy(options), service=Service(executable_path=executable_path))
return _get_driver
if __name__ == "__main__":
text = asyncio.run(SeleniumWrapper("chrome").run("https://fuzhi.ai/"))
print(text)