mirror of
https://github.com/FoundationAgents/MetaGPT.git
synced 2026-04-26 01:06:27 +02:00
Merge pull request #69 from shenchucheng/main
Add web page scraping feature implemented by Playwright/Selenium
This commit is contained in:
commit
007c8c0457
13 changed files with 479 additions and 30 deletions
|
|
@ -5,6 +5,7 @@
|
|||
|
||||
#OPENAI_API_KEY: "YOUR_API_KEY"
|
||||
#OPENAI_API_BASE: "YOUR_API_BASE"
|
||||
#OPENAI_PROXY: "http://127.0.0.1:8118"
|
||||
OPENAI_API_MODEL: "gpt-4"
|
||||
MAX_TOKENS: 1500
|
||||
RPM: 10
|
||||
|
|
@ -31,6 +32,17 @@ RPM: 10
|
|||
## Visit https://serper.dev/ to get key.
|
||||
#SERPER_API_KEY: "YOUR_API_KEY"
|
||||
|
||||
#### for web access
|
||||
|
||||
## Supported values: playwright/selenium
|
||||
#WEB_BROWSER_ENGINE: playwright
|
||||
|
||||
## Supported values: chromium/firefox/webkit, visit https://playwright.dev/python/docs/api/class-browsertype
|
||||
#PLAYWRIGHT_BROWSER_TYPE: chromium
|
||||
|
||||
## Supported values: chrome/firefox/edge/ie, visit https://www.selenium.dev/documentation/webdriver/browsers/
|
||||
# SELENIUM_BROWSER_TYPE: chrome
|
||||
|
||||
#### for TTS
|
||||
|
||||
#AZURE_TTS_SUBSCRIPTION_KEY: "YOUR_API_KEY"
|
||||
|
|
|
|||
|
|
@ -58,8 +58,8 @@ ### Tasks
|
|||
5. Plugins: Compatibility with plugin system
|
||||
6. Tools
|
||||
1. ~~Support SERPER api~~
|
||||
2. Support Selenium apis
|
||||
3. Support Playwright apis
|
||||
2. ~~Support Selenium apis~~
|
||||
3. ~~Support Playwright apis~~
|
||||
7. Roles
|
||||
1. Perfect the action pool/skill pool for each role
|
||||
2. Red Book blogger
|
||||
|
|
|
|||
|
|
@ -4,13 +4,14 @@
|
|||
提供配置,单例
|
||||
"""
|
||||
import os
|
||||
import openai
|
||||
|
||||
import yaml
|
||||
|
||||
from metagpt.const import PROJECT_ROOT
|
||||
from metagpt.logs import logger
|
||||
from metagpt.tools import SearchEngineType
|
||||
from metagpt.utils.singleton import Singleton
|
||||
from metagpt.tools import SearchEngineType, WebBrowserEngineType
|
||||
|
||||
|
||||
class NotConfiguredException(Exception):
|
||||
|
|
@ -32,40 +33,49 @@ class Config(metaclass=Singleton):
|
|||
secret_key = config.get_key("MY_SECRET_KEY")
|
||||
print("Secret key:", secret_key)
|
||||
"""
|
||||
|
||||
_instance = None
|
||||
key_yaml_file = PROJECT_ROOT / 'config/key.yaml'
|
||||
default_yaml_file = PROJECT_ROOT / 'config/config.yaml'
|
||||
key_yaml_file = PROJECT_ROOT / "config/key.yaml"
|
||||
default_yaml_file = PROJECT_ROOT / "config/config.yaml"
|
||||
|
||||
def __init__(self, yaml_file=default_yaml_file):
|
||||
self._configs = {}
|
||||
self._init_with_config_files_and_env(self._configs, yaml_file)
|
||||
logger.info('Config loading done.')
|
||||
self.openai_api_key = self._get('OPENAI_API_KEY')
|
||||
if not self.openai_api_key or 'YOUR_API_KEY' == self.openai_api_key:
|
||||
logger.info("Config loading done.")
|
||||
self.global_proxy = self._get("GLOBAL_PROXY")
|
||||
self.openai_api_key = self._get("OPENAI_API_KEY")
|
||||
if not self.openai_api_key or "YOUR_API_KEY" == self.openai_api_key:
|
||||
raise NotConfiguredException("Set OPENAI_API_KEY first")
|
||||
self.openai_api_base = self._get('OPENAI_API_BASE')
|
||||
if not self.openai_api_base or 'YOUR_API_BASE' == self.openai_api_base:
|
||||
logger.info("Set OPENAI_API_BASE in case of network issues")
|
||||
self.openai_api_type = self._get('OPENAI_API_TYPE')
|
||||
self.openai_api_version = self._get('OPENAI_API_VERSION')
|
||||
self.openai_api_rpm = self._get('RPM', 3)
|
||||
self.openai_api_model = self._get('OPENAI_API_MODEL', "gpt-4")
|
||||
self.max_tokens_rsp = self._get('MAX_TOKENS', 2048)
|
||||
self.deployment_id = self._get('DEPLOYMENT_ID')
|
||||
|
||||
self.openai_api_base = self._get("OPENAI_API_BASE")
|
||||
if not self.openai_api_base or "YOUR_API_BASE" == self.openai_api_base:
|
||||
openai_proxy = self._get("OPENAI_PROXY") or self.global_proxy
|
||||
if openai_proxy:
|
||||
openai.proxy = openai_proxy
|
||||
else:
|
||||
logger.info("Set OPENAI_API_BASE in case of network issues")
|
||||
self.openai_api_type = self._get("OPENAI_API_TYPE")
|
||||
self.openai_api_version = self._get("OPENAI_API_VERSION")
|
||||
self.openai_api_rpm = self._get("RPM", 3)
|
||||
self.openai_api_model = self._get("OPENAI_API_MODEL", "gpt-4")
|
||||
self.max_tokens_rsp = self._get("MAX_TOKENS", 2048)
|
||||
self.deployment_id = self._get("DEPLOYMENT_ID")
|
||||
|
||||
self.claude_api_key = self._get('Anthropic_API_KEY')
|
||||
|
||||
self.serpapi_api_key = self._get('SERPAPI_API_KEY')
|
||||
self.serper_api_key = self._get('SERPER_API_KEY')
|
||||
self.google_api_key = self._get('GOOGLE_API_KEY')
|
||||
self.google_cse_id = self._get('GOOGLE_CSE_ID')
|
||||
self.search_engine = self._get('SEARCH_ENGINE', SearchEngineType.SERPAPI_GOOGLE)
|
||||
|
||||
self.serpapi_api_key = self._get("SERPAPI_API_KEY")
|
||||
self.serper_api_key = self._get("SERPER_API_KEY")
|
||||
self.google_api_key = self._get("GOOGLE_API_KEY")
|
||||
self.google_cse_id = self._get("GOOGLE_CSE_ID")
|
||||
self.search_engine = self._get("SEARCH_ENGINE", SearchEngineType.SERPAPI_GOOGLE)
|
||||
|
||||
self.web_browser_engine = WebBrowserEngineType(self._get("WEB_BROWSER_ENGINE", "playwright"))
|
||||
self.playwright_browser_type = self._get("PLAYWRIGHT_BROWSER_TYPE", "chromium")
|
||||
self.selenium_browser_type = self._get("SELENIUM_BROWSER_TYPE", "chrome")
|
||||
|
||||
self.long_term_memory = self._get('LONG_TERM_MEMORY', False)
|
||||
if self.long_term_memory:
|
||||
logger.warning("LONG_TERM_MEMORY is True")
|
||||
|
||||
self.max_budget = self._get('MAX_BUDGET', 10.0)
|
||||
self.max_budget = self._get("MAX_BUDGET", 10.0)
|
||||
self.total_cost = 0.0
|
||||
|
||||
def _init_with_config_files_and_env(self, configs: dict, yaml_file):
|
||||
|
|
@ -77,7 +87,7 @@ class Config(metaclass=Singleton):
|
|||
continue
|
||||
|
||||
# 加载本地 YAML 文件
|
||||
with open(_yaml_file, 'r', encoding="utf-8") as file:
|
||||
with open(_yaml_file, "r", encoding="utf-8") as file:
|
||||
yaml_data = yaml.safe_load(file)
|
||||
if not yaml_data:
|
||||
continue
|
||||
|
|
|
|||
|
|
@ -15,3 +15,9 @@ class SearchEngineType(Enum):
|
|||
DIRECT_GOOGLE = auto()
|
||||
SERPER_GOOGLE = auto()
|
||||
CUSTOM_ENGINE = auto()
|
||||
|
||||
|
||||
class WebBrowserEngineType(Enum):
    """Identifier for the backend used to fetch web pages."""

    PLAYWRIGHT = "playwright"  # scrape via the Playwright automation library
    SELENIUM = "selenium"  # scrape via Selenium WebDriver
    CUSTOM = "custom"  # caller supplies its own scraping coroutine
