feat: gitlab:mgx_ops

This commit is contained in:
莘权 马 2024-06-28 11:53:20 +08:00
commit 5f55590a57
17 changed files with 820 additions and 258 deletions

View file

@ -6,9 +6,7 @@
"""
from metagpt.roles.di.data_interpreter import DataInterpreter
__import__("metagpt.tools.libs.browser", fromlist=["Browser"]) # To skip pre-commit check
from metagpt.tools.libs.web_scraping import view_page_element_to_scrape
PAPER_LIST_REQ = """"
Get data from `paperlist` table in https://papercopilot.com/statistics/iclr-statistics/iclr-2024-statistics/,
@ -34,7 +32,7 @@ NEWS_36KR_REQ = """从36kr创投平台https://pitchhub.36kr.com/financing-flash
async def main():
di = DataInterpreter(tools=["Browser"])
di = DataInterpreter(tools=[view_page_element_to_scrape.__name__])
await di.run(ECOMMERCE_REQ)

View file

@ -7,6 +7,9 @@
@Modified By: mashenquan, 2023/11/27. Following the think-act principle, solidify the task parameters when creating the
WriteCode object, rather than passing them in when calling the run function.
"""
import asyncio
import os
from pathlib import Path
from typing import Optional
from pydantic import BaseModel, Field
@ -16,7 +19,8 @@ from metagpt.actions import WriteCode
from metagpt.actions.action import Action
from metagpt.logs import logger
from metagpt.schema import CodingContext, Document
from metagpt.utils.common import CodeParser
from metagpt.tools.tool_registry import register_tool
from metagpt.utils.common import CodeParser, aread, awrite
from metagpt.utils.project_repo import ProjectRepo
from metagpt.utils.report import EditorReporter
@ -205,3 +209,95 @@ class WriteCodeReview(Action):
# 如果rewrited_code是None原code perfect那么直接返回code
self.i_context.code_doc.content = iterative_code
return self.i_context
@register_tool(include_functions=["run"])
class ReviewAndRewriteCode(Action):
    """According to the design and task documents, review the code to ensure it is complete and correct."""

    name: str = "ReviewAndRewriteCode"

    async def run(
        self,
        code_path: str,
        system_design_input: str = "",
        project_schedule_input: str = "",
        code_review_k_times: int = 2,
    ) -> str:
        """Reviews the provided code based on the accompanying system design and project schedule documentation, return the complete and correct code.

        Read the code from `code_path`, and write the final code to `code_path`.
        If both `system_design_input` and `project_schedule_input` are absent, it will return and do nothing.

        Args:
            code_path (str): The file path of the code snippet to be reviewed. This should be a string containing the path to the source code file.
            system_design_input (str): Content or file path of the design document associated with the code. This should describe the system architecture, used in the code. It helps provide context for the review process.
            project_schedule_input (str): Content or file path of the task document describing what the code is intended to accomplish. This should outline the functional requirements or objectives of the code.
            code_review_k_times (int, optional): The number of iterations for reviewing and potentially rewriting the code. Defaults to 2.

        Returns:
            str: The potentially corrected or approved code after review.

        Example Usage:
            # Example of how to call the run method with a code snippet and documentation
            await ReviewAndRewriteCode().run(
                code_path="/tmp/game.js",
                system_design_input="/tmp/system_design.json",
                project_schedule_input="/tmp/project_task_list.json"
            )
        """
        if not system_design_input and not project_schedule_input:
            logger.info(
                "Both `system_design_input` and `project_schedule_input` are absent, ReviewAndRewriteCode will do nothing."
            )
            # Nothing to review against: the file is left untouched and the caller gets None.
            return

        # The three reads are independent, so run them concurrently.
        code, design_doc, task_doc = await asyncio.gather(
            aread(code_path), self._try_aread(system_design_input), self._try_aread(project_schedule_input)
        )
        code_doc = self._create_code_doc(code_path=code_path, code=code)
        review_action = WriteCodeReview(i_context=CodingContext(filename=code_doc.filename))

        context = "\n".join(
            [
                "## System Design\n" + design_doc + "\n",
                "## Task\n" + task_doc + "\n",
            ]
        )

        for i in range(code_review_k_times):
            # Rebuilt every round so each review sees the latest rewrite.
            context_prompt = PROMPT_TEMPLATE.format(context=context, code=code, filename=code_path)
            cr_prompt = EXAMPLE_AND_INSTRUCTION.format(
                format_example=FORMAT_EXAMPLE.format(filename=code_path),
            )
            logger.info(f"The {i+1}th time to CodeReview: {code_path}.")
            result, rewrited_code = await review_action.write_code_review_and_rewrite(
                context_prompt, cr_prompt, doc=code_doc
            )

            if "LBTM" in result:
                # Rejected: continue the next round from the rewritten code.
                code = rewrited_code
            elif "LGTM" in result:
                # Approved: stop early.
                break

        await awrite(filename=code_path, data=code)
        return code

    @staticmethod
    async def _try_aread(input: str) -> str:
        """Try to read from the path if it's a file; return input directly if not.

        Uses `os.path.isfile` rather than `os.path.exists` so that a value which
        happens to name an existing directory is returned as-is instead of being
        passed to `aread`.
        """
        if os.path.isfile(input):
            return await aread(input)
        return input

    @staticmethod
    def _create_code_doc(code_path: str, code: str) -> Document:
        """Create a Document to represent the code doc."""
        path = Path(code_path)
        return Document(root_path=str(path.parent), filename=path.name, content=code)

View file

@ -31,6 +31,7 @@ class LLMType(Enum):
MOONSHOT = "moonshot"
MISTRAL = "mistral"
YI = "yi" # lingyiwanwu
OPEN_ROUTER = "open_router"
def __missing__(self, key):
    """Return the OPENAI member regardless of `key`.

    NOTE(review): `__missing__` is the hook used by `dict` subclasses; the
    standard `enum` lookup hook is the `_missing_` classmethod. This method
    therefore does not appear to be invoked by value lookup on the enum --
    confirm whether a fallback to OPENAI is actually intended here.
    """
    return self.OPENAI

View file

@ -4,6 +4,14 @@ EXTRA_INSTRUCTION = """
4. Each time you write a code in your response, write with the Editor directly without preparing a repetitive code block beforehand.
5. Take on ONE task and write ONE code file in each response. DON'T attempt all tasks in one response.
6. When not specified, you should write files in a folder named "src". If you know the project path, then write in a "src" folder under the project path.
7. When provided system design or project schedule, read them first, then adhere to them in your implementation.
7. When provided system design or project schedule, you MUST read them first before making a plan, then adhere to them in your implementation, especially in the programming language, package, or framework. You MUST implement all code files prescribed in the system design or project schedule. You can create a plan first with each task corresponding to implementing one code file.
8. Write at most one file per task, do your best to implement THE ONLY ONE FILE. CAREFULLY CHECK THAT YOU DONT MISS ANY NECESSARY CLASS/FUNCTION IN THIS FILE.
9. COMPLETE CODE: Your code will be part of the entire project, so please implement complete, reliable, reusable code snippets.
10. When provided system design, YOU MUST FOLLOW "Data structures and interfaces". DONT CHANGE ANY DESIGN. Do not use public member functions that do not exist in your design.
11. Write out EVERY CODE DETAIL, DON'T LEAVE TODO.
12. To modify code in a file, read the entire file, make changes, and update the file with the complete code, ensuring that no line numbers are included in the final write.
13. When a system design or project schedule is provided, at the end of the plan, add a CodeRview Task for each file; for example, if there are three files, add three CodeRview Tasks. For each CodeRview Task, just call ReviewAndRewriteCode.run.
"""
ENGINEER2_INSTRUCTION = ROLE_INSTRUCTION + EXTRA_INSTRUCTION.strip()

