Merge branch 'mgx_ops' into feature-terminal

This commit is contained in:
seeker 2024-04-12 11:55:40 +08:00
commit 73afb493de
21 changed files with 517 additions and 67 deletions

View file

@ -13,6 +13,7 @@ from metagpt.tools.libs import (
email_login,
terminal,
file_manager,
browser,
)
from metagpt.tools.libs.software_development import (
write_prd,
@ -40,4 +41,5 @@ _ = (
git_archive,
terminal,
file_manager,
browser,
) # Avoid pre-commit error

View file

@ -0,0 +1,197 @@
from playwright.async_api import async_playwright
from metagpt.const import DEFAULT_WORKSPACE_ROOT
from metagpt.logs import ToolLogItem, log_tool_output_async
from metagpt.tools.tool_registry import register_tool
from metagpt.utils.common import encode_image
@register_tool()
class Browser:
    """
    A tool for browsing the web. Don't initialize a new instance of this class if one already exists.
    Note: Combine searching, scrolling, extraction, and link finding together to achieve most effective browsing. DON'T stick to one method.
    """

    def __init__(self):
        """initiate the browser, create pages placeholder later to be managed as {page_url: page object}"""
        # Kept as function-scope imports, as in the original.
        # NOTE(review): presumably this avoids an import cycle -- confirm before hoisting.
        from metagpt.config2 import config
        from metagpt.llm import LLM

        # Both stay None until start() runs, so open_new_page() and close() can
        # tell whether Playwright has actually been launched.
        self.playwright = None
        self.browser = None

        # LLM used by extract_info_from_view to read screenshots.
        self.llm = LLM(llm_config=config.get_openai_llm())
        self.llm.model = "gpt-4-vision-preview"

        # browser status management
        self.pages = {}
        # {page_url: title}. page.title() is a coroutine in the async Playwright
        # API, so the title is captured in open_new_page() to let the synchronous
        # check_all_pages() return real strings.
        self._page_titles = {}
        self.current_page_url = None
        self.current_page = None

    async def start(self):
        """Starts Playwright and launches a browser. Does nothing if a browser is already running."""
        if self.browser is not None:
            # A second launch would orphan the browser that is already running.
            return
        if self.playwright is None:
            self.playwright = await async_playwright().start()
        self.browser = await self.playwright.chromium.launch()

    def _set_current_page(self, page, url):
        """Record page/url as the current page and announce the switch on stdout."""
        self.current_page = page
        self.current_page_url = url
        print("Now on page ", url)

    async def open_new_page(self, url: str):
        """open a new page in the browser, set it as the current page"""
        if self.browser is None:
            # Start lazily instead of failing on None.new_page().
            await self.start()
        page = await self.browser.new_page()
        await page.goto(url)
        self.pages[url] = page
        self._page_titles[url] = await page.title()
        self._set_current_page(page, url)
        await log_tool_output_async(
            ToolLogItem(type="object", name="open_new_page", value=self.current_page), tool_name="Browser"
        )

    async def switch_page(self, url: str):
        """switch to an opened page in the browser, set it as the current page"""
        if url not in self.pages:
            print(f"Page not found: {url}")
            return
        self._set_current_page(self.pages[url], url)
        await log_tool_output_async(
            ToolLogItem(type="object", name="switch_page", value=self.current_page), tool_name="Browser"
        )

    async def search_content_all(self, search_term: str) -> list[dict]:
        """search all occurrences of search term in the current page and return the search results with their position.
        Useful if you have a keyword or sentence in mind and want to quickly narrow down the content relevant to it.

        Args:
            search_term (str): the search term

        Returns:
            list[dict]: a list of dictionaries containing the elements and their positions, e.g.
            [
                {
                    "index": ...,
                    "content": {
                        "text_block": ...,
                        "links": [
                            {"text": ..., "href": ...},
                            ...
                        ]
                    },
                    "position": {from_top: ..., from_left: ...},
                    "element_obj": ...,
                },
                ...
            ]
        """
        locator = self.current_page.locator(f"text={search_term}")
        count = await locator.count()
        search_results = []
        for i in range(count):
            element = locator.nth(i)
            # Invisible matches are skipped, so "index" counts visible results only.
            if not await element.is_visible():
                continue
            position = await element.evaluate("e => ({ from_top: e.offsetTop, from_left: e.offsetLeft })")
            # Retrieve the surrounding block of text and links with their text
            content = await element.evaluate(
                """
                (element) => {
                    // Fall back to the element itself when it has no parent element.
                    const block = element.parentElement || element;
                    return {
                        text_block: block.innerText,
                        // Create an array of objects, each containing the text and href of a link
                        links: Array.from(block.querySelectorAll('a')).map(a => ({
                            text: a.innerText,
                            href: a.href
                        }))
                    };
                }
                """
            )
            search_results.append(
                {"index": len(search_results), "content": content, "position": position, "element_obj": element}
            )
        print(f"Found {len(search_results)} instances of the term '{search_term}':\n\n{search_results}")
        return search_results

    async def scroll_to_search_result(self, search_results: list[dict], index: int = 0):
        """Scroll to the index-th search result, potentially for subsequent perception.
        Useful if you have located a search result, the search result does not fulfill your requirement, and you need more information around that search result. Can only be used after search_content_all.

        Args:
            search_results (list[dict]): search_results from search_content_all
            index (int, optional): the index of the search result to scroll to. Index starts from 0. Defaults to 0.
        """
        if not search_results:
            # Nothing to scroll to; the empty dict is the original's return value for this case.
            return {}
        if index >= len(search_results):
            print(f"Index {index} is out of range. Scrolling to the last instance.")
            index = len(search_results) - 1
        element = search_results[index]["element_obj"]
        await element.scroll_into_view_if_needed()
        print(f"Successfully scrolled to the {index}-th search result, consider extract more info around it.")
        await log_tool_output_async(
            ToolLogItem(type="object", name="scroll_page", value=self.current_page), tool_name="Browser"
        )

    async def find_links(self) -> list:
        """Finds all links in the current page and returns a list of dictionaries with link text and the URL.
        Useful for navigating to more pages and exploring more resources.

        Returns:
            list: A list of dictionaries, each containing 'text' and 'href' keys.
        """
        # Use a CSS selector to find all <a> elements in the page.
        links = await self.current_page.query_selector_all("a")
        link_info = []
        for link in links:
            # href is the raw attribute value as written in the page markup.
            text = await link.text_content()
            href = await link.get_attribute("href")
            link_info.append({"text": text, "href": href})
        print(f"Found {len(link_info)} links:\n\n{link_info}")
        return link_info

    async def extract_info_from_view(self, instruction: str) -> str:
        """
        Extract useful info from the current page view.

        Args:
            instruction (str): explain what info needs to be extracted

        Returns:
            str: extracted info from current view
        """
        # The same file is reused on every call, so only the latest screenshot is kept.
        img_path = DEFAULT_WORKSPACE_ROOT / "screenshot_temp.png"
        await self.current_page.screenshot(path=img_path)
        rsp = await self.llm.aask(msg=instruction, images=[encode_image(img_path)])
        return rsp

    async def scroll_current_page(self, offset: int = 500):
        """scroll the current page vertically by offset pixels, negative value means scrolling up. Returns nothing; use extract_info_from_view to perceive the scrolled view"""
        # Pass the offset as an argument rather than splicing it into the script
        # text, so an unexpected value cannot alter the JavaScript that runs.
        await self.current_page.evaluate("offset => window.scrollBy(0, offset)", offset)
        print(f"Scrolled current page by {offset} pixels. Perceive the scrolled view if needed")
        await log_tool_output_async(
            ToolLogItem(type="object", name="scroll_page", value=self.current_page), tool_name="Browser"
        )

    def check_all_pages(self) -> dict:
        """return all pages opened in the browser, a dictionary with {page_url: page_title}, useful for understanding the current browser state"""
        # Titles are the ones recorded when each page was opened; calling the
        # async page.title() here would only yield un-awaited coroutines.
        return {url: self._page_titles.get(url, "") for url in self.pages}

    async def close(self):
        """close the browser and all pages"""
        if self.browser is not None:
            await self.browser.close()
            self.browser = None
        if self.playwright is not None:
            await self.playwright.stop()
            self.playwright = None
        # Drop references to pages that belonged to the closed browser.
        self.pages = {}
        self._page_titles = {}
        self.current_page = None
        self.current_page_url = None
async def get_scroll_position(page):
    """Evaluate a script on page that reads window.scrollX / window.scrollY.

    Args:
        page: object exposing an awaitable evaluate(expression) method.

    Returns:
        Whatever page.evaluate returns for the script, which builds an object
        with "x" and "y" entries.
    """
    scroll_script = "() => ({ x: window.scrollX, y: window.scrollY })"
    position = await page.evaluate(scroll_script)
    return position

View file

@ -5,11 +5,12 @@ from __future__ import annotations
from pathlib import Path
from typing import Optional
from metagpt.const import BUGFIX_FILENAME, REQUIREMENT_FILENAME
from metagpt.const import ASSISTANT_ALIAS, BUGFIX_FILENAME, REQUIREMENT_FILENAME
from metagpt.logs import ToolLogItem, log_tool_output
from metagpt.schema import BugFixContext, Message
from metagpt.tools.tool_registry import register_tool
from metagpt.utils.common import any_to_str
from metagpt.utils.project_repo import ProjectRepo
@register_tool(tags=["software development", "ProductManager"])
@ -42,22 +43,33 @@ async def write_prd(idea: str, project_path: Optional[str | Path] = None) -> Pat
from metagpt.context import Context
from metagpt.roles import ProductManager
log_tool_output(output=[ToolLogItem(name=ASSISTANT_ALIAS, value=write_prd.__name__)], tool_name=write_prd.__name__)
ctx = Context()
if project_path and Path(project_path).exists():
ctx.config.project_path = Path(project_path)
ctx.config.inc = True
role = ProductManager(context=ctx)
msg = await role.run(with_message=Message(content=idea, cause_by=UserRequirement))
await role.run(with_message=msg)
outputs = [
ToolLogItem(name="PRD File", value=str(ctx.repo.docs.prd.workdir / i))
ToolLogItem(name="Intermedia PRD File", value=str(ctx.repo.docs.prd.workdir / i))
for i in ctx.repo.docs.prd.changed_files.keys()
]
for i in ctx.repo.resources.competitive_analysis.changed_files.keys():
outputs.append(
outputs.extend(
[
ToolLogItem(name="PRD File", value=str(ctx.repo.resources.prd.workdir / i))
for i in ctx.repo.resources.prd.changed_files.keys()
]
)
outputs.extend(
[
ToolLogItem(name="Competitive Analysis", value=str(ctx.repo.resources.competitive_analysis.workdir / i))
)
for i in ctx.repo.resources.competitive_analysis.changed_files.keys()
]
)
log_tool_output(output=outputs, tool_name=write_prd.__name__)
return ctx.repo.docs.prd.workdir
@ -85,6 +97,10 @@ async def write_design(prd_path: str | Path) -> Path:
from metagpt.context import Context
from metagpt.roles import Architect
log_tool_output(
output=[ToolLogItem(name=ASSISTANT_ALIAS, value=write_design.__name__)], tool_name=write_design.__name__
)
ctx = Context()
prd_path = Path(prd_path)
project_path = (Path(prd_path) if not prd_path.is_file() else prd_path.parent) / "../.."
@ -132,6 +148,11 @@ async def write_project_plan(system_design_path: str | Path) -> Path:
from metagpt.context import Context
from metagpt.roles import ProjectManager
log_tool_output(
output=[ToolLogItem(name=ASSISTANT_ALIAS, value=write_project_plan.__name__)],
tool_name=write_project_plan.__name__,
)
ctx = Context()
system_design_path = Path(system_design_path)
project_path = (system_design_path if not system_design_path.is_file() else system_design_path.parent) / "../.."
@ -141,9 +162,15 @@ async def write_project_plan(system_design_path: str | Path) -> Path:
await role.run(with_message=Message(content="", cause_by=WriteDesign))
outputs = [
ToolLogItem(name="Project Plan", value=str(ctx.repo.docs.task.workdir / i))
ToolLogItem(name="Intermedia Project Plan", value=str(ctx.repo.docs.task.workdir / i))
for i in ctx.repo.docs.task.changed_files.keys()
]
outputs.extend(
[
ToolLogItem(name="Project Plan", value=str(ctx.repo.resources.api_spec_and_task.workdir / i))
for i in ctx.repo.resources.api_spec_and_task.changed_files.keys()
]
)
log_tool_output(output=outputs, tool_name=write_project_plan.__name__)
return ctx.repo.docs.task.workdir
@ -179,6 +206,10 @@ async def write_codes(task_path: str | Path, inc: bool = False) -> Path:
from metagpt.context import Context
from metagpt.roles import Engineer
log_tool_output(
output=[ToolLogItem(name=ASSISTANT_ALIAS, value=write_codes.__name__)], tool_name=write_codes.__name__
)
ctx = Context()
ctx.config.inc = inc
task_path = Path(task_path)
@ -222,6 +253,10 @@ async def run_qa_test(src_path: str | Path) -> Path:
from metagpt.environment import Environment
from metagpt.roles import QaEngineer
log_tool_output(
output=[ToolLogItem(name=ASSISTANT_ALIAS, value=run_qa_test.__name__)], tool_name=run_qa_test.__name__
)
ctx = Context()
src_path = Path(src_path)
project_path = (src_path if not src_path.is_file() else src_path.parent) / ".."
@ -270,6 +305,8 @@ async def fix_bug(project_path: str | Path, issue: str) -> Path:
from metagpt.context import Context
from metagpt.roles import Engineer
log_tool_output(output=[ToolLogItem(name=ASSISTANT_ALIAS, value=fix_bug.__name__)], tool_name=fix_bug.__name__)
ctx = Context()
ctx.set_repo_dir(project_path)
ctx.src_workspace = ctx.git_repo.workdir / ctx.git_repo.workdir.name
@ -325,11 +362,21 @@ async def git_archive(project_path: str | Path) -> str:
"""
from metagpt.context import Context
log_tool_output(
output=[ToolLogItem(name=ASSISTANT_ALIAS, value=git_archive.__name__)], tool_name=git_archive.__name__
)
ctx = Context()
ctx.set_repo_dir(project_path)
project_dir = ProjectRepo.search_project_path(project_path)
if not project_dir:
ValueError(f"{project_path} is not a valid git repository.")
ctx.set_repo_dir(project_dir)
files = " ".join(ctx.git_repo.changed_files.keys())
outputs = [ToolLogItem(name="cmd", value=f"git add {files}")]
log_tool_output(output=outputs, tool_name=git_archive.__name__)
ctx.git_repo.archive()
outputs = [ToolLogItem(name="Git Commit", value=str(ctx.repo.workdir))]
outputs = [ToolLogItem(name="cmd", value="git commit -m 'Archive'")]
log_tool_output(output=outputs, tool_name=git_archive.__name__)
return ctx.git_repo.log()
@ -358,6 +405,10 @@ async def import_git_repo(url: str) -> Path:
from metagpt.actions.import_repo import ImportRepo
from metagpt.context import Context
log_tool_output(
output=[ToolLogItem(name=ASSISTANT_ALIAS, value=import_git_repo.__name__)], tool_name=import_git_repo.__name__
)
ctx = Context()
action = ImportRepo(repo_path=url, context=ctx)
await action.run()

View file

@ -20,8 +20,7 @@ def convert_code_to_tool_schema(obj, include: list[str] = None) -> dict:
continue
# method_doc = inspect.getdoc(method)
method_doc = get_class_method_docstring(obj, name)
if method_doc:
schema["methods"][name] = function_docstring_to_schema(method, method_doc)
schema["methods"][name] = function_docstring_to_schema(method, method_doc)
elif inspect.isfunction(obj):
schema = function_docstring_to_schema(obj, docstring)
@ -39,7 +38,7 @@ def convert_code_to_tool_schema_ast(code: str) -> list[dict]:
return visitor.get_tool_schemas()
def function_docstring_to_schema(fn_obj, docstring) -> dict:
def function_docstring_to_schema(fn_obj, docstring="") -> dict:
"""
Converts a function's docstring into a schema dictionary.