|
||||
|
|
|
|||
59
metagpt/tools/web_browser_engine.py
Normal file
59
metagpt/tools/web_browser_engine.py
Normal file
|
|
@ -0,0 +1,59 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
from __future__ import annotations
|
||||
import asyncio
|
||||
import importlib
|
||||
|
||||
from typing import Any, Callable, Coroutine, overload
|
||||
|
||||
from metagpt.config import CONFIG
|
||||
from metagpt.tools import WebBrowserEngineType
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
|
||||
class WebBrowserEngine:
    """Facade that fetches one or more web pages and parses them to plain text.

    The concrete scraping backend is chosen from ``engine`` (falling back to
    the ``WEB_BROWSER_ENGINE`` configuration) and imported lazily, so optional
    dependencies are only required for the engine actually in use.
    """

    def __init__(
        self,
        engine: WebBrowserEngineType | None = None,
        run_func: Callable[..., Coroutine[Any, Any, str | list[str]]] | None = None,
        parse_func: Callable[[str], str] | None = None,
    ):
        """Select and bind a scraping backend.

        Args:
            engine: backend to use; defaults to the configured engine.
            run_func: scraping coroutine, required only for CUSTOM engines.
            parse_func: HTML-to-text converter; defaults to get_page_content.

        Raises:
            ValueError: engine is CUSTOM but no run_func was supplied.
            NotImplementedError: engine is not a recognized backend.
        """
        engine = engine or CONFIG.web_browser_engine

        if engine == WebBrowserEngineType.PLAYWRIGHT:
            module = "metagpt.tools.web_browser_engine_playwright"
            run_func = importlib.import_module(module).PlaywrightWrapper().run
        elif engine == WebBrowserEngineType.SELENIUM:
            module = "metagpt.tools.web_browser_engine_selenium"
            run_func = importlib.import_module(module).SeleniumWrapper().run
        elif engine == WebBrowserEngineType.CUSTOM:
            # Fail fast here instead of with an opaque TypeError inside run().
            if run_func is None:
                raise ValueError("run_func must be provided when engine is CUSTOM")
        else:
            raise NotImplementedError(f"Unsupported web browser engine: {engine}")
        self.parse_func = parse_func or get_page_content
        self.run_func = run_func
        self.engine = engine

    @overload
    async def run(self, url: str) -> str:
        ...

    @overload
    async def run(self, url: str, *urls: str) -> list[str]:
        ...

    async def run(self, url: str, *urls: str) -> str | list[str]:
        """Scrape *url* (and any extra *urls*); return parsed text per page.

        A single URL yields a single string; multiple URLs yield a list in
        the same order as the arguments.
        """
        page = await self.run_func(url, *urls)
        if isinstance(page, str):
            return self.parse_func(page)
        return [self.parse_func(i) for i in page]