View file

@ -50,3 +50,14 @@ Some text indicating your thoughts, such as how you should update the plan statu
]
```
"""
# Prompt asking the LLM to check and reformat JSON text. It has a single
# `{json_data}` placeholder; RoleZero fills it with the raw command response
# after `json.loads` raises `JSONDecodeError`, then parses the fenced
# ```json block of the reply.
JSON_REPAIR_PROMPT = """
## json data
{json_data}
## Output Format
```json
Formatted JSON data
```
Help check if there are any formatting issues with the JSON data? If so, please help format it
"""

View file

@ -20,7 +20,7 @@ Note:
3. If the requirement contains both DATA-RELATED part mentioned in 1 and software development part mentioned in 2, you should decompose the software development part and assign them to different team members based on their expertise, and assign the DATA-RELATED part to Data Analyst David directly.
4. If the requirement is a common-sense, logical, or math problem, you should respond directly without assigning any task to team members.
5. If you think the requirement is not clear or ambiguous, you should ask the user for clarification immediately. Assign tasks only after all info is clear.
6. It is helpful for Engineer to have both the system design and the project schedule for writing the code, so include paths of both files (if available) when publishing message to Engineer.
6. It is helpful for Engineer to have both the system design and the project schedule for writing the code, so include paths of both files (if available) and remind Engineer to definitely read them when publishing message to Engineer.
7. If the requirement is writing a TRD and software framework, you should assign it to Architect. When publishing message to Architect, you should directly copy the full original user requirement.
"""

View file

@ -40,7 +40,8 @@ from metagpt.utils.token_counter import (
)
@register_provider([LLMType.OPENAI, LLMType.FIREWORKS, LLMType.OPEN_LLM, LLMType.MOONSHOT, LLMType.MISTRAL, LLMType.YI])
@register_provider([LLMType.OPENAI, LLMType.FIREWORKS, LLMType.OPEN_LLM, LLMType.MOONSHOT, LLMType.MISTRAL, LLMType.YI,
LLMType.OPEN_ROUTER])
class OpenAILLM(BaseLLM):
"""Check https://platform.openai.com/examples for examples"""

View file

@ -4,6 +4,7 @@ import json
import os
from typing import Any, Optional, Union
import fsspec
from llama_index.core import SimpleDirectoryReader
from llama_index.core.callbacks.base import CallbackManager
from llama_index.core.embeddings import BaseEmbedding
@ -83,6 +84,7 @@ class SimpleEngine(RetrieverQueryEngine):
llm: LLM = None,
retriever_configs: list[BaseRetrieverConfig] = None,
ranker_configs: list[BaseRankerConfig] = None,
fs: Optional[fsspec.AbstractFileSystem] = None,
) -> "SimpleEngine":
"""From docs.
@ -96,11 +98,12 @@ class SimpleEngine(RetrieverQueryEngine):
llm: Must supported by llama index. Default OpenAI.
retriever_configs: Configuration for retrievers. If more than one config, will use SimpleHybridRetriever.
ranker_configs: Configuration for rankers.
fs: File system to use.
"""
if not input_dir and not input_files:
raise ValueError("Must provide either `input_dir` or `input_files`.")
documents = SimpleDirectoryReader(input_dir=input_dir, input_files=input_files).load_data()
documents = SimpleDirectoryReader(input_dir=input_dir, input_files=input_files, fs=fs).load_data()
cls._fix_document_metadata(documents)
transformations = transformations or cls._default_transformations()

View file

@ -57,5 +57,6 @@ class Architect(RoleZero):
self.tool_execution_map.update(
{
write_trd_and_framework.__name__: write_trd_and_framework,
"run": write_design.run, # alias
}
)

View file

@ -83,7 +83,7 @@ class DataAnalyst(DataInterpreter):
# print(*context, sep="\n" + "*" * 5 + "\n")
async with ThoughtReporter(enable_llm_stream=True):
rsp = await self.llm.aask(context)
self.commands = json.loads(CodeParser.parse_code(block=None, text=rsp))
self.commands = json.loads(CodeParser.parse_code(block=None, lang='json', text=rsp))
self.rc.working_memory.add(Message(content=rsp, role="assistant"))
await run_commands(self, self.commands, self.rc.working_memory)

View file

@ -1,7 +1,9 @@
from __future__ import annotations
from metagpt.actions.write_code_review import ReviewAndRewriteCode
from metagpt.prompts.di.engineer2 import ENGINEER2_INSTRUCTION
from metagpt.roles.di.role_zero import RoleZero
from metagpt.strategy.experience_retriever import ENGINEER_EXAMPLE
class Engineer2(RoleZero):
@ -10,4 +12,17 @@ class Engineer2(RoleZero):
goal: str = "Take on game, app, and web development"
instruction: str = ENGINEER2_INSTRUCTION
tools: str = ["Plan", "Editor:write,read,write_content", "RoleZero"]
tools: str = ["Plan", "Editor:write,read", "RoleZero", "ReviewAndRewriteCode"]
def _update_tool_execution(self):
review = ReviewAndRewriteCode()
self.tool_execution_map.update(
{
"ReviewAndRewriteCode.run": review.run,
"ReviewAndRewriteCode": review.run,
}
)
def _retrieve_experience(self) -> str:
return ENGINEER_EXAMPLE

View file

