Merge branch 'mgx_ops' into json_repair

# Conflicts:
#	metagpt/prompts/di/role_zero.py
#	metagpt/roles/di/data_analyst.py
This commit is contained in:
zhanglei 2024-06-28 15:21:39 +08:00
commit 8d973c2cf7
57 changed files with 4180 additions and 278 deletions

View file

@ -1,261 +1,206 @@
from __future__ import annotations
import contextlib
from uuid import uuid4
import time
from typing import Literal, Optional
from playwright.async_api import async_playwright
from playwright.async_api import Browser as Browser_
from playwright.async_api import (
BrowserContext,
Frame,
Page,
Playwright,
Request,
async_playwright,
)
from metagpt.const import DEFAULT_WORKSPACE_ROOT
from metagpt.tools.tool_registry import register_tool
from metagpt.utils.file import MemoryFileSystem
from metagpt.utils.parse_html import simplify_html
from metagpt.utils.a11y_tree import (
click_element,
get_accessibility_tree,
get_backend_node_id,
hover_element,
key_press,
parse_accessibility_tree,
scroll_page,
type_text,
)
from metagpt.utils.report import BrowserReporter
@register_tool(tags=["web", "browse", "scrape"])
@register_tool(
tags=["web", "browse"],
include_functions=[
"click",
"close_tab",
"go_back",
"go_forward",
"goto",
"hover",
"press",
"scroll",
"tab_focus",
"type",
],
)
class Browser:
"""
A tool for browsing the web and scraping. Don't initialize a new instance of this class if one already exists.
Note: Combine searching and scrolling together to achieve most effective browsing. DON'T stick to one method.
"""A tool for browsing the web. Don't initialize a new instance of this class if one already exists.
Note: If you plan to use the browser to assist you in completing tasks, then using the browser should be a standalone
task, executing actions each time based on the content seen on the webpage before proceeding to the next step.
## Example
Issue: The details of the latest issue in the geekan/MetaGPT repository.
Plan: Use a browser to view the details of the latest issue in the geekan/MetaGPT repository.
Solution:
Let's first open the issue page of the MetaGPT repository with the `Browser.goto` command
>>> await browser.goto("https://github.com/geekan/MetaGPT/issues")
From the output webpage, we've identified that the latest issue can be accessed by clicking on the element with ID "1141".
>>> await browser.click(1141)
Finally, we have found the webpage for the latest issue, we can close the tab and finish current task.
>>> await browser.close_tab()
"""
def __init__(self):
    """initiate the browser, create pages placeholder later to be managed as {page_url: page object}"""
    self.browser = None
    # --- legacy page-dict state (read by switch_page / check_all_pages / _set_current_page) ---
    # NOTE(review): merge residue — both the old page-dict API and the new
    # tab-based API state coexist here; confirm which surface is kept.
    self.pages = {}
    self.current_page_url = None
    self.current_page = None
    # --- playwright lifecycle handles: set in start(), cleared in stop() ---
    self.playwright: Optional[Playwright] = None
    self.browser_instance: Optional[Browser_] = None
    self.browser_ctx: Optional[BrowserContext] = None
    self.page: Optional[Page] = None
    # cached accessibility tree, refreshed by _wait_page()
    self.accessibility_tree: list = []
    # launch options honoured by start(); may be overridden before starting
    self.headless: bool = True
    self.proxy = None
    self.is_empty_page = True
    self.reporter = BrowserReporter()
async def start(self) -> None:
    """Start Playwright and launch a Chromium browser with one blank page.

    Idempotent: if Playwright is already running (``self.playwright`` is set),
    this does nothing. Honours ``self.headless`` and ``self.proxy`` configured
    before the call.
    """
    # merge residue removed: a stale duplicate `async def start(self):` header
    # preceded this method.
    if self.playwright is None:
        self.playwright = playwright = await async_playwright().start()
        browser = self.browser_instance = await playwright.chromium.launch(headless=self.headless, proxy=self.proxy)
        browser_ctx = self.browser_ctx = await browser.new_context()
        self.page = await browser_ctx.new_page()
async def _set_current_page(self, page, url):
    """Record *page*/*url* as the current page, announce it, and print a view.

    NOTE(review): legacy helper of the page-dict API (``self.pages``); the
    newer tab-based methods track ``self.page`` instead — confirm which is kept.
    """
    self.current_page = page
    self.current_page_url = url
    print("Now on page ", url)
    await self._view()
async def stop(self):
    """Shut down Playwright (if it was started) and drop all browser handles."""
    # Swap the handle out first so repeated calls are harmless no-ops.
    playwright, self.playwright = self.playwright, None
    if not playwright:
        return
    self.browser_instance = None
    self.browser_ctx = None
    await playwright.stop()
async def open_new_page(self, url: str, timeout: float = 30000):
    """open a new page in the browser and view the page"""
    # NOTE(review): the method body appears to have been lost in the merge —
    # as written this is a no-op returning None. Confirm against the pre-merge
    # source whether this legacy API should be restored or deleted.
async def click(self, element_id: int):
    """clicks on an element with a specific id on the webpage.

    Args:
        element_id (int): node id taken from the accessibility-tree observation.
    """
    await click_element(self.page, get_backend_node_id(element_id, self.accessibility_tree))
    return await self._wait_page()

async def type(self, element_id: int, content: str, press_enter_after: bool = False):
    """Use this to type the content into the field with id.

    Args:
        element_id (int): node id of the input field in the accessibility tree.
        content (str): the text to type.
        press_enter_after (bool): if True, a trailing newline is typed to submit.
    """
    if press_enter_after:
        content += "\n"
    # focus the field by clicking it first, then type
    await click_element(self.page, get_backend_node_id(element_id, self.accessibility_tree))
    await type_text(self.page, content)
    return await self._wait_page()

async def hover(self, element_id: int):
    """Hover over an element with id."""
    await hover_element(self.page, get_backend_node_id(element_id, self.accessibility_tree))
    return await self._wait_page()

async def press(self, key_comb: str):
    """Simulates the pressing of a key combination on the keyboard (e.g., Ctrl+v)."""
    await key_press(self.page, key_comb)
    return await self._wait_page()

async def scroll(self, direction: Literal["down", "up"]):
    """Scroll the page up or down."""
    await scroll_page(self.page, direction)
    return await self._wait_page()
async def goto(self, url: str, timeout: float = 30000):
    """Navigate to a specific URL.

    Args:
        url (str): the destination URL.
        timeout (float): navigation timeout in milliseconds (Playwright convention).
    """
    # merge residue removed: lines from the deleted open_new_page body
    # (self.browser.new_page / self.pages / _set_current_page) were interleaved here.
    async with self.reporter as reporter:
        await reporter.async_report(url, "url")
        await self.page.goto(url, timeout=timeout)
        self.is_empty_page = False
    return await self._wait_page()
async def view_page_element_to_scrape(self, requirement: str, keep_links: bool = False) -> None:
"""view the HTML content of current page to understand the structure. When executed, the content will be printed out
async def go_back(self):
    """Navigate to the previously viewed page."""
    await self.page.go_back()
    return await self._wait_page()
Args:
requirement (str): Providing a clear and detailed requirement helps in focusing the inspection on the desired elements.
keep_links (bool): Whether to keep the hyperlinks in the HTML content. Set to True if links are required
"""
html = await self.current_page.content()
html = simplify_html(html, url=self.current_page.url, keep_links=keep_links)
mem_fs = MemoryFileSystem()
filename = f"{uuid4().hex}.html"
with mem_fs.open(filename, "w") as f:
f.write(html)
async def go_forward(self):
    """Navigate to the next page (if a previous 'go_back' action was performed)."""
    await self.page.go_forward()
    return await self._wait_page()
# Since RAG is an optional optimization, if it fails, the simplified HTML can be used as a fallback.
with contextlib.suppress(Exception):
from metagpt.rag.engines import SimpleEngine # avoid circular import
async def tab_focus(self, page_number: int):
    """Bring the tab at index *page_number* in the browser context's page list to the front.

    NOTE(review): the previous docstring said "Open a new, empty browser tab",
    which did not match the body — it focuses an existing tab.
    """
    page = self.browser_ctx.pages[page_number]
    await page.bring_to_front()
    return await self._wait_page()
# TODO make `from_docs` asynchronous
engine = SimpleEngine.from_docs(input_files=[filename], fs=mem_fs)
nodes = await engine.aretrieve(requirement)
html = "\n".join(i.text for i in nodes)
mem_fs.rm_file(filename)
print(html)
async def get_page_content(self) -> str:
    """Return the stripped HTML source of the current page."""
    raw_html = await self.current_page.content()
    return raw_html.strip()
async def switch_page(self, url: str):
    """switch to an opened page in the browser and view the page

    Args:
        url (str): key into ``self.pages``; prints a message if not found.
    """
    if url in self.pages:
        await self._set_current_page(self.pages[url], url)
        await self.reporter.async_report(self.current_page, "page")
    else:
        print(f"Page not found: {url}")

async def close_tab(self):
    """Close the currently active tab.

    Falls back to the last remaining tab; if none remains, opens a fresh empty
    page so ``self.page`` always stays valid.
    """
    # merge residue fixed: these two methods' lines were interleaved by the merge.
    await self.page.close()
    if len(self.browser_ctx.pages) > 0:
        self.page = self.browser_ctx.pages[-1]
    else:
        self.page = await self.browser_ctx.new_page()
        self.is_empty_page = True
    return await self._wait_page()
async def _view_page_html(self, keep_len: int = 5000) -> str:
    """Return at most *keep_len* characters of the current page's stripped HTML."""
    full_html = await self.current_page.content()
    return full_html.strip()[:keep_len]
async def _wait_page(self):
    """Wait for the current page to go idle, refresh the cached accessibility
    tree, report the page, and return a SUCCESS string with the final URL."""
    page = self.page
    await self._wait_until_page_idle(page)
    self.accessibility_tree = await get_accessibility_tree(page)
    await self.reporter.async_report(page, "page")
    return f"SUCCESS, URL: {page.url}"
async def search_content_all(self, search_term: str) -> list[dict]:
    """search all occurrences of search term in the current page and return the search results with their position.
    Useful if you have a keyword or sentence in mind and want to quickly narrow down the content relevant to it.

    Args:
        search_term (str): the search term

    Returns:
        list[dict]: a list of dictionaries containing the elements and their positions, e.g.
            [
                {
                    "index": ...,
                    "content": {
                        "text_block": ...,
                        "links": [
                            {"text": ..., "href": ...},
                            ...
                        ]
                    },
                    "position": {from_top: ..., from_left: ...},
                },
                ...
            ]
    """
    # merge residue fixed: this method's lines were interleaved with the
    # idle-tracking handlers below; also fixed the "occurences" typo.
    locator = self.current_page.locator(f"text={search_term}")
    count = await locator.count()
    search_results = []
    for i in range(count):
        element = locator.nth(i)
        if await element.is_visible():
            position = await element.evaluate("e => ({ from_top: e.offsetTop, from_left: e.offsetLeft })")
            # Retrieve the surrounding block of text and links with their text
            content = await element.evaluate(SEARCH_CONTENT_JS)
            search_results.append(
                {"index": len(search_results), "content": content, "position": position, "element_obj": element}
            )
    print(f"Found {len(search_results)} instances of the term '{search_term}':\n\n{search_results}")
    return search_results

def _register_page_event(self, page: Page):
    """Attach idle-tracking listeners to *page* the first time it is seen."""
    page.last_busy_time = time.time()
    page.requests = set()
    page.on("domcontentloaded", self._update_page_last_busy_time)
    page.on("load", self._update_page_last_busy_time)
    page.on("request", self._on_page_request)
    # failed requests are handled like finished ones: drop them from the pending set
    page.on("requestfailed", self._on_page_requestfinished)
    page.on("requestfinished", self._on_page_requestfinished)
    page.on("frameattached", self._on_frame_change)
    page.on("framenavigated", self._on_frame_change)

async def _wait_until_page_idle(self, page) -> None:
    """Block until the page has been quiet for 0.5s (no load/request/frame events)."""
    if not hasattr(page, "last_busy_time"):
        self._register_page_event(page)
    else:
        page.last_busy_time = time.time()
    while time.time() - page.last_busy_time < 0.5:
        await page.wait_for_timeout(100)

async def _update_page_last_busy_time(self, page: Page):
    # any event on the page resets the idle clock
    page.last_busy_time = time.time()

async def _on_page_request(self, request: Request):
    page = request.frame.page
    page.requests.add(request)
    await self._update_page_last_busy_time(page)

async def _on_page_requestfinished(self, request: Request):
    request.frame.page.requests.discard(request)

async def _on_frame_change(self, frame: Frame):
    await self._update_page_last_busy_time(frame.page)
async def view(self):
    """Render the cached accessibility tree as the textual browser observation."""
    observation = parse_accessibility_tree(self.accessibility_tree)
    # observation[0] is the rendered text; other tuple members are unused here
    return f"Current Browser Viewer\n URL: {self.page.url}\nOBSERVATION:\n{observation[0]}\n"
async def scroll_to_search_result(self, search_results: list[dict], index: int = 0):
    """Scroll to the index-th search result, potentially for subsequent perception.
    Useful if you have located a search result, the search result does not fulfill your requirement, and you need more information around that search result. Can only be used after search_all_content.

    Args:
        search_results (list[dict]): search_results from search_content_all
        index (int, optional): the index of the search result to scroll to. Index starts from 0. Defaults to 0.
    """
    # merge residue fixed: __aenter__'s lines were interleaved into this body.
    if not search_results:
        return {}
    if index >= len(search_results):
        print(f"Index {index} is out of range. Scrolling to the last instance.")
        index = len(search_results) - 1
    element = search_results[index]["element_obj"]
    await element.scroll_into_view_if_needed()
    await self.reporter.async_report(self.current_page, "page")
    print(f"Successfully scrolled to the {index}-th search result")
    print(await self._view())

async def __aenter__(self):
    """Async-context entry: start the browser and return self."""
    await self.start()
    return self
# async def find_links(self) -> list:
# """Finds all links in the current page and returns a list of dictionaries with link text and the URL.
# Useful for navigating to more pages and exploring more resources.
# Returns:
# list: A list of dictionaries, each containing 'text' and 'href' keys.
# """
# # Use a CSS selector to find all <a> elements in the page.
# links = await self.current_page.query_selector_all("a")
# # Prepare an empty list to hold link information.
# link_info = []
# # Iterate over each link element to extract its text and href attributes.
# for link in links:
# text = await link.text_content()
# href = await link.get_attribute("href")
# link_info.append({"text": text, "href": href})
# print(f"Found {len(link_info)} links:\n\n{link_info}")
# return link_info
async def screenshot(self, path: str = DEFAULT_WORKSPACE_ROOT / "screenshot_temp.png"):
    """Take a screenshot of the current page and save it to the specified path.

    NOTE(review): annotated ``str`` but the default is a ``Path``
    (DEFAULT_WORKSPACE_ROOT / ...); Playwright accepts both — consider
    annotating ``str | Path``.
    """
    await self.current_page.screenshot(path=path)
    print(f"Screenshot saved to: {path}")
async def _view(self, keep_len: int = 5000) -> str:
    """simulate human viewing the current page, return the visible text with links

    NOTE(review): despite the ``-> str`` annotation this prints and returns
    None; confirm whether callers rely on the printed output only.
    """
    visible_text_with_links = await self.current_page.evaluate(VIEW_CONTENT_JS)
    print("The visible text and their links (if any): ", visible_text_with_links[:keep_len])
    # html_content = await self._view_page_html(keep_len=keep_len)
    # print("The html content: ", html_content)
async def scroll_current_page(self, offset: int = 500):
    """Scroll the current page vertically by *offset* pixels (negative scrolls up),
    report the page, and print the content now visible."""
    scroll_js = f"window.scrollBy(0, {offset})"
    await self.current_page.evaluate(scroll_js)
    await self.reporter.async_report(self.current_page, "page")
    print(f"Scrolled current page by {offset} pixels.")
    print(await self._view())
def check_all_pages(self) -> dict:
    """return all pages opened in the browser, a dictionary with {page_url: page_title}, useful for understanding the current browser state"""
    # NOTE(review): with Playwright's async API, page.title() returns a
    # coroutine — the dict values here are unawaited coroutines, not title
    # strings. Fixing this requires making the method async and awaiting each
    # title (an interface change), so it is flagged rather than changed here.
    pages_info = {url: page.title() for url, page in self.pages.items()}
    return pages_info
async def close(self):
    """close the browser and all pages"""
    # NOTE(review): uses the legacy self.browser handle, which start() no
    # longer sets (it sets browser_instance) — confirm against stop().
    await self.browser.close()
    await self.playwright.stop()
async def get_scroll_position(page):
    """Return the page's current scroll offsets as ``{"x": ..., "y": ...}``."""
    return await page.evaluate("() => ({ x: window.scrollX, y: window.scrollY })")
# JS evaluated on a matched element: returns the innerText of its parent block
# plus the text/href of every link inside that block (see search_content_all).
SEARCH_CONTENT_JS = """
(element) => {
// const block = element.closest('p, div, section, article');
const block = element.parentElement;
return {
text_block: block.innerText,
// Create an array of objects, each containing the text and href of a link
links: Array.from(block.querySelectorAll('a')).map(a => ({
text: a.innerText,
href: a.href
}))
};
}
"""

# JS evaluated on the whole page: collects visible, unobstructed elements in
# the viewport and joins their trimmed text, appending the enclosing link URL
# when there is one (see _view).
VIEW_CONTENT_JS = """
() => {
return Array.from(document.querySelectorAll('body *')).filter(el => {
if (!(el.offsetWidth || el.offsetHeight || el.getClientRects().length)) return false;
const style = window.getComputedStyle(el);
if (style.display === 'none' || style.visibility !== 'visible' || style.opacity === '0') return false;
const rect = el.getBoundingClientRect();
const elemCenter = {
x: rect.left + rect.width / 2,
y: rect.top + rect.height / 2
};
if (elemCenter.x < 0 || elemCenter.y < 0 || elemCenter.x > window.innerWidth || elemCenter.y > window.innerHeight) return false;
if (document.elementFromPoint(elemCenter.x, elemCenter.y) !== el) return false;
return true;
}).map(el => {
let text = el.innerText || '';
text = text.trim();
if (!text.length) return '';
const parentAnchor = el.closest('a');
if (parentAnchor && parentAnchor.href) {
return `${text} (${parentAnchor.href})`;
}
return text;
}).filter(text => text.length > 0).join("\\n");
}
"""
async def __aexit__(self, *args, **kwargs):
    """Async-context exit: shut down Playwright via stop()."""
    await self.stop()

90
metagpt/tools/libs/cr.py Normal file
View file

@ -0,0 +1,90 @@
import json
from pathlib import Path
from typing import Optional
import aiofiles
from unidiff import PatchSet
import metagpt.ext.cr
from metagpt.ext.cr.actions.code_review import CodeReview as CodeReview_
from metagpt.ext.cr.actions.modify_code import ModifyCode
from metagpt.ext.cr.utils.schema import Point
from metagpt.tools.libs.browser import Browser
from metagpt.tools.tool_registry import register_tool
from metagpt.utils.report import EditorReporter
@register_tool(tags=["codereview"], include_functions=["review", "fix"])
class CodeReview:
"""Review and fix the patch content from the pull request URL or a file."""
async def review(
    self,
    patch_path: str,
    cr_output_file: str,
    cr_point_file: Optional[str] = None,
) -> str:
    """Review a PR and save code review comments.

    Args:
        patch_path: The local path of the patch file or the url of the pull request. Example: "/data/xxx-pr-1.patch", "https://github.com/xx/XX/pull/1362"
        cr_output_file: Output file path where code review comments will be saved. Example: "cr/xxx-pr-1.json"
        cr_point_file: File path for specifying code review points. Defaults to a predefined file.

    Returns:
        A summary string with the defect count and the output file location.
    """
    patch = await self._get_patch_content(patch_path)
    # fall back to the review points packaged with metagpt.ext.cr
    cr_point_file = cr_point_file if cr_point_file else Path(metagpt.ext.cr.__file__).parent / "points.json"
    async with aiofiles.open(cr_point_file, "rb") as f:
        cr_point_content = await f.read()
    cr_points = [Point(**i) for i in json.loads(cr_point_content)]
    async with EditorReporter(enable_llm_stream=True) as reporter:
        src_path = cr_output_file
        cr_output_path = Path(cr_output_file)
        await reporter.async_report(
            {"type": "CodeReview", "src_path": src_path, "filename": cr_output_path.name}, "meta"
        )
        comments = await CodeReview_().run(patch, cr_points)
        cr_output_path.parent.mkdir(exist_ok=True, parents=True)
        # ensure_ascii=False keeps non-ASCII review text readable in the JSON file
        async with aiofiles.open(cr_output_path, "w") as f:
            await f.write(json.dumps(comments, ensure_ascii=False))
        await reporter.async_report(cr_output_path)
    return f"The number of defects: {len(comments)} and the comments are stored in {cr_output_file}"
async def fix(
    self,
    patch_path: str,
    cr_file: str,
    output_dir: str,
) -> str:
    """Fix the patch content based on code review comments.

    Args:
        patch_path: The local path of the patch file or the url of the pull request.
        cr_file: File path where code review comments are stored.
        output_dir: Directory path where the fixed files will be written.
    """
    patch = await self._get_patch_content(patch_path)
    async with aiofiles.open(cr_file, "r") as f:
        comments = json.loads(await f.read())
    await ModifyCode(pr="").run(patch, comments, output_dir)
    return f"The fixed patch files store in {output_dir}"
async def _get_patch_content(self, patch_path):
    """Load a unified diff from a URL (via headless browser) or a local file
    and return it parsed as a unidiff PatchSet."""
    if patch_path.startswith(("https://", "http://")):
        # async with aiohttp.ClientSession(trust_env=True) as client:
        #     async with client.get(f"{patch_path}.diff", ) as resp:
        #         patch_file_content = await resp.text()
        browser = Browser()
        # NOTE(review): hard-coded local proxy looks like leftover debug/dev
        # configuration — confirm it should not ship.
        browser.proxy = {"server": "http://127.0.0.1:20172"}
        async with browser:
            # GitHub serves the raw diff at "<pr url>.diff"
            await browser.goto(f"{patch_path}.diff")
            patch_file_content = await browser.page.content()
    else:
        async with aiofiles.open(patch_path) as f:
            patch_file_content = await f.read()
        await EditorReporter().async_report(patch_path)
    patch: PatchSet = PatchSet(patch_file_content)
    return patch

View file

@ -2,10 +2,28 @@
# -*- coding: utf-8 -*-
from __future__ import annotations
import uuid
from datetime import datetime
from pathlib import Path
from typing import Optional
from metagpt.const import ASSISTANT_ALIAS
from metagpt.logs import ToolLogItem, log_tool_output
from metagpt.actions.requirement_analysis.framework import (
EvaluateFramework,
WriteFramework,
save_framework,
)
from metagpt.actions.requirement_analysis.trd import (
CompressExternalInterfaces,
DetectInteraction,
EvaluateTRD,
WriteTRD,
)
from metagpt.const import ASSISTANT_ALIAS, DEFAULT_WORKSPACE_ROOT, TEST_DATA_PATH
from metagpt.context import Context
from metagpt.logs import ToolLogItem, log_tool_output, logger
from metagpt.tools.tool_registry import register_tool
from metagpt.utils.common import aread
from metagpt.utils.cost_manager import CostManager
async def import_git_repo(url: str) -> Path:
@ -42,3 +60,201 @@ async def import_git_repo(url: str) -> Path:
log_tool_output(output=outputs, tool_name=import_git_repo.__name__)
return ctx.repo.workdir
async def extract_external_interfaces(acknowledge: str) -> str:
    """Extract and compress external-system interface information from an
    acknowledgement text.

    Args:
        acknowledge (str): Natural-language acknowledgement describing external
            system interfaces.

    Returns:
        str: The compressed external-interface description.

    Example:
        >>> acknowledge = "## Interfaces\\n..."
        >>> external_interfaces = await extract_external_interfaces(acknowledge=acknowledge)
        >>> print(external_interfaces)
        ```json\n[\n{\n"id": 1,\n"inputs": {...
    """
    return await CompressExternalInterfaces().run(acknowledge=acknowledge)
async def mock_asearch_acknowledgement(use_case_actors: str):
    """Stand-in for acknowledgement retrieval: reads a fixed test fixture.

    NOTE: ``use_case_actors`` is currently ignored; callers note this is to be
    replaced by acknowledgement_repo later.
    """
    return await aread(filename=TEST_DATA_PATH / "requirements/1.acknowledge.md")
@register_tool(tags=["system design", "write trd", "Write a TRD"])
async def write_trd(
    use_case_actors: str,
    user_requirements: str,
    investment: float = 10,
    context: Optional[Context] = None,
) -> str:
    """
    Handles the writing of a Technical Requirements Document (TRD) based on user requirements.

    Args:
        user_requirements (str): The new/incremental user requirements.
        use_case_actors (str): Description of the actors involved in the use case.
        investment (float): Budget. Automatically stops optimizing TRD when the budget is overdrawn.
        context (Context, optional): The context configuration. Default is None.

    Returns:
        str: The newly created TRD.

    Example:
        >>> # Given a new user requirements, write out a new TRD.
        >>> user_requirements = "Write a 'snake game' TRD."
        >>> use_case_actors = "- Actor: game player;\\n- System: snake game; \\n- External System: game center;"
        >>> investment = 10.0
        >>> trd = await write_trd(
        >>>     user_requirements=user_requirements,
        >>>     use_case_actors=use_case_actors,
        >>>     investment=investment,
        >>> )
        >>> print(trd)
        ## Technical Requirements Document\n ...
    """
    # the investment budget caps LLM spend via the context's cost manager
    context = context or Context(cost_manager=CostManager(max_budget=investment))
    compress_acknowledge = CompressExternalInterfaces()
    acknowledgement = await mock_asearch_acknowledgement(use_case_actors)  # Replaced by acknowledgement_repo later.
    external_interfaces = await compress_acknowledge.run(acknowledge=acknowledgement)
    detect_interaction = DetectInteraction(context=context)
    w_trd = WriteTRD(context=context)
    evaluate_trd = EvaluateTRD(context=context)
    is_pass = False
    evaluation_conclusion = ""
    interaction_events = ""
    trd = ""
    # Iterative refinement: detect interaction events, draft the TRD, then
    # evaluate it — stop on a passing evaluation or when the budget runs out.
    while not is_pass and (context.cost_manager.total_cost < context.cost_manager.max_budget):
        interaction_events = await detect_interaction.run(
            user_requirements=user_requirements,
            use_case_actors=use_case_actors,
            legacy_interaction_events=interaction_events,
            evaluation_conclusion=evaluation_conclusion,
        )
        trd = await w_trd.run(
            user_requirements=user_requirements,
            use_case_actors=use_case_actors,
            available_external_interfaces=external_interfaces,
            evaluation_conclusion=evaluation_conclusion,
            interaction_events=interaction_events,
            previous_version_trd=trd,
        )
        evaluation = await evaluate_trd.run(
            user_requirements=user_requirements,
            use_case_actors=use_case_actors,
            trd=trd,
            interaction_events=interaction_events,
        )
        is_pass = evaluation.is_pass
        # the evaluator's conclusion feeds back into the next iteration
        evaluation_conclusion = evaluation.conclusion
    return trd
@register_tool(tags=["system design", "write software framework", "Write a software framework based on a TRD"])
async def write_framework(
    use_case_actors: str,
    trd: str,
    additional_technical_requirements: str,
    output_dir: Optional[str] = "",
    investment: float = 20.0,
    context: Optional[Context] = None,
    max_loop: int = 20,
) -> str:
    """
    Run the action to generate a software framework based on the provided TRD and related information.

    Args:
        use_case_actors (str): Description of the use case actors involved.
        trd (str): Technical Requirements Document detailing the requirements.
        additional_technical_requirements (str): Any additional technical requirements.
        output_dir (str, optional): Path to save the software framework files. Default is an empty string.
        investment (float): Budget. Automatically stops optimizing TRD when the budget is overdrawn.
        context (Context, optional): The context configuration. Default is None.
        max_loop (int, optional): Acts as a safety exit valve when cost statistics fail. Default is 20.

    Returns:
        str: The generated software framework as a string of pathnames.

    Example:
        >>> use_case_actors = "- Actor: game player;\\n- System: snake game; \\n- External System: game center;"
        >>> trd = "## TRD\\n..."
        >>> additional_technical_requirements = "Using Java language, ..."
        >>> investment = 15.0
        >>> framework = await write_framework(
        >>>     use_case_actors=use_case_actors,
        >>>     trd=trd,
        >>>     additional_technical_requirements=additional_technical_requirements,
        >>>     investment=investment,
        >>> )
        >>> print(framework)
        [{"path":"balabala", "filename":"...", ...
    """
    context = context or Context(cost_manager=CostManager(max_budget=investment))
    write_framework = WriteFramework(context=context)
    evaluate_framework = EvaluateFramework(context=context)
    is_pass = False
    framework = ""
    evaluation_conclusion = ""
    acknowledgement = await mock_asearch_acknowledgement(use_case_actors)  # Replaced by acknowledgement_repo later.
    loop_count = 0
    # default output location: timestamped workspace subdirectory
    output_dir = (
        Path(output_dir)
        if output_dir
        else DEFAULT_WORKSPACE_ROOT / (datetime.now().strftime("%Y%m%d%H%M%ST") + uuid.uuid4().hex[0:8])
    )
    file_list = []
    # Iterative refinement bounded by both budget and max_loop
    while not is_pass and (context.cost_manager.total_cost < context.cost_manager.max_budget):
        try:
            framework = await write_framework.run(
                use_case_actors=use_case_actors,
                trd=trd,
                acknowledge=acknowledgement,
                legacy_output=framework,
                evaluation_conclusion=evaluation_conclusion,
                additional_technical_requirements=additional_technical_requirements,
            )
        except Exception as e:
            # best-effort: keep the last good framework if a draft attempt fails
            logger.info(f"{e}")
            break
        evaluation = await evaluate_framework.run(
            use_case_actors=use_case_actors,
            trd=trd,
            acknowledge=acknowledgement,
            legacy_output=framework,
            additional_technical_requirements=additional_technical_requirements,
        )
        is_pass = evaluation.is_pass
        evaluation_conclusion = evaluation.conclusion
        loop_count += 1
        logger.info(f"Loop {loop_count}")
        # safety valve: if cost tracking is broken (total_cost stays ~0),
        # max_loop prevents an infinite refinement loop
        if context.cost_manager.total_cost < 1 and loop_count > max_loop:
            break
    file_list = await save_framework(dir_data=framework, trd=trd, output_dir=output_dir)
    logger.info(f"Output:\n{file_list}")
    return "## Software Framework" + "".join([f"\n- {i}" for i in file_list])
@register_tool(tags=["system design", "write trd and framework", "Write a TRD and the framework"])
async def write_trd_and_framework(
    use_case_actors: str,
    user_requirements: str,
    additional_technical_requirements: str,
    investment: float = 50.0,
    output_dir: Optional[str] = "",
    context: Optional[Context] = None,
) -> str:
    """Write a TRD from the user requirements, then generate a software
    framework from that TRD, sharing one budgeted context across both steps.

    Args:
        use_case_actors (str): Description of the use case actors involved.
        user_requirements (str): The new/incremental user requirements.
        additional_technical_requirements (str): Any additional technical requirements.
        investment (float): Shared budget for both the TRD and framework steps.
        output_dir (str, optional): Path to save the framework files.
        context (Context, optional): The context configuration. Default is None.

    Returns:
        str: The generated software framework as a string of pathnames.
    """
    context = context or Context(cost_manager=CostManager(max_budget=investment))
    trd = await write_trd(use_case_actors=use_case_actors, user_requirements=user_requirements, context=context)
    return await write_framework(
        use_case_actors=use_case_actors,
        trd=trd,
        additional_technical_requirements=additional_technical_requirements,
        output_dir=output_dir,
        context=context,
    )

View file

@ -26,7 +26,7 @@ class Terminal:
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
executable="/bin/bash"
executable="/bin/bash",
)
self.stdout_queue = Queue()
self.observer = TerminalReporter()

View file

@ -1,20 +1,50 @@
import contextlib
from uuid import uuid4
from metagpt.tools.libs.browser import Browser
from metagpt.tools.tool_registry import register_tool
from metagpt.tools.web_browser_engine_playwright import PlaywrightWrapper
from metagpt.utils.file import MemoryFileSystem
from metagpt.utils.parse_html import simplify_html
@register_tool(tags=["web scraping", "web"])
async def scrape_web_playwright(url):
    """
    Asynchronously Scrape and save the HTML structure and inner text content of a web page using Playwright.

    Args:
        url (str): The main URL to fetch inner text from.

    Returns:
        dict: The inner text content and html structure of the web page, keys are 'inner_text', 'html'.
    """
    # Create a PlaywrightWrapper instance for the Chromium browser
    web = await PlaywrightWrapper().run(url)
    # Return the inner text content of the web page
    return {"inner_text": web.inner_text.strip(), "html": web.html.strip()}


@register_tool(tags=["web scraping"])
async def view_page_element_to_scrape(url: str, requirement: str, keep_links: bool = False) -> None:
    """view the HTML content of current page to understand the structure. When executed, the content will be printed out

    Args:
        url (str): The URL of the web page to scrape.
        requirement (str): Providing a clear and detailed requirement helps in focusing the inspection on the desired elements.
        keep_links (bool): Whether to keep the hyperlinks in the HTML content. Set to True if links are required
    """
    # merge residue fixed: the two tools' lines were interleaved by the merge.
    async with Browser() as browser:
        await browser.goto(url)
        page = browser.page
        html = await page.content()
        html = simplify_html(html, url=page.url, keep_links=keep_links)
    mem_fs = MemoryFileSystem()
    filename = f"{uuid4().hex}.html"
    with mem_fs.open(filename, "w") as f:
        f.write(html)
    # Since RAG is an optional optimization, if it fails, the simplified HTML can be used as a fallback.
    with contextlib.suppress(Exception):
        from metagpt.rag.engines import SimpleEngine  # avoid circular import

        # TODO make `from_docs` asynchronous
        engine = SimpleEngine.from_docs(input_files=[filename], fs=mem_fs)
        nodes = await engine.aretrieve(requirement)
        html = "\n".join(i.text for i in nodes)
    mem_fs.rm_file(filename)
    print(html)
# async def get_elements_outerhtml(self, element_ids: list[int]):
# """Inspect the outer HTML of the elements in Current Browser Viewer.
# """
# page = self.page
# data = []
# for element_id in element_ids:
# html = await get_element_outer_html(page, get_backend_node_id(element_id, self.accessibility_tree))
# data.append(html)
# return "\n".join(f"[{element_id}]. {html}" for element_id, html in zip(element_ids, data))