|
||||
|
||||
|
||||
def get_page_content(page: str):
    """Extract readable text from an HTML document.

    Collects the text of heading, paragraph and preformatted elements and
    joins the stripped fragments with newlines.
    """
    soup = BeautifulSoup(page, "html.parser")
    blocks = soup.find_all(["h1", "h2", "h3", "h4", "h5", "p", "pre"])
    return "\n".join(tag.text.strip() for tag in blocks)
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Manual smoke test: scrape a single page and dump the parsed text.
    print(asyncio.run(WebBrowserEngine().run("https://fuzhi.ai/")))
|
||||
121
metagpt/tools/web_browser_engine_playwright.py
Normal file
121
metagpt/tools/web_browser_engine_playwright.py
Normal file
|
|
@ -0,0 +1,121 @@
|
|||
#!/usr/bin/env python
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from pathlib import Path
|
||||
import sys
|
||||
from typing import Literal
|
||||
from playwright.async_api import async_playwright
|
||||
from metagpt.config import CONFIG
|
||||
from metagpt.logs import logger
|
||||
|
||||
|
||||
class PlaywrightWrapper:
    """Wrapper around Playwright.

    To use this module, you should have the `playwright` Python package installed and ensure that
    the required browsers are also installed. You can install playwright by running the command
    `pip install metagpt[playwright]` and download the necessary browser binaries by running the
    command `playwright install` for the first time.
    """

    def __init__(
        self,
        browser_type: Literal["chromium", "firefox", "webkit"] | None = None,
        launch_kwargs: dict | None = None,
        **kwargs,
    ) -> None:
        """Configure the browser type, launch options and context options.

        Args:
            browser_type: playwright browser family; defaults to the
                PLAYWRIGHT_BROWSER_TYPE configuration value.
            launch_kwargs: forwarded to BrowserType.launch.
            **kwargs: only ``ignore_https_errors`` is honored; it is passed
                to Browser.new_context.
        """
        if browser_type is None:
            browser_type = CONFIG.playwright_browser_type
        self.browser_type = browser_type
        launch_kwargs = launch_kwargs or {}
        # Route traffic through the configured global proxy unless the caller
        # already set one, either via launch_kwargs["proxy"] or an explicit
        # --proxy-server command-line argument.
        if CONFIG.global_proxy and "proxy" not in launch_kwargs:
            args = launch_kwargs.get("args", [])
            if not any(i.startswith("--proxy-server=") for i in args):
                launch_kwargs["proxy"] = {"server": CONFIG.global_proxy}
        self.launch_kwargs = launch_kwargs
        context_kwargs = {}
        if "ignore_https_errors" in kwargs:
            context_kwargs["ignore_https_errors"] = kwargs["ignore_https_errors"]
        self._context_kwargs = context_kwargs
        self._has_run_precheck = False

    async def run(self, url: str, *urls: str) -> str | list[str]:
        """Fetch the raw HTML of *url* (and any extra *urls*).

        Pages are scraped concurrently when several URLs are given. A page
        that fails to load yields an error string instead of raising.
        """
        async with async_playwright() as ap:
            browser_type = getattr(ap, self.browser_type)
            await self._run_precheck(browser_type)
            browser = await browser_type.launch(**self.launch_kwargs)

            async def _scrape(url):
                context = await browser.new_context(**self._context_kwargs)
                page = await context.new_page()
                async with page:
                    try:
                        await page.goto(url)
                        # Scroll to the bottom to trigger lazy-loaded content
                        # before capturing the DOM.
                        await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
                        content = await page.content()
                        return content
                    except Exception as e:
                        return f"Fail to load page content for {e}"

            if urls:
                return await asyncio.gather(_scrape(url), *(_scrape(i) for i in urls))
            return await _scrape(url)

    async def _run_precheck(self, browser_type):
        """Ensure a browser binary exists, installing or falling back if needed."""
        if self._has_run_precheck:
            return

        executable_path = Path(browser_type.executable_path)
        if not executable_path.exists() and "executable_path" not in self.launch_kwargs:
            kwargs = {}
            if CONFIG.global_proxy:
                # Let `playwright install` download binaries through the proxy.
                kwargs["env"] = {"ALL_PROXY": CONFIG.global_proxy}
            await _install_browsers(self.browser_type, **kwargs)
            if not executable_path.exists():
                # Installation did not produce the expected path: look for a
                # fallback build (e.g. unofficial OS builds) next to it.
                parts = executable_path.parts
                available_paths = list(Path(*parts[:-3]).glob(f"{self.browser_type}-*"))
                if available_paths:
                    logger.warning(
                        "It seems that your OS is not officially supported by Playwright. "
                        "Try to set executable_path to the fallback build version."
                    )
                    executable_path = available_paths[0].joinpath(*parts[-2:])
                    self.launch_kwargs["executable_path"] = str(executable_path)
        self._has_run_precheck = True