@ -2,6 +2,7 @@ from __future__ import annotations
import inspect
import json
import re
import traceback
from typing import Callable, Literal, Tuple
@ -10,7 +11,7 @@ from pydantic import model_validator
from metagpt.actions import Action
from metagpt.actions.di.run_command import RunCommand
from metagpt.logs import logger
from metagpt.prompts.di.role_zero import CMD_PROMPT, ROLE_INSTRUCTION
from metagpt.prompts.di.role_zero import CMD_PROMPT, ROLE_INSTRUCTION, JSON_REPAIR_PROMPT
from metagpt.roles import Role
from metagpt.schema import AIMessage, Message, UserMessage
from metagpt.strategy.experience_retriever import DummyExpRetriever, ExpRetriever
@ -21,6 +22,7 @@ from metagpt.tools.tool_recommend import BM25ToolRecommender, ToolRecommender
from metagpt.tools.tool_registry import register_tool
from metagpt.utils.common import CodeParser
from metagpt.utils.report import ThoughtReporter
from metagpt.utils.repair_llm_raw_output import repair_llm_raw_output, RepairType
@register_tool(include_functions=["ask_human", "reply_to_human"])
@ -87,6 +89,23 @@ class RoleZero(Role):
"RoleZero.ask_human": self.ask_human,
"RoleZero.reply_to_human": self.reply_to_human,
}
self.tool_execution_map.update(
{
f"Browser.{i}": getattr(self.browser, i)
for i in [
"click",
"close_tab",
"go_back",
"go_forward",
"goto",
"hover",
"press",
"scroll",
"tab_focus",
"type",
]
}
)
# can be updated by subclass
self._update_tool_execution()
return self
@ -125,7 +144,14 @@ class RoleZero(Role):
available_commands=tool_info,
instruction=self.instruction.strip(),
)
context = self.llm.format_msg(self.rc.memory.get(self.memory_k) + [UserMessage(content=prompt)])
memory = self.rc.memory.get(self.memory_k)
if not self.browser.is_empty_page:
pattern = re.compile(r"Command Browser\.(\w+) executed")
for index, msg in zip(range(len(memory), 0, -1), memory[::-1]):
if pattern.match(msg.content):
memory.insert(index, UserMessage(cause_by="browser", content=await self.browser.view()))
break
context = self.llm.format_msg(memory + [UserMessage(content=prompt)])
# print(*context, sep="\n" + "*" * 5 + "\n")
async with ThoughtReporter(enable_llm_stream=True):
self.command_rsp = await self.llm.aask(context, system_msgs=self.system_msg)
@ -138,13 +164,22 @@ class RoleZero(Role):
return await super()._act()
try:
commands = json.loads(CodeParser.parse_code(block=None, lang="json", text=self.command_rsp))
commands = CodeParser.parse_code(block=None, lang="json", text=self.command_rsp)
commands = json.loads(repair_llm_raw_output(output=commands, req_keys=[None], repair_type=RepairType.JSON))
except json.JSONDecodeError as e:
commands = await self.llm.aask(msg=JSON_REPAIR_PROMPT.format(json_data=self.command_rsp))
commands = json.loads(CodeParser.parse_code(block=None, lang="json", text=commands))
except Exception as e:
tb = traceback.format_exc()
print(tb)
error_msg = UserMessage(content=str(e))
self.rc.memory.add(error_msg)
return error_msg
# 为了对LLM不按格式生成进行容错
if isinstance(commands, dict):
commands = commands["commands"] if "commands" in commands else [commands]
outputs = await self._run_commands(commands)
self.rc.memory.add(UserMessage(content=outputs))
return AIMessage(

View file

@ -787,3 +787,115 @@ editor.read(path="./main.py")
- If no specific file is provided, search the symbol in the whole codebase to locate the issue.
- If no specific symbol is provided, directly open and read the file to diagnose the problem.
"""
# Few-shot examples returned by Engineer2._retrieve_experience.
# The document paths quoted in example 1's requirement match the paths its
# commands read (and the paths used in example 3), so the example does not
# show the model reading files from a location the user never mentioned.
ENGINEER_EXAMPLE = """
## example 1
User Requirement: Please implement the core game logic for the 2048 game, including tile movements, merging logic, score tracking, and keyboard interaction. Refer to the project schedule located at '/tmp/docs/project_schedule.json' and the system design document at '/tmp/docs/system_design.json' for detailed information.
Explanation: I will first need to read the system design document and the project schedule to understand the specific requirements and architecture outlined for the game development.
```json
[
{
"command_name": "Editor.read",
"args": {
"path": "/tmp/docs/project_schedule.json"
}
},
{
"command_name": "Editor.read",
"args": {
"path": "/tmp/docs/system_design.json"
}
}
]
```
## example 2
To achieve the goal of writing a 2048 game using JavaScript and HTML without any frameworks, I will create a plan consisting of three tasks, each corresponding to the creation of one of the required files: `index.html`, `style.css`, and `script.js`. Following the completion of these tasks, I will add a code review task for each file to ensure the implementation aligns with the provided system design and project schedule documents.
Here's the plan:
1. **Task 1**: Create `index.html` - This file will contain the HTML structure necessary for the game's UI.
2. **Task 2**: Create `style.css` - This file will define the CSS styles to make the game visually appealing and responsive.
3. **Task 3**: Create `script.js` - This file will contain the JavaScript code for the game logic and UI interactions.
4. **Code Review Tasks**: Review each file to ensure they meet the project requirements and adhere to the system design.
Let's start by appending the first task to the plan.
```json
[
{
"command_name": "Plan.append_task",
"args": {
"task_id": "1",
"dependent_task_ids": [],
"instruction": "Create the index.html file with the basic HTML structure for the 2048 game.",
"assignee": "Alex"
}
},
{
"command_name": "Plan.append_task",
"args": {
"task_id": "2",
"dependent_task_ids": ["1"],
"instruction": "Create the style.css file with the necessary CSS to style the 2048 game.",
"assignee": "Alex"
}
},
{
"command_name": "Plan.append_task",
"args": {
"task_id": "3",
"dependent_task_ids": ["1", "2"],
"instruction": "Create the script.js file containing the JavaScript logic for the 2048 game.",
"assignee": "Alex"
}
},
{
"command_name": "Plan.append_task",
"args": {
"task_id": "4",
"dependent_task_ids": ["1"],
"instruction": "Use ReviewAndRewriteCode to review the code in index.html to ensure it meets the design specifications.",
"assignee": "Alex"
}
},
{
"command_name": "Plan.append_task",
"args": {
"task_id": "5",
"dependent_task_ids": ["2"],
"instruction": "Use ReviewAndRewriteCode to review the code in style.css to ensure it meets the design specifications.",
"assignee": "Alex"
}
},
{
"command_name": "Plan.append_task",
"args": {
"task_id": "6",
"dependent_task_ids": ["3"],
"instruction": "Use ReviewAndRewriteCode to review the code in script.js to ensure it meets the design specifications. ",
"assignee": "Alex"
}
}
]
```
## example 3
I will now review the code in `script.js`.
Explanation: to review the code, call ReviewAndRewriteCode.run.
```json
[
{
"command_name": "ReviewAndRewriteCode.run",
"args": {
"code_path": "/tmp/src/script.js",
"system_design_input": "/tmp/docs/system_design.json",
"project_schedule_input": "/tmp/docs/project_schedule.json",
"code_review_k_times": 2
}
}
]
```
"""

View file

@ -1,261 +1,206 @@
from __future__ import annotations
import contextlib
from uuid import uuid4
import time
from typing import Literal, Optional
from playwright.async_api import async_playwright
from playwright.async_api import Browser as Browser_
from playwright.async_api import (
BrowserContext,
Frame,
Page,
Playwright,
Request,
async_playwright,
)
from metagpt.const import DEFAULT_WORKSPACE_ROOT
from metagpt.tools.tool_registry import register_tool
from metagpt.utils.file import MemoryFileSystem
from metagpt.utils.parse_html import simplify_html
from metagpt.utils.a11y_tree import (
click_element,
get_accessibility_tree,
get_backend_node_id,
hover_element,
key_press,
parse_accessibility_tree,
scroll_page,
type_text,
)
from metagpt.utils.report import BrowserReporter
@register_tool(tags=["web", "browse", "scrape"])
@register_tool(
tags=["web", "browse"],
include_functions=[
"click",
"close_tab",
"go_back",
"go_forward",
"goto",
"hover",
"press",
"scroll",
"tab_focus",
"type",
],
)
class Browser:
"""
A tool for browsing the web and scraping. Don't initialize a new instance of this class if one already exists.
Note: Combine searching and scrolling together to achieve most effective browsing. DON'T stick to one method.
"""A tool for browsing the web. Don't initialize a new instance of this class if one already exists.
Note: If you plan to use the browser to assist you in completing tasks, then using the browser should be a standalone
task, executing actions each time based on the content seen on the webpage before proceeding to the next step.
## Example
Issue: The details of the latest issue in the geekan/MetaGPT repository.
Plan: Use a browser to view the details of the latest issue in the geekan/MetaGPT repository.
Solution:
Let's first open the issue page of the MetaGPT repository with the `Browser.goto` command
>>> await browser.goto("https://github.com/geekan/MetaGPT/issues")
From the output webpage, we've identified that the latest issue can be accessed by clicking on the element with ID "1141".
>>> await browser.click(1141)
Finally, we have found the webpage for the latest issue, we can close the tab and finish current task.
>>> await browser.close_tab()
"""
def __init__(self):
"""initiate the browser, create pages placeholder later to be managed as {page_url: page object}"""
self.browser = None
# browser status management
self.pages = {}
self.current_page_url = None
self.current_page = None
self.playwright: Optional[Playwright] = None
self.browser_instance: Optional[Browser_] = None
self.browser_ctx: Optional[BrowserContext] = None
self.page: Optional[Page] = None
self.accessibility_tree: list = []
self.headless: bool = True
self.proxy = None
self.is_empty_page = True
self.reporter = BrowserReporter()
async def start(self):
async def start(self) -> None:
"""Starts Playwright and launches a browser"""
self.playwright = await async_playwright().start()
self.browser = await self.playwright.chromium.launch()
if self.playwright is None:
self.playwright = playwright = await async_playwright().start()
browser = self.browser_instance = await playwright.chromium.launch(headless=self.headless, proxy=self.proxy)
browser_ctx = self.browser_ctx = await browser.new_context()
self.page = await browser_ctx.new_page()
async def _set_current_page(self, page, url):
self.current_page = page
self.current_page_url = url
print("Now on page ", url)
await self._view()
async def stop(self):
    """Stop Playwright if it is running and drop the browser handles.

    Does nothing when Playwright was never started (or was already stopped).
    """
    driver = self.playwright
    if not driver:
        return
    # Clear the handles before awaiting so the instance already looks stopped
    # while the shutdown is in progress.
    self.playwright = None
    self.browser_instance = None
    self.browser_ctx = None
    await driver.stop()
async def open_new_page(self, url: str, timeout: float = 30000):
"""open a new page in the browser and view the page"""
async def click(self, element_id: int):
    """Click the element with the given id on the webpage.

    Args:
        element_id: Id of the target element; it is resolved to a backend node
            id using the stored accessibility tree.

    Returns:
        The status string returned by `_wait_page`.
    """
    backend_node_id = get_backend_node_id(element_id, self.accessibility_tree)
    await click_element(self.page, backend_node_id)
    return await self._wait_page()
async def type(self, element_id: int, content: str, press_enter_after: bool = False):
    """Type the content into the field with the given id.

    The field is clicked first, then the text is typed into the page.

    Args:
        element_id: Id of the target field; it is resolved to a backend node id
            using the stored accessibility tree.
        content: Text to type.
        press_enter_after: When True, a newline is appended to `content`.

    Returns:
        The status string returned by `_wait_page`.
    """
    text = content + "\n" if press_enter_after else content
    backend_node_id = get_backend_node_id(element_id, self.accessibility_tree)
    await click_element(self.page, backend_node_id)
    await type_text(self.page, text)
    return await self._wait_page()
async def hover(self, element_id: int):
    """Hover over the element with the given id.

    Args:
        element_id: Id of the target element; it is resolved to a backend node
            id using the stored accessibility tree.

    Returns:
        The status string returned by `_wait_page`.
    """
    backend_node_id = get_backend_node_id(element_id, self.accessibility_tree)
    await hover_element(self.page, backend_node_id)
    return await self._wait_page()
async def press(self, key_comb: str):
    """Simulate pressing a key combination on the keyboard (e.g., Ctrl+v).

    Args:
        key_comb: The key combination to press.

    Returns:
        The status string returned by `_wait_page`.
    """
    current_page = self.page
    await key_press(current_page, key_comb)
    return await self._wait_page()
async def scroll(self, direction: Literal["down", "up"]):
    """Scroll the page up or down.

    Args:
        direction: Either "down" or "up".

    Returns:
        The status string returned by `_wait_page`.
    """
    current_page = self.page
    await scroll_page(current_page, direction)
    return await self._wait_page()
async def goto(self, url: str, timeout: float = 30000):
"""Navigate to a specific URL."""
async with self.reporter as reporter:
page = await self.browser.new_page()
await reporter.async_report(url, "url")
await page.goto(url, timeout=timeout)
self.pages[url] = page
await self._set_current_page(page, url)
await reporter.async_report(page, "page")
await self.page.goto(url, timeout=timeout)
self.is_empty_page = False
return await self._wait_page()
async def view_page_element_to_scrape(self, requirement: str, keep_links: bool = False) -> None:
"""view the HTML content of current page to understand the structure. When executed, the content will be printed out
async def go_back(self):
    """Navigate to the previously viewed page.

    Returns:
        The status string returned by `_wait_page`.
    """
    current_page = self.page
    await current_page.go_back()
    status = await self._wait_page()
    return status
Args:
requirement (str): Providing a clear and detailed requirement helps in focusing the inspection on the desired elements.
keep_links (bool): Whether to keep the hyperlinks in the HTML content. Set to True if links are required
"""
html = await self.current_page.content()
html = simplify_html(html, url=self.current_page.url, keep_links=keep_links)
mem_fs = MemoryFileSystem()
filename = f"{uuid4().hex}.html"
with mem_fs.open(filename, "w") as f:
f.write(html)
async def go_forward(self):
    """Navigate to the next page (if a previous 'go_back' action was performed).

    Returns:
        The status string returned by `_wait_page`.
    """
    current_page = self.page
    await current_page.go_forward()
    status = await self._wait_page()
    return status
# Since RAG is an optional optimization, if it fails, the simplified HTML can be used as a fallback.
with contextlib.suppress(Exception):
from metagpt.rag.engines import SimpleEngine # avoid circular import
async def tab_focus(self, page_number: int):
    """Switch to an already opened tab and make it the active page.

    Args:
        page_number: Index of the tab in the browser context's list of pages.

    Returns:
        The status string returned by `_wait_page`, which reports the URL of
        the tab that now has focus.
    """
    page = self.browser_ctx.pages[page_number]
    await page.bring_to_front()
    # Record the focused tab as the current page. `_wait_page` and the other
    # commands all operate on `self.page`; without this they would keep
    # acting on the previously active tab.
    self.page = page
    return await self._wait_page()
# TODO make `from_docs` asynchronous
engine = SimpleEngine.from_docs(input_files=[filename], fs=mem_fs)
nodes = await engine.aretrieve(requirement)
html = "\n".join(i.text for i in nodes)
mem_fs.rm_file(filename)
print(html)
async def get_page_content(self) -> str:
"""Get the HTML content of current page."""
html = await self.current_page.content()
html_content = html.strip()
return html_content
async def switch_page(self, url: str):
"""switch to an opened page in the browser and view the page"""
if url in self.pages:
await self._set_current_page(self.pages[url], url)
await self.reporter.async_report(self.current_page, "page")
async def close_tab(self):
"""Close the currently active tab."""
await self.page.close()
if len(self.browser_ctx.pages) > 0:
self.page = self.browser_ctx.pages[-1]
else:
print(f"Page not found: {url}")
self.page = await self.browser_ctx.new_page()
self.is_empty_page = True
return await self._wait_page()
async def _view_page_html(self, keep_len: int = 5000) -> str:
"""view the HTML content of current page, return the HTML content as a string. When executed, the content will be printed out"""
html = await self.current_page.content()
html_content = html.strip()[:keep_len]
return html_content
async def _wait_page(self):
    """Wait for the current page to settle, then refresh state and report it.

    After the page is idle, the accessibility tree is re-read and stored on
    the instance, and the page is sent to the reporter.

    Returns:
        A string of the form "SUCCESS, URL: <page url>".
    """
    current = self.page
    await self._wait_until_page_idle(current)
    tree = await get_accessibility_tree(current)
    self.accessibility_tree = tree
    await self.reporter.async_report(current, "page")
    return f"SUCCESS, URL: {current.url}"
async def search_content_all(self, search_term: str) -> list[dict]:
"""search all occurences of search term in the current page and return the search results with their position.
Useful if you have a keyword or sentence in mind and want to quickly narrow down the content relevant to it.
def _register_page_event(self, page: Page):
    """Attach activity-tracking state and event handlers to `page`.

    Stamps `page.last_busy_time` with the current time, gives the page an
    empty `requests` set, and subscribes the handlers that keep both updated.
    """
    page.last_busy_time = time.time()
    page.requests = set()
    # (event name, handler) pairs, registered in this order.
    subscriptions = (
        ("domcontentloaded", self._update_page_last_busy_time),
        ("load", self._update_page_last_busy_time),
        ("request", self._on_page_request),
        ("requestfailed", self._on_page_requestfinished),
        ("requestfinished", self._on_page_requestfinished),
        ("frameattached", self._on_frame_change),
        ("framenavigated", self._on_frame_change),
    )
    for event_name, handler in subscriptions:
        page.on(event_name, handler)
Args:
search_term (str): the search term
async def _wait_until_page_idle(self, page, max_wait: float = 30.0) -> None:
    """Wait until `page` has been quiet for 0.5 seconds, or `max_wait` elapses.

    On first use for a page the tracking handlers are registered; otherwise
    the page's busy timestamp is reset to now, so a full quiet period is
    always required before returning.

    Args:
        page: The page to wait on.
        max_wait: Upper bound, in seconds, on the total time spent waiting.
            Without a bound, a page that keeps emitting events would keep the
            loop running indefinitely.
    """
    if not hasattr(page, "last_busy_time"):
        self._register_page_event(page)
    else:
        page.last_busy_time = time.time()
    deadline = time.monotonic() + max_wait
    while time.time() - page.last_busy_time < 0.5:
        if time.monotonic() >= deadline:
            # Give up waiting for quiet; the caller proceeds with the page as it is.
            break
        await page.wait_for_timeout(100)
Returns:
list[dict]: a list of dictionaries containing the elements and their positions, e.g.
[
{
"index": ...,
"content": {
"text_block": ...,
"links": [
{"text": ..., "href": ...},
...
]
},
"position": {from_top: ..., from_left: ...},
},
...
]
"""
locator = self.current_page.locator(f"text={search_term}")
count = await locator.count()
search_results = []
for i in range(count):
element = locator.nth(i)
if await element.is_visible():
position = await element.evaluate("e => ({ from_top: e.offsetTop, from_left: e.offsetLeft })")
async def _update_page_last_busy_time(self, page: Page):
    """Stamp `page.last_busy_time` with the current time."""
    now = time.time()
    page.last_busy_time = now
# Retrieve the surrounding block of text and links with their text
content = await element.evaluate(SEARCH_CONTENT_JS)
async def _on_page_request(self, request: Request):
    """Track a newly issued request and mark its page as busy.

    The request is added to the `requests` set of the page that owns the
    request's frame, and that page's busy timestamp is refreshed.
    """
    owner = request.frame.page
    pending = owner.requests
    pending.add(request)
    await self._update_page_last_busy_time(owner)
search_results.append(
{"index": len(search_results), "content": content, "position": position, "element_obj": element}
)
async def _on_page_requestfinished(self, request: Request):
    """Stop tracking a request on the page that owns its frame.

    Uses `discard`, so a request that is not in the set is ignored.
    """
    owner = request.frame.page
    owner.requests.discard(request)
print(f"Found {len(search_results)} instances of the term '{search_term}':\n\n{search_results}")
async def _on_frame_change(self, frame: Frame):
    """Mark the page that owns `frame` as busy."""
    owner = frame.page
    await self._update_page_last_busy_time(owner)
return search_results
async def view(self):
    """Render the stored accessibility tree as a text observation.

    Returns:
        A string containing the current page URL followed by the first item
        of the parsed accessibility tree.
    """
    parsed = parse_accessibility_tree(self.accessibility_tree)
    url = self.page.url
    observation = parsed[0]
    return f"Current Browser Viewer\n URL: {url}\nOBSERVATION:\n{observation}\n"
async def scroll_to_search_result(self, search_results: list[dict], index: int = 0):
"""Scroll to the index-th search result, potentially for subsequent perception.
Useful if you have located a search result, the search result does not fulfill your requirement, and you need more information around that search result. Can only be used after search_all_content.
async def __aenter__(self):
    """Start the browser on entering an `async with` block and return this instance."""
    started = self
    await started.start()
    return started
Args:
search_results (list[dict]): search_results from search_content_all
index (int, optional): the index of the search result to scroll to. Index starts from 0. Defaults to 0.
"""
if not search_results:
return {}
if index >= len(search_results):
print(f"Index {index} is out of range. Scrolling to the last instance.")
index = len(search_results) - 1
element = search_results[index]["element_obj"]
await element.scroll_into_view_if_needed()
await self.reporter.async_report(self.current_page, "page")
print(f"Successfully scrolled to the {index}-th search result")
print(await self._view())
# async def find_links(self) -> list:
# """Finds all links in the current page and returns a list of dictionaries with link text and the URL.
# Useful for navigating to more pages and exploring more resources.
# Returns:
# list: A list of dictionaries, each containing 'text' and 'href' keys.
# """
# # Use a CSS selector to find all <a> elements in the page.
# links = await self.current_page.query_selector_all("a")
# # Prepare an empty list to hold link information.
# link_info = []
# # Iterate over each link element to extract its text and href attributes.
# for link in links:
# text = await link.text_content()
# href = await link.get_attribute("href")
# link_info.append({"text": text, "href": href})
# print(f"Found {len(link_info)} links:\n\n{link_info}")
# return link_info
async def screenshot(self, path: str = DEFAULT_WORKSPACE_ROOT / "screenshot_temp.png"):
"""Take a screenshot of the current page and save it to the specified path."""
await self.current_page.screenshot(path=path)
print(f"Screenshot saved to: {path}")
async def _view(self, keep_len: int = 5000) -> str:
"""simulate human viewing the current page, return the visible text with links"""
visible_text_with_links = await self.current_page.evaluate(VIEW_CONTENT_JS)
print("The visible text and their links (if any): ", visible_text_with_links[:keep_len])
# html_content = await self._view_page_html(keep_len=keep_len)
# print("The html content: ", html_content)
async def scroll_current_page(self, offset: int = 500):
    """Scroll the current page vertically and print what is visible afterwards.

    Args:
        offset (int, optional): Number of pixels to scroll by; a negative value
            scrolls up. Defaults to 500.
    """
    await self.current_page.evaluate(f"window.scrollBy(0, {offset})")
    await self.reporter.async_report(self.current_page, "page")
    print(f"Scrolled current page by {offset} pixels.")
    # `_view` prints the observation itself; echo its result only when it
    # actually hands something back, so a bare "None" is never printed.
    observed = await self._view()
    if observed is not None:
        print(observed)
def check_all_pages(self) -> dict:
    """Summarise every page tracked in ``self.pages``.

    Returns:
        dict: A mapping of page URL to whatever ``page.title()`` returns for
        that page, useful for understanding the current browser state.
    """
    # NOTE(review): if these are async Playwright pages, `title()` yields a
    # coroutine that is never awaited here -- confirm which API is in use.
    pages_info = {}
    for url, page in self.pages.items():
        pages_info[url] = page.title()
    return pages_info
async def close(self):
    """Close the browser (and with it all pages), then stop Playwright.

    Playwright is stopped even when closing the browser raises, so the driver
    is not left running; the original exception still propagates.
    """
    try:
        await self.browser.close()
    finally:
        await self.playwright.stop()
async def get_scroll_position(page):
    """Return the result of reading ``window.scrollX`` / ``window.scrollY`` in ``page``.

    The page evaluates a function producing an object with ``x`` and ``y`` keys.
    """
    position = await page.evaluate("() => ({ x: window.scrollX, y: window.scrollY })")
    return position
# JavaScript function that takes a DOM element and returns an object with
# `text_block` (the innerText of the element's parent) and `links` (the text
# and href of every <a> found inside that parent).
# NOTE(review): `parentElement` is null for a root element, in which case
# `block.innerText` would throw -- confirm callers never pass such an element.
SEARCH_CONTENT_JS = """
(element) => {
// const block = element.closest('p, div, section, article');
const block = element.parentElement;
return {
text_block: block.innerText,
// Create an array of objects, each containing the text and href of a link
links: Array.from(block.querySelectorAll('a')).map(a => ({
text: a.innerText,
href: a.href
}))
};
}
"""
# JavaScript evaluated in the page by `_view`. It walks every descendant of
# <body> and keeps an element only when it has a layout box, is not hidden via
# display/visibility/opacity, has its centre inside the viewport, and is the
# topmost element at that centre point (`document.elementFromPoint`).
# Each kept element contributes its trimmed innerText, suffixed with
# " (href)" when it sits inside an anchor that has an href; empty entries are
# dropped and the rest are joined with newlines.
# The "\\n" below is a Python escape that reaches JavaScript as "\n".
VIEW_CONTENT_JS = """
() => {
return Array.from(document.querySelectorAll('body *')).filter(el => {
if (!(el.offsetWidth || el.offsetHeight || el.getClientRects().length)) return false;
const style = window.getComputedStyle(el);
if (style.display === 'none' || style.visibility !== 'visible' || style.opacity === '0') return false;
const rect = el.getBoundingClientRect();
const elemCenter = {
x: rect.left + rect.width / 2,
y: rect.top + rect.height / 2
};
if (elemCenter.x < 0 || elemCenter.y < 0 || elemCenter.x > window.innerWidth || elemCenter.y > window.innerHeight) return false;
if (document.elementFromPoint(elemCenter.x, elemCenter.y) !== el) return false;
return true;
}).map(el => {
let text = el.innerText || '';
text = text.trim();
if (!text.length) return '';
const parentAnchor = el.closest('a');
if (parentAnchor && parentAnchor.href) {
return `${text} (${parentAnchor.href})`;
}
return text;
}).filter(text => text.length > 0).join("\\n");
}
"""
async def __aexit__(self, *args, **kwargs):
    """Leave the ``async with`` block by awaiting ``self.stop()``.

    The exception details passed by the runtime are ignored, and ``None`` is
    returned, so an exception raised inside the block is not suppressed.
    """
    shutdown = self.stop()
    await shutdown
    return None

View file

@ -1,20 +1,50 @@
import contextlib
from uuid import uuid4
from metagpt.tools.libs.browser import Browser
from metagpt.tools.tool_registry import register_tool
from metagpt.tools.web_browser_engine_playwright import PlaywrightWrapper
from metagpt.utils.file import MemoryFileSystem
from metagpt.utils.parse_html import simplify_html
# NOTE(review): this span interleaves two versions of the scraping tool and is
# not valid Python as written. The `scrape_web_playwright` pieces use
# PlaywrightWrapper and return a dict with 'inner_text' and 'html'; the
# `view_page_element_to_scrape` pieces use Browser, simplify the page HTML,
# optionally narrow it through SimpleEngine retrieval, and print the result.
# Separate the two versions before relying on either one.
@register_tool(tags=["web scraping", "web"])
async def scrape_web_playwright(url):
"""
Asynchronously Scrape and save the HTML structure and inner text content of a web page using Playwright.
@register_tool(tags=["web scraping"])
async def view_page_element_to_scrape(url: str, requirement: str, keep_links: bool = False) -> None:
"""view the HTML content of current page to understand the structure. When executed, the content will be printed out
Args:
url (str): The main URL to fetch inner text from.
Returns:
dict: The inner text content and html structure of the web page, keys are 'inner_text', 'html'.
url (str): The URL of the web page to scrape.
requirement (str): Providing a clear and detailed requirement helps in focusing the inspection on the desired elements.
keep_links (bool): Whether to keep the hyperlinks in the HTML content. Set to True if links are required
"""
# Create a PlaywrightWrapper instance for the Chromium browser
web = await PlaywrightWrapper().run(url)
async with Browser() as browser:
await browser.goto(url)
page = browser.page
html = await page.content()
html = simplify_html(html, url=page.url, keep_links=keep_links)
mem_fs = MemoryFileSystem()
filename = f"{uuid4().hex}.html"
with mem_fs.open(filename, "w") as f:
f.write(html)
# Return the inner text content of the web page
return {"inner_text": web.inner_text.strip(), "html": web.html.strip()}
# Since RAG is an optional optimization, if it fails, the simplified HTML can be used as a fallback.
with contextlib.suppress(Exception):
from metagpt.rag.engines import SimpleEngine # avoid circular import
# TODO make `from_docs` asynchronous
engine = SimpleEngine.from_docs(input_files=[filename], fs=mem_fs)
nodes = await engine.aretrieve(requirement)
html = "\n".join(i.text for i in nodes)
mem_fs.rm_file(filename)
print(html)
# async def get_elements_outerhtml(self, element_ids: list[int]):
# """Inspect the outer HTML of the elements in Current Browser Viewer.
# """
# page = self.page
# data = []
# for element_id in element_ids:
# html = await get_element_outer_html(page, get_backend_node_id(element_id, self.accessibility_tree))
# data.append(html)
# return "\n".join(f"[{element_id}]. {html}" for element_id, html in zip(element_ids, data))

306
metagpt/utils/a11y_tree.py Normal file
View file

@ -0,0 +1,306 @@
"""See https://github.com/web-arena-x/webarena
"""
from __future__ import annotations
import re
from playwright.async_api import BrowserContext, Page
async def get_accessibility_tree(page: Page):
    """Fetch the page's full accessibility tree over CDP, without duplicate nodes.

    Args:
        page: The page whose CDP session is queried with ``Accessibility.getFullAXTree``.

    Returns:
        list: The returned nodes in their original order, keeping only the
        first node seen for each ``nodeId``.
    """
    session = await get_page_cdp_session(page)
    response = await session.send("Accessibility.getFullAXTree")
    # A dict keeps insertion order, and setdefault retains the first node per id.
    first_by_id = {}
    for ax_node in response["nodes"]:
        first_by_id.setdefault(ax_node["nodeId"], ax_node)
    return list(first_by_id.values())
async def execute_step(step: str, page: Page, browser_ctx: BrowserContext, accessibility_tree: list):
    """Parse one textual browser action and perform it.

    Recognised actions: ``None``, ``click [id]``, ``hover [id]``,
    ``type [id] [text] [0|1]``, ``press [key]``, ``scroll [up|down]``,
    ``goto [url]``, ``new_tab``, ``go_back``, ``go_forward``,
    ``tab_focus [index]``, ``close_tab`` and ``stop``.

    Args:
        step: The action text, e.g. ``"click [12]"``.
        page: The page the action is applied to.
        browser_ctx: The browser context, used to open, focus and list tabs.
        accessibility_tree: Nodes used to translate an element id into a backend node id.

    Returns:
        ``""`` for the ``None`` action, the extracted answer string for
        ``stop``, otherwise the page that is active after the action (a
        different page after ``new_tab``, ``tab_focus`` or ``close_tab``).

    Raises:
        ValueError: If the step is empty, names an unknown action, or does not
            match the expected syntax of its action.
    """
    step = step.strip()
    if not step:
        # Without this guard `step.split()[0]` fails with an IndexError.
        raise ValueError("Invalid empty action")
    func = step.split("[")[0].strip() if "[" in step else step.split()[0].strip()
    if func == "None":
        return ""
    elif func == "click":
        match = re.search(r"click ?\[(\d+)\]", step)
        if not match:
            raise ValueError(f"Invalid click action {step}")
        element_id = match.group(1)
        await click_element(page, get_backend_node_id(element_id, accessibility_tree))
    elif func == "hover":
        match = re.search(r"hover ?\[(\d+)\]", step)
        if not match:
            raise ValueError(f"Invalid hover action {step}")
        element_id = match.group(1)
        await hover_element(page, get_backend_node_id(element_id, accessibility_tree))
    elif func == "type":
        # Press Enter after typing unless the step carries an explicit [0]/[1] flag.
        if not step.endswith(("[0]", "[1]")):
            step += " [1]"
        match = re.search(r"type ?\[(\d+)\] ?\[(.+)\] ?\[(\d+)\]", step)
        if not match:
            raise ValueError(f"Invalid type action {step}")
        element_id, text, enter_flag = match.group(1), match.group(2), match.group(3)
        if enter_flag == "1":
            text += "\n"
        # Click first so the element has focus before the keystrokes are sent.
        await click_element(page, get_backend_node_id(element_id, accessibility_tree))
        await type_text(page, text)
    elif func == "press":
        match = re.search(r"press ?\[(.+)\]", step)
        if not match:
            raise ValueError(f"Invalid press action {step}")
        key = match.group(1)
        await key_press(page, key)
    elif func == "scroll":
        # up or down
        match = re.search(r"scroll ?\[?(up|down)\]?", step)
        if not match:
            raise ValueError(f"Invalid scroll action {step}")
        direction = match.group(1)
        await scroll_page(page, direction)
    elif func == "goto":
        match = re.search(r"goto ?\[(.+)\]", step)
        if not match:
            raise ValueError(f"Invalid goto action {step}")
        url = match.group(1)
        await page.goto(url)
    elif func == "new_tab":
        page = await browser_ctx.new_page()
    elif func == "go_back":
        await page.go_back()
    elif func == "go_forward":
        await page.go_forward()
    elif func == "tab_focus":
        match = re.search(r"tab_focus ?\[(\d+)\]", step)
        if not match:
            raise ValueError(f"Invalid tab_focus action {step}")
        page_number = int(match.group(1))
        page = browser_ctx.pages[page_number]
        await page.bring_to_front()
    elif func == "close_tab":
        await page.close()
        # Fall back to the last remaining tab, or open a fresh one if none is left.
        if len(browser_ctx.pages) > 0:
            page = browser_ctx.pages[-1]
        else:
            page = await browser_ctx.new_page()
    elif func == "stop":
        # NOTE(review): this pattern only matches a quoted answer followed by
        # ")"; a `stop [answer]` step yields an empty answer -- confirm the
        # intended syntax.
        match = re.search(r'stop\(?"(.+)?"\)', step)
        answer = match.group(1) if match else ""
        return answer
    else:
        raise ValueError(f"Invalid action {step}")
    await page.wait_for_load_state("domcontentloaded")
    return page
async def type_text(page: Page, text: str):
    """Send ``text`` to the page through its keyboard, character by character."""
    keyboard = page.keyboard
    await keyboard.type(text)
async def click_element(page: Page, backend_node_id: int):
    """Click the centre of the element identified by ``backend_node_id``.

    The element's bounding rectangle is looked up through the page's CDP
    session and the mouse is clicked at the rectangle's centre.
    """
    session = await get_page_cdp_session(page)
    rect_reply = await get_bounding_rect(session, backend_node_id)
    center_x, center_y = await get_element_center(rect_reply["result"]["value"])
    await page.mouse.click(center_x, center_y)
async def hover_element(page: Page, backend_node_id: int) -> None:
    """Move the mouse to the centre of the element identified by ``backend_node_id``.

    The element's bounding rectangle is looked up through the page's CDP
    session and the pointer is moved to the rectangle's centre.
    """
    session = await get_page_cdp_session(page)
    rect_reply = await get_bounding_rect(session, backend_node_id)
    center_x, center_y = await get_element_center(rect_reply["result"]["value"])
    await page.mouse.move(center_x, center_y)
async def scroll_page(page: Page, direction: str) -> None:
    """Scroll the page by one window height in the given direction.

    Args:
        page: The page to scroll.
        direction: ``"up"`` or ``"down"``; any other value does nothing.
    """
    if direction not in ("up", "down"):
        return
    # Script shape comes from natbot: shift scrollTop by one window height.
    scroller = "(document.scrollingElement || document.body)"
    sign = "-" if direction == "up" else "+"
    script = f"{scroller}.scrollTop = {scroller}.scrollTop {sign} window.innerHeight;"
    await page.evaluate(script)
async def key_press(page: Page, key: str) -> None:
    """Press a key or key combination on the page.

    When the combination contains ``Meta`` and ``navigator.platform`` does not
    mention ``Mac``, ``Meta`` is replaced by ``Control`` before pressing.
    """
    if "Meta" in key:
        # The platform is only queried when a Meta combination is requested.
        platform = await page.evaluate("navigator.platform")
        if "Mac" not in platform:
            key = key.replace("Meta", "Control")
    await page.keyboard.press(key)
async def get_element_outer_html(page: Page, backend_node_id: int):
    """Return the outer HTML of the element identified by ``backend_node_id``.

    Raises:
        ValueError: If the CDP ``DOM.getOuterHTML`` request (or reading its
            result) fails for any reason; the original error is chained.
    """
    session = await get_page_cdp_session(page)
    try:
        params = {"backendNodeId": int(backend_node_id)}
        reply = await session.send("DOM.getOuterHTML", params)
        return reply["outerHTML"]
    except Exception as e:
        raise ValueError("Element not found") from e
async def get_element_center(node_info):
    """Compute the centre point of a rectangle.

    Args:
        node_info: Mapping with ``x``, ``y``, ``width`` and ``height`` entries.

    Returns:
        tuple: ``(center_x, center_y)``.
    """
    left, top, span_x, span_y = (node_info[key] for key in ("x", "y", "width", "height"))
    return left + span_x / 2, top + span_y / 2
def extract_step(response: str, action_splitter: str = "```") -> str:
    """Extract the first action enclosed between two ``action_splitter`` markers.

    Args:
        response: Text that is expected to contain a delimited action.
        action_splitter: Literal delimiter placed before and after the action.

    Returns:
        str: The text between the first pair of delimiters, stripped of
        surrounding whitespace.

    Raises:
        ValueError: If no delimited action is present in ``response``.
    """
    # The splitter is a literal marker, so escape it: characters such as "$"
    # or "|" must not be interpreted as regex syntax.
    delimiter = re.escape(action_splitter)
    # Find the first occurrence of an action; DOTALL lets it span several lines.
    match = re.search(rf"{delimiter}(.*?){delimiter}", response, re.DOTALL)
    if match:
        return match.group(1).strip()
    raise ValueError(f'Cannot find the answer phrase "{response}"')
async def get_bounding_rect(cdp_session, backend_node_id: str):
    """Ask the browser for the bounding client rectangle of a DOM node.

    The node is resolved to a remote object with ``DOM.resolveNode`` and a
    function is then called on it with ``Runtime.callFunctionOn``. Text nodes
    (``nodeType == 3``) are measured through a temporary range.

    Args:
        cdp_session: Session object exposing an awaitable ``send(method, params)``.
        backend_node_id: Backend node id; it is converted with ``int()``.

    Returns:
        The raw ``Runtime.callFunctionOn`` response.

    Raises:
        ValueError: If any step fails; the original error is chained.
    """
    rect_function = """
function() {
if (this.nodeType == 3) {
var range = document.createRange();
range.selectNode(this);
var rect = range.getBoundingClientRect().toJSON();
range.detach();
return rect;
} else {
return this.getBoundingClientRect().toJSON();
}
}
"""
    try:
        resolved = await cdp_session.send("DOM.resolveNode", {"backendNodeId": int(backend_node_id)})
        call_params = {
            "objectId": resolved["object"]["objectId"],
            "functionDeclaration": rect_function,
            "returnByValue": True,
        }
        return await cdp_session.send("Runtime.callFunctionOn", call_params)
    except Exception as e:
        raise ValueError("Element not found") from e
# Node properties that are left out of the textual rendering of the tree.
IGNORED_ACTREE_PROPERTIES = (
    "focusable",
    "editable",
    "readonly",
    "level",
    "settable",
    "multiline",
    "invalid",
)


def parse_accessibility_tree(accessibility_tree):
    """Parse the accessibility tree into a string text.

    Each rendered node becomes one line of the form
    ``[nodeId] role 'name' prop: value ...`` indented with one tab per level of
    rendered ancestors. Nodes without a name are dropped when they carry no
    properties and have a purely structural role, or when they are a
    ``listitem``; their children are rendered at the dropped node's depth.

    Args:
        accessibility_tree: List of node dicts; the first entry is the root.

    Returns:
        tuple: ``(tree_str, obs_nodes_info)`` where ``obs_nodes_info`` maps each
        rendered node id to its ``backend_id``, ``union_bound`` and ``text``.
        An empty tree yields ``("", {})``.
    """
    if not accessibility_tree:
        return "", {}

    # Roles that carry no information when they have neither name nor properties.
    empty_roles = (
        "generic",
        "img",
        "list",
        "strong",
        "paragraph",
        "banner",
        "navigation",
        "Section",
        "LabelText",
        "Legend",
        "listitem",
    )
    node_id_to_idx = {node["nodeId"]: idx for idx, node in enumerate(accessibility_tree)}
    obs_nodes_info = {}

    def dfs(idx: int, obs_node_id: str, depth: int) -> str:
        tree_str = ""
        node = accessibility_tree[idx]
        indent = "\t" * depth
        valid_node = True
        try:
            role = node["role"]["value"]
            name = node["name"]["value"]
            node_str = f"[{obs_node_id}] {role} {repr(name)}"
            properties = []
            for prop in node.get("properties", []):
                try:
                    if prop["name"] in IGNORED_ACTREE_PROPERTIES:
                        continue
                    properties.append(f'{prop["name"]}: {prop["value"]["value"]}')
                except KeyError:
                    # A property without a name or value is simply not rendered.
                    pass
            if properties:
                node_str += " " + " ".join(properties)

            # check valid
            if not node_str.strip():
                valid_node = False

            # empty generic node
            if not name.strip():
                if not properties:
                    if role in empty_roles:
                        valid_node = False
                elif role in ["listitem"]:
                    valid_node = False

            if valid_node:
                tree_str += f"{indent}{node_str}"
                # These keys are optional on a raw node. Reading them with
                # `.get` keeps a rendered node valid; a KeyError here used to
                # mark it invalid after its line was already emitted, which
                # flattened the indentation and left this mapping empty.
                obs_nodes_info[obs_node_id] = {
                    "backend_id": node.get("backendDOMNodeId"),
                    "union_bound": node.get("union_bound"),
                    "text": node_str,
                }
        except (KeyError, TypeError, AttributeError):
            # Nodes lacking a usable role or name are skipped, not fatal.
            valid_node = False

        for child_node_id in node.get("childIds", []):
            if child_node_id not in node_id_to_idx:
                continue
            # mark this to save some tokens
            child_depth = depth + 1 if valid_node else depth
            child_str = dfs(node_id_to_idx[child_node_id], child_node_id, child_depth)
            if child_str.strip():
                if tree_str.strip():
                    tree_str += "\n"
                tree_str += child_str
        return tree_str

    tree_str = dfs(0, accessibility_tree[0]["nodeId"], 0)
    return tree_str, obs_nodes_info
async def get_page_cdp_session(page):
    """Return the CDP session attached to ``page``, creating it on first use.

    The session is stored on the page as ``page.cdp_session`` so later calls
    reuse it instead of opening another one.
    """
    absent = object()
    cached = getattr(page, "cdp_session", absent)
    if cached is not absent:
        return cached
    session = await page.context.new_cdp_session(page)
    page.cdp_session = session
    return session
def get_backend_node_id(element_id, accessibility_tree):
    """Look up the backend DOM node id of an accessibility-tree node.

    Args:
        element_id: Node id to search for; compared as a string.
        accessibility_tree: Iterable of node dicts with a ``nodeId`` entry.

    Returns:
        The ``backendDOMNodeId`` of the first matching node, or ``None`` when
        that node has no such entry.

    Raises:
        ValueError: If no node has the requested id.
    """
    wanted = str(element_id)
    found = next((node for node in accessibility_tree if node["nodeId"] == wanted), None)
    if found is None:
        raise ValueError(f"Element {wanted} not found")
    return found.get("backendDOMNodeId")

View file

@ -67,18 +67,18 @@ Create a 2048 game, follow the design doc and task doc. Write your code under /U
After writing all codes, write a code review for the codes, make improvement or adjustment based on the review.
Notice: You MUST implement the full code, don't leave comment without implementation!
Design doc:
{TASK_DOC_2048}
Task doc:
{DESIGN_DOC_2048}
Task doc:
{TASK_DOC_2048}
"""
# Requirement prompt for the snake game scenario. Each label is followed by
# its own document: the design doc under "Design doc:" and the task doc under
# "Task doc:".
GAME_REQ_SNAKE = f"""
Create a snake game, follow the design doc and task doc. Write your code under /Users/gary/Files/temp/workspace/snake_game/src.
After writing all codes, write a code review for the codes, make improvement or adjustment based on the review.
Notice: You MUST implement the full code, don't leave comment without implementation!
Design doc:
{DESIGN_DOC_SNAKE}
Task doc:
{TASK_DOC_SNAKE}
"""
GAME_REQ_2048_NO_DOC = """
Create a 2048 game with pygame. Write your code under /Users/gary/Files/temp/workspace/2048_game/src.