|
||||
|
||||
|
||||
async def _install_browsers(*browsers, **kwargs) -> None:
    """Install the given Playwright *browsers* via `playwright install --with-deps`.

    Extra **kwargs (e.g. env) are forwarded to the subprocess. Installer
    output is streamed to the logger; failure is logged, not raised.
    """
    process = await asyncio.create_subprocess_exec(
        sys.executable,
        "-m",
        "playwright",
        "install",
        *browsers,
        "--with-deps",
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        **kwargs,
    )

    # Drain stdout/stderr concurrently so the pipes never fill and block.
    await asyncio.gather(_log_stream(process.stdout, logger.info), _log_stream(process.stderr, logger.warning))

    if await process.wait() == 0:
        logger.info("Install browser for playwright successfully.")
    else:
        logger.warning("Fail to install browser for playwright.")
|
||||
|
||||
|
||||
async def _log_stream(sr, log_func):
|
||||
while True:
|
||||
line = await sr.readline()
|
||||
if not line:
|
||||
return
|
||||
log_func(f"[playwright install browser]: {line.decode().strip()}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Manual smoke test: scrape httpbin with each supported browser family.
    for browser_name in ("chromium", "firefox", "webkit"):
        print(asyncio.run(PlaywrightWrapper(browser_name).run("https://httpbin.org/ip")))
        print(browser_name)
|
||||
111
metagpt/tools/web_browser_engine_selenium.py
Normal file
111
metagpt/tools/web_browser_engine_selenium.py
Normal file
|
|
@ -0,0 +1,111 @@
|
|||
#!/usr/bin/env python
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from copy import deepcopy
|
||||
import importlib
|
||||
from typing import Literal
|
||||
from metagpt.config import CONFIG
|
||||
import asyncio
|
||||
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.support import expected_conditions as EC
|
||||
from selenium.webdriver.support.wait import WebDriverWait
|
||||
from concurrent import futures
|
||||
|
||||
|
||||
class SeleniumWrapper:
    """Wrapper around Selenium.

    To use this module, you should check the following:

    1. Run the following command: pip install metagpt[selenium].
    2. Make sure you have a compatible web browser installed and the appropriate WebDriver set up
    for that browser before running. For example, if you have Mozilla Firefox installed on your
    computer, you can set the configuration SELENIUM_BROWSER_TYPE to firefox. After that, you
    can scrape web pages using the Selenium WebBrowserEngine.
    """

    def __init__(
        self,
        browser_type: Literal["chrome", "firefox", "edge", "ie"] | None = None,
        launch_kwargs: dict | None = None,
        *,
        loop: asyncio.AbstractEventLoop | None = None,
        executor: futures.Executor | None = None,
    ) -> None:
        # browser_type defaults to the SELENIUM_BROWSER_TYPE configuration.
        if browser_type is None:
            browser_type = CONFIG.selenium_browser_type
        self.browser_type = browser_type
        launch_kwargs = launch_kwargs or {}
        # Inject the configured global proxy unless the caller set one.
        if CONFIG.global_proxy and "proxy-server" not in launch_kwargs:
            launch_kwargs["proxy-server"] = CONFIG.global_proxy

        # executable_path is consumed here; every remaining launch_kwargs
        # entry becomes a "--key=value" browser command-line argument.
        self.executable_path = launch_kwargs.pop("executable_path", None)
        self.launch_args = [f"--{k}={v}" for k, v in launch_kwargs.items()]
        self._has_run_precheck = False
        # Factory producing WebDriver instances; built lazily in _run_precheck.
        self._get_driver = None
        self.loop = loop
        self.executor = executor

    async def run(self, url: str, *urls: str) -> str | list[str]:
        """Scrape *url* (and any extra *urls*) and return raw page source."""
        await self._run_precheck()

        # Selenium is blocking, so each scrape is pushed onto the executor.
        _scrape = lambda url: self.loop.run_in_executor(self.executor, self._scrape_website, url)

        if urls:
            return await asyncio.gather(_scrape(url), *(_scrape(i) for i in urls))
        return await _scrape(url)

    async def _run_precheck(self):
        # One-time setup: resolve the event loop and build the driver factory
        # (which may download a driver binary, hence off the event loop).
        if self._has_run_precheck:
            return
        # NOTE(review): asyncio.get_event_loop() is deprecated when no loop is
        # running; inside this coroutine asyncio.get_running_loop() would be
        # the modern equivalent — confirm before changing.
        self.loop = self.loop or asyncio.get_event_loop()
        self._get_driver = await self.loop.run_in_executor(
            self.executor,
            lambda: _gen_get_driver_func(self.browser_type, *self.launch_args, executable_path=self.executable_path),
        )
        self._has_run_precheck = True

    def _scrape_website(self, url):
        # Runs in the executor thread. The driver is used as a context
        # manager so it quits even if loading fails.
        with self._get_driver() as driver:
            driver.get(url)
            # Wait (up to 30s) for the document body before reading the DOM.
            WebDriverWait(driver, 30).until(EC.presence_of_element_located((By.TAG_NAME, "body")))
            return driver.page_source
|
||||
|
||||
|
||||
# Maps a selenium browser type to the webdriver_manager (module, class) used
# to resolve/download a matching driver binary when no executable_path is set.
_webdriver_manager_types = {
    "chrome": ("webdriver_manager.chrome", "ChromeDriverManager"),
    "firefox": ("webdriver_manager.firefox", "GeckoDriverManager"),
    "edge": ("webdriver_manager.microsoft", "EdgeChromiumDriverManager"),
    "ie": ("webdriver_manager.microsoft", "IEDriverManager"),
}
|
||||
|
||||
|
||||
def _gen_get_driver_func(browser_type, *args, executable_path=None):
    """Build a zero-argument factory that creates a headless WebDriver.

    The WebDriver/Service/Options classes are imported dynamically for
    *browser_type*. When *executable_path* is not given, a matching driver
    binary is resolved (and downloaded if necessary) via webdriver_manager.
    Extra *args* are added to the browser command line.
    """
    WebDriver = getattr(importlib.import_module(f"selenium.webdriver.{browser_type}.webdriver"), "WebDriver")
    Service = getattr(importlib.import_module(f"selenium.webdriver.{browser_type}.service"), "Service")
    Options = getattr(importlib.import_module(f"selenium.webdriver.{browser_type}.options"), "Options")

    if not executable_path:
        module_name, type_name = _webdriver_manager_types[browser_type]
        DriverManager = getattr(importlib.import_module(module_name), type_name)
        driver_manager = DriverManager()
        executable_path = driver_manager.install()

    def _get_driver():
        options = Options()
        options.add_argument("--headless")
        if browser_type == "chrome":
            # Required to run Chrome headless inside containers/CI sandboxes.
            options.add_argument("--no-sandbox")
        for i in args:
            options.add_argument(i)
        # deepcopy keeps the shared Options template pristine per instance.
        return WebDriver(options=deepcopy(options), service=Service(executable_path=executable_path))

    return _get_driver
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Manual smoke test: scrape one page with a Chrome-backed wrapper.
    print(asyncio.run(SeleniumWrapper("chrome").run("https://fuzhi.ai/")))
|
||||
|
|
@ -29,6 +29,9 @@ tenacity==8.2.2
|
|||
tiktoken==0.3.3
|
||||
tqdm==4.64.0
|
||||
#unstructured[local-inference]
|
||||
# playwright
|
||||
# selenium>4
|
||||
# webdriver_manager<3.9
|
||||
anthropic==0.3.6
|
||||
typing-inspect==0.8.0
|
||||
typing_extensions==4.5.0
|
||||
|
|
|
|||
10
setup.py
10
setup.py
|
|
@ -10,12 +10,12 @@ from setuptools import Command, find_packages, setup
|
|||
class InstallMermaidCLI(Command):
|
||||
"""A custom command to run `npm install -g @mermaid-js/mermaid-cli` via a subprocess."""
|
||||
|
||||
description = 'install mermaid-cli'
|
||||
description = "install mermaid-cli"
|
||||
user_options = []
|
||||
|
||||
def run(self):
|
||||
try:
|
||||
subprocess.check_call(['npm', 'install', '-g', '@mermaid-js/mermaid-cli'])
|
||||
subprocess.check_call(["npm", "install", "-g", "@mermaid-js/mermaid-cli"])
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"Error occurred: {e.output}")
|
||||
|
||||
|
|
@ -42,7 +42,11 @@ setup(
|
|||
packages=find_packages(exclude=["contrib", "docs", "examples"]),
|
||||
python_requires=">=3.9",
|
||||
install_requires=requirements,
|
||||
extras_require={
|
||||
"playwright": ["playwright>=1.26", "beautifulsoup4"],
|
||||
"selenium": ["selenium>4", "webdriver_manager<3.9", "beautifulsoup4"],
|
||||
},
|
||||
cmdclass={
|
||||
'install_mermaid': InstallMermaidCLI,
|
||||
"install_mermaid": InstallMermaidCLI,
|
||||
},
|
||||
)
|
||||
|
|
|
|||
|
|
@ -12,6 +12,8 @@ import pytest
|
|||
|
||||
from metagpt.logs import logger
|
||||
from metagpt.provider.openai_api import OpenAIGPTAPI as GPTAPI
|
||||
import asyncio
|
||||
import re
|
||||
|
||||
|
||||
class Context:
|
||||
|
|
@ -38,3 +40,31 @@ def llm_api():
|
|||
def mock_llm():
|
||||
# Create a mock LLM for testing
|
||||
return Mock()
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
def proxy():
    """Start a minimal in-process HTTP/HTTPS forward proxy for tests.

    Returns the proxy URL ("http://127.0.0.1:<port>"). Each request line is
    printed with a "Proxy:" prefix so tests can assert traffic went through
    it via the capfd fixture.
    """
    # Parses the request line, e.g. "CONNECT host:443 ..." or
    # "GET http://host/path ...", capturing method/host/port.
    pattern = re.compile(
        rb"(?P<method>[a-zA-Z]+) (?P<uri>(\w+://)?(?P<host>[^\s\'\"<>\[\]{}|/:]+)(:(?P<port>\d+))?[^\s\'\"<>\[\]{}|]*) "
    )

    async def pipe(reader, writer):
        # Blindly shuttle bytes until EOF, then close the writing end.
        while not reader.at_eof():
            writer.write(await reader.read(2048))
        writer.close()

    async def handle_client(reader, writer):
        data = await reader.readuntil(b"\r\n\r\n")
        print(f"Proxy: {data}")  # checking with capfd fixture
        infos = pattern.match(data)
        host, port = infos.group("host"), infos.group("port")
        port = int(port) if port else 80
        remote_reader, remote_writer = await asyncio.open_connection(host, port)
        if data.startswith(b"CONNECT"):
            # HTTPS tunnel: acknowledge, then relay raw bytes both ways.
            writer.write(b"HTTP/1.1 200 Connection Established\r\n\r\n")
        else:
            # Plain HTTP: replay the already-consumed request to the origin.
            remote_writer.write(data)
        await asyncio.gather(pipe(reader, remote_writer), pipe(remote_reader, writer))

    # NOTE(review): relies on asyncio.get_event_loop() outside a running loop
    # (deprecated) and on that same loop staying alive for the whole session;
    # the server is never explicitly closed — confirm against the
    # pytest-asyncio setup before changing.
    server = asyncio.get_event_loop().run_until_complete(asyncio.start_server(handle_client, "127.0.0.1", 0))
    return "http://{}:{}".format(*server.sockets[0].getsockname())
|
||||
|
|
|
|||
25
tests/metagpt/tools/test_web_browser_engine.py
Normal file
25
tests/metagpt/tools/test_web_browser_engine.py
Normal file
|
|
@ -0,0 +1,25 @@
|
|||
import pytest
|
||||
from metagpt.config import Config
|
||||
from metagpt.tools import web_browser_engine, WebBrowserEngineType
|
||||
|
||||
|
||||
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "browser_type, url, urls",
    [
        (WebBrowserEngineType.PLAYWRIGHT, "https://fuzhi.ai", ("https://fuzhi.ai",)),
        (WebBrowserEngineType.SELENIUM, "https://fuzhi.ai", ("https://fuzhi.ai",)),
    ],
    ids=["playwright", "selenium"],
)
async def test_scrape_web_page(browser_type, url, urls):
    """End-to-end: WebBrowserEngine scrapes a live page with both backends."""
    browser = web_browser_engine.WebBrowserEngine(browser_type)
    result = await browser.run(url)
    assert isinstance(result, str)
    assert "深度赋智" in result

    # Multi-URL form returns one parsed page per argument, in order.
    if urls:
        results = await browser.run(url, *urls)
        assert isinstance(results, list)
        assert len(results) == len(urls) + 1
        assert all(("深度赋智" in i) for i in results)
|
||||
34
tests/metagpt/tools/test_web_browser_engine_playwright.py
Normal file
34
tests/metagpt/tools/test_web_browser_engine_playwright.py
Normal file
|
|
@ -0,0 +1,34 @@
|
|||
import pytest
|
||||
from metagpt.config import CONFIG
|
||||
from metagpt.tools import web_browser_engine_playwright
|
||||
|
||||
|
||||
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "browser_type, use_proxy, kwargs, url, urls",
    [
        ("chromium", {"proxy": True}, {}, "https://fuzhi.ai", ("https://fuzhi.ai",)),
        ("firefox", {}, {"ignore_https_errors": True}, "https://fuzhi.ai", ("https://fuzhi.ai",)),
        ("webkit", {}, {"ignore_https_errors": True}, "https://fuzhi.ai", ("https://fuzhi.ai",)),
    ],
    ids=["chromium-normal", "firefox-normal", "webkit-normal"],
)
async def test_scrape_web_page(browser_type, use_proxy, kwargs, url, urls, proxy, capfd):
    """End-to-end scrape through PlaywrightWrapper, optionally via the proxy fixture."""
    # Snapshot the configured proxy BEFORE the try block so the finally
    # clause can always restore the shared CONFIG singleton.
    global_proxy = CONFIG.global_proxy
    try:
        if use_proxy:
            CONFIG.global_proxy = proxy
        browser = web_browser_engine_playwright.PlaywrightWrapper(browser_type, **kwargs)
        result = await browser.run(url)
        assert isinstance(result, str)
        assert "Deepwisdom" in result

        if urls:
            results = await browser.run(url, *urls)
            assert isinstance(results, list)
            assert len(results) == len(urls) + 1
            assert all(("Deepwisdom" in i) for i in results)
        if use_proxy:
            # The fixture prints "Proxy: <request>" for each proxied request.
            assert "Proxy:" in capfd.readouterr().out
    finally:
        CONFIG.global_proxy = global_proxy
|
||||
34
tests/metagpt/tools/test_web_browser_engine_selenium.py
Normal file
34
tests/metagpt/tools/test_web_browser_engine_selenium.py
Normal file
|
|
@ -0,0 +1,34 @@
|
|||
import pytest
|
||||
from metagpt.config import CONFIG
|
||||
from metagpt.tools import web_browser_engine_selenium
|
||||
|
||||
|
||||
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "browser_type, use_proxy, url, urls",
    [
        ("chrome", True, "https://fuzhi.ai", ("https://fuzhi.ai",)),
        ("firefox", False, "https://fuzhi.ai", ("https://fuzhi.ai",)),
        ("edge", False, "https://fuzhi.ai", ("https://fuzhi.ai",)),
    ],
    ids=["chrome-normal", "firefox-normal", "edge-normal"],
)
async def test_scrape_web_page(browser_type, use_proxy, url, urls, proxy, capfd):
    """End-to-end scrape through SeleniumWrapper, optionally via the proxy fixture."""
    # Snapshot the configured proxy BEFORE the try block so the finally
    # clause can always restore the shared CONFIG singleton.
    global_proxy = CONFIG.global_proxy
    try:
        if use_proxy:
            CONFIG.global_proxy = proxy
        browser = web_browser_engine_selenium.SeleniumWrapper(browser_type)
        result = await browser.run(url)
        assert isinstance(result, str)
        assert "Deepwisdom" in result

        if urls:
            results = await browser.run(url, *urls)
            assert isinstance(results, list)
            assert len(results) == len(urls) + 1
            assert all(("Deepwisdom" in i) for i in results)
        if use_proxy:
            # The fixture prints "Proxy: <request>" for each proxied request.
            assert "Proxy:" in capfd.readouterr().out
    finally:
        CONFIG.global_proxy = global_proxy
|
||||
Loading…
Add table
Add a link
Reference in a new issue