diff --git a/examples/di/run_flask.py b/examples/di/run_flask.py index ed0f35b8e..ae841a09c 100644 --- a/examples/di/run_flask.py +++ b/examples/di/run_flask.py @@ -2,7 +2,6 @@ import asyncio from metagpt.roles.di.data_interpreter import DataInterpreter - USE_GOT_REPO_REQ = """ Write a service using Flask, create a conda environment and run it, and call the service's interface for validation. Notice: Don't write all codes in one response, each time, just write code for one step. diff --git a/metagpt/actions/action_node.py b/metagpt/actions/action_node.py index 31e4cc0fc..48372f790 100644 --- a/metagpt/actions/action_node.py +++ b/metagpt/actions/action_node.py @@ -17,7 +17,7 @@ from pydantic import BaseModel, Field, create_model, model_validator from tenacity import retry, stop_after_attempt, wait_random_exponential from metagpt.actions.action_outcls_registry import register_action_outcls -from metagpt.const import USE_CONFIG_TIMEOUT +from metagpt.const import MARKDOWN_TITLE_PREFIX, USE_CONFIG_TIMEOUT from metagpt.llm import BaseLLM from metagpt.logs import logger from metagpt.provider.postprocess.llm_output_postprocess import llm_output_postprocess @@ -113,7 +113,7 @@ Follow format example's {prompt_schema} format, generate output and make sure it """ -def dict_to_markdown(d, prefix="- ", kv_sep="\n", postfix="\n"): +def dict_to_markdown(d, prefix=MARKDOWN_TITLE_PREFIX, kv_sep="\n", postfix="\n"): markdown_str = "" for key, value in d.items(): markdown_str += f"{prefix}{key}{kv_sep}{value}{postfix}" diff --git a/metagpt/actions/design_api.py b/metagpt/actions/design_api.py index e5f038c7c..b71c66543 100644 --- a/metagpt/actions/design_api.py +++ b/metagpt/actions/design_api.py @@ -26,6 +26,7 @@ from metagpt.const import DATA_API_DESIGN_FILE_REPO, SEQ_FLOW_FILE_REPO from metagpt.logs import logger from metagpt.schema import Document, Documents, Message from metagpt.utils.mermaid import mermaid_to_file +from metagpt.utils.report import DocsReporter NEW_REQ_TEMPLATE 
= """ ### Legacy Content @@ -70,31 +71,34 @@ class WriteDesign(Action): return ActionOutput(content=changed_files.model_dump_json(), instruct_content=changed_files) async def _new_system_design(self, context): - node = await DESIGN_API_NODE.fill(context=context, llm=self.llm) + node = await DESIGN_API_NODE.fill(context=context, llm=self.llm, schema=self.prompt_schema) return node async def _merge(self, prd_doc, system_design_doc): context = NEW_REQ_TEMPLATE.format(old_design=system_design_doc.content, context=prd_doc.content) - node = await REFINED_DESIGN_NODE.fill(context=context, llm=self.llm) + node = await REFINED_DESIGN_NODE.fill(context=context, llm=self.llm, schema=self.prompt_schema) system_design_doc.content = node.instruct_content.model_dump_json() return system_design_doc async def _update_system_design(self, filename) -> Document: prd = await self.repo.docs.prd.get(filename) old_system_design_doc = await self.repo.docs.system_design.get(filename) - if not old_system_design_doc: - system_design = await self._new_system_design(context=prd.content) - doc = await self.repo.docs.system_design.save( - filename=filename, - content=system_design.instruct_content.model_dump_json(), - dependencies={prd.root_relative_path}, - ) - else: - doc = await self._merge(prd_doc=prd, system_design_doc=old_system_design_doc) - await self.repo.docs.system_design.save_doc(doc=doc, dependencies={prd.root_relative_path}) - await self._save_data_api_design(doc) - await self._save_seq_flow(doc) - await self.repo.resources.system_design.save_pdf(doc=doc) + async with DocsReporter(enable_llm_stream=True) as reporter: + await reporter.async_report({"type": "design"}, "meta") + if not old_system_design_doc: + system_design = await self._new_system_design(context=prd.content) + doc = await self.repo.docs.system_design.save( + filename=filename, + content=system_design.instruct_content.model_dump_json(), + dependencies={prd.root_relative_path}, + ) + else: + doc = await 
self._merge(prd_doc=prd, system_design_doc=old_system_design_doc) + await self.repo.docs.system_design.save_doc(doc=doc, dependencies={prd.root_relative_path}) + await self._save_data_api_design(doc) + await self._save_seq_flow(doc) + md = await self.repo.resources.system_design.save_pdf(doc=doc) + await reporter.async_report(self.repo.workdir / md.root_relative_path, "path") return doc async def _save_data_api_design(self, design_doc): diff --git a/metagpt/actions/di/execute_nb_code.py b/metagpt/actions/di/execute_nb_code.py index e78c5acf3..1df17f2c6 100644 --- a/metagpt/actions/di/execute_nb_code.py +++ b/metagpt/actions/di/execute_nb_code.py @@ -13,9 +13,10 @@ from typing import Literal, Tuple import nbformat from nbclient import NotebookClient -from nbclient.exceptions import CellTimeoutError, DeadKernelError +from nbclient.exceptions import CellExecutionComplete, CellTimeoutError, DeadKernelError +from nbclient.util import ensure_async from nbformat import NotebookNode -from nbformat.v4 import new_code_cell, new_markdown_cell, new_output +from nbformat.v4 import new_code_cell, new_markdown_cell, new_output, output_from_msg from rich.box import MINIMAL from rich.console import Console, Group from rich.live import Live @@ -25,32 +26,64 @@ from rich.syntax import Syntax from metagpt.actions import Action from metagpt.const import DEFAULT_WORKSPACE_ROOT -from metagpt.logs import ToolLogItem, log_tool_output, logger +from metagpt.logs import logger +from metagpt.utils.report import NotebookReporter INSTALL_KEEPLEN = 500 +class RealtimeOutputNotebookClient(NotebookClient): + """Realtime output of Notebook execution.""" + + def __init__(self, *args, notebook_reporter=None, **kwargs) -> None: + super().__init__(*args, **kwargs) + self.notebook_reporter = notebook_reporter or NotebookReporter() + + async def _async_poll_output_msg(self, parent_msg_id: str, cell: NotebookNode, cell_index: int) -> None: + """Implement a feature to enable sending messages.""" + assert 
self.kc is not None + while True: + msg = await ensure_async(self.kc.iopub_channel.get_msg(timeout=None)) + await self._send_msg(msg) + + if msg["parent_header"].get("msg_id") == parent_msg_id: + try: + # Will raise CellExecutionComplete when completed + self.process_message(msg, cell, cell_index) + except CellExecutionComplete: + return + + async def _send_msg(self, msg: dict): + msg_type = msg.get("header", {}).get("msg_type") + if msg_type not in ["stream", "error", "execute_result"]: + return + + await self.notebook_reporter.async_report(output_from_msg(msg), "content") + + class ExecuteNbCode(Action): """execute notebook code block, return result to llm, and display it.""" nb: NotebookNode - nb_client: NotebookClient + nb_client: NotebookClient = None console: Console interaction: str timeout: int = 600 - def __init__( - self, - nb=nbformat.v4.new_notebook(), - timeout=600, - ): + def __init__(self, nb=nbformat.v4.new_notebook(), timeout=600): super().__init__( nb=nb, - nb_client=NotebookClient(nb, timeout=timeout, resources={"metadata": {"path": DEFAULT_WORKSPACE_ROOT}}), timeout=timeout, console=Console(), interaction=("ipython" if self.is_ipython() else "terminal"), ) + self.reporter = NotebookReporter() + self.nb_client = RealtimeOutputNotebookClient( + nb, + timeout=timeout, + resources={"metadata": {"path": DEFAULT_WORKSPACE_ROOT}}, + notebook_reporter=self.reporter, + ) async def build(self): if self.nb_client.kc is None or not await self.nb_client.kc.is_alive(): @@ -175,6 +208,8 @@ class ExecuteNbCode(Action): """set timeout for run code. returns the success or failure of the cell execution, and an optional error message. 
""" + await self.reporter.async_report(cell, "content") + try: await self.nb_client.async_execute_cell(cell, cell_index) return self.parse_outputs(self.nb.cells[-1].outputs) @@ -196,35 +231,36 @@ class ExecuteNbCode(Action): """ self._display(code, language) - if language == "python": - # add code to the notebook - self.add_code_cell(code=code) + async with self.reporter: + if language == "python": + # add code to the notebook + self.add_code_cell(code=code) - # build code executor - await self.build() + # build code executor + await self.build() - # run code - cell_index = len(self.nb.cells) - 1 - success, outputs = await self.run_cell(self.nb.cells[-1], cell_index) + # run code + cell_index = len(self.nb.cells) - 1 + success, outputs = await self.run_cell(self.nb.cells[-1], cell_index) - if "!pip" in code: - success = False - outputs = outputs[-INSTALL_KEEPLEN:] + if "!pip" in code: + success = False + outputs = outputs[-INSTALL_KEEPLEN:] + + elif language == "markdown": + # add markdown content to markdown cell in a notebook. + self.add_markdown_cell(code) + # return True, beacuse there is no execution failure for markdown cell. + outputs, success = code, True + else: + raise ValueError(f"Only support for language: python, markdown, but got {language}, ") file_path = DEFAULT_WORKSPACE_ROOT / "code.ipynb" nbformat.write(self.nb, file_path) - log_tool_output(ToolLogItem(name="file_path", value=file_path), tool_name="ExecuteNbCode") + await self.reporter.async_report(file_path, "path") return outputs, success - elif language == "markdown": - # add markdown content to markdown cell in a notebook. - self.add_markdown_cell(code) - # return True, beacuse there is no execution failure for markdown cell. 
- return code, True - else: - raise ValueError(f"Only support for language: python, markdown, but got {language}, ") - def remove_escape_and_color_codes(input_str: str): # 使用正则表达式去除jupyter notebook输出结果中的转义字符和颜色代码 diff --git a/metagpt/actions/project_management.py b/metagpt/actions/project_management.py index b52616e37..5aace9fa9 100644 --- a/metagpt/actions/project_management.py +++ b/metagpt/actions/project_management.py @@ -19,6 +19,7 @@ from metagpt.actions.project_management_an import PM_NODE, REFINED_PM_NODE from metagpt.const import PACKAGE_REQUIREMENTS_FILENAME from metagpt.logs import logger from metagpt.schema import Document, Documents +from metagpt.utils.report import DocsReporter NEW_REQ_TEMPLATE = """ ### Legacy Content @@ -59,18 +60,21 @@ class WriteTasks(Action): async def _update_tasks(self, filename): system_design_doc = await self.repo.docs.system_design.get(filename) task_doc = await self.repo.docs.task.get(filename) - if task_doc: - task_doc = await self._merge(system_design_doc=system_design_doc, task_doc=task_doc) - await self.repo.docs.task.save_doc(doc=task_doc, dependencies={system_design_doc.root_relative_path}) - else: - rsp = await self._run_new_tasks(context=system_design_doc.content) - task_doc = await self.repo.docs.task.save( - filename=filename, - content=rsp.instruct_content.model_dump_json(), - dependencies={system_design_doc.root_relative_path}, - ) - await self._update_requirements(task_doc) - await self.repo.resources.api_spec_and_task.save_pdf(doc=task_doc) + async with DocsReporter(enable_llm_stream=True) as reporter: + await reporter.async_report({"type": "task"}, "meta") + if task_doc: + task_doc = await self._merge(system_design_doc=system_design_doc, task_doc=task_doc) + await self.repo.docs.task.save_doc(doc=task_doc, dependencies={system_design_doc.root_relative_path}) + else: + rsp = await self._run_new_tasks(context=system_design_doc.content) + task_doc = await self.repo.docs.task.save( + filename=filename, + 
content=rsp.instruct_content.model_dump_json(), + dependencies={system_design_doc.root_relative_path}, + ) + await self._update_requirements(task_doc) + md = await self.repo.resources.api_spec_and_task.save_pdf(doc=task_doc) + await reporter.async_report(self.repo.workdir / md.root_relative_path, "path") return task_doc async def _run_new_tasks(self, context): diff --git a/metagpt/actions/write_code.py b/metagpt/actions/write_code.py index aac3a6d87..6cfde2385 100644 --- a/metagpt/actions/write_code.py +++ b/metagpt/actions/write_code.py @@ -28,6 +28,7 @@ from metagpt.logs import logger from metagpt.schema import CodingContext, Document, RunCodeResult from metagpt.utils.common import CodeParser from metagpt.utils.project_repo import ProjectRepo +from metagpt.utils.report import EditorReporter PROMPT_TEMPLATE = """ NOTICE @@ -139,12 +140,15 @@ class WriteCode(Action): summary_log=summary_doc.content if summary_doc else "", ) logger.info(f"Writing {coding_context.filename}..") - code = await self.write_code(prompt) - if not coding_context.code_doc: - # avoid root_path pydantic ValidationError if use WriteCode alone - root_path = self.context.src_workspace if self.context.src_workspace else "" - coding_context.code_doc = Document(filename=coding_context.filename, root_path=str(root_path)) - coding_context.code_doc.content = code + async with EditorReporter(enable_llm_stream=True) as reporter: + await reporter.async_report({"filename": coding_context.filename}, "meta") + code = await self.write_code(prompt) + if not coding_context.code_doc: + # avoid root_path pydantic ValidationError if use WriteCode alone + root_path = self.context.src_workspace if self.context.src_workspace else "" + coding_context.code_doc = Document(filename=coding_context.filename, root_path=str(root_path)) + coding_context.code_doc.content = code + await reporter.async_report(self.repo.workdir / coding_context.code_doc.root_relative_path, "path") return coding_context @staticmethod diff --git 
a/metagpt/actions/write_prd.py b/metagpt/actions/write_prd.py index b66887164..3ba7e9543 100644 --- a/metagpt/actions/write_prd.py +++ b/metagpt/actions/write_prd.py @@ -37,6 +37,7 @@ from metagpt.schema import BugFixContext, Document, Documents, Message from metagpt.utils.common import CodeParser from metagpt.utils.file_repository import FileRepository from metagpt.utils.mermaid import mermaid_to_file +from metagpt.utils.report import DocsReporter CONTEXT_TEMPLATE = """ ### Project Name @@ -102,17 +103,22 @@ class WritePRD(Action): async def _handle_new_requirement(self, req: Document) -> ActionOutput: """handle new requirement""" - project_name = self.project_name - context = CONTEXT_TEMPLATE.format(requirements=req, project_name=project_name) - exclude = [PROJECT_NAME.key] if project_name else [] - node = await WRITE_PRD_NODE.fill(context=context, llm=self.llm, exclude=exclude) # schema=schema - await self._rename_workspace(node) - new_prd_doc = await self.repo.docs.prd.save( - filename=FileRepository.new_filename() + ".json", content=node.instruct_content.model_dump_json() - ) - await self._save_competitive_analysis(new_prd_doc) - await self.repo.resources.prd.save_pdf(doc=new_prd_doc) - return Documents.from_iterable(documents=[new_prd_doc]).to_action_output() + async with DocsReporter(enable_llm_stream=True) as reporter: + await reporter.async_report({"type": "prd"}, "meta") + project_name = self.project_name + context = CONTEXT_TEMPLATE.format(requirements=req, project_name=project_name) + exclude = [PROJECT_NAME.key] if project_name else [] + node = await WRITE_PRD_NODE.fill( + context=context, llm=self.llm, exclude=exclude, schema=self.prompt_schema + ) # schema=schema + await self._rename_workspace(node) + new_prd_doc = await self.repo.docs.prd.save( + filename=FileRepository.new_filename() + ".json", content=node.instruct_content.model_dump_json() + ) + await self._save_competitive_analysis(new_prd_doc) + md = await 
self.repo.resources.prd.save_pdf(doc=new_prd_doc) + await reporter.async_report(self.repo.workdir / md.root_relative_path, "path") + return Documents.from_iterable(documents=[new_prd_doc]).to_action_output() async def _handle_requirement_update(self, req: Document, related_docs: list[Document]) -> ActionOutput: # ... requirement update logic ... @@ -146,10 +152,13 @@ class WritePRD(Action): return related_doc async def _update_prd(self, req: Document, prd_doc: Document) -> Document: - new_prd_doc: Document = await self._merge(req, prd_doc) - await self.repo.docs.prd.save_doc(doc=new_prd_doc) - await self._save_competitive_analysis(new_prd_doc) - await self.repo.resources.prd.save_pdf(doc=new_prd_doc) + async with DocsReporter(enable_llm_stream=True) as reporter: + await reporter.async_report({"type": "prd"}, "meta") + new_prd_doc: Document = await self._merge(req, prd_doc) + await self.repo.docs.prd.save_doc(doc=new_prd_doc) + await self._save_competitive_analysis(new_prd_doc) + md = await self.repo.resources.prd.save_pdf(doc=new_prd_doc) + await reporter.async_report(self.repo.workdir / md.root_relative_path, "path") return new_prd_doc async def _save_competitive_analysis(self, prd_doc: Document): diff --git a/metagpt/actions/write_prd_an.py b/metagpt/actions/write_prd_an.py index 6a995e184..a33685cd3 100644 --- a/metagpt/actions/write_prd_an.py +++ b/metagpt/actions/write_prd_an.py @@ -165,7 +165,7 @@ ANYTHING_UNCLEAR = ActionNode( key="Anything UNCLEAR", expected_type=str, instruction="Mention any aspects of the project that are unclear and try to clarify them.", - example="", + example="Currently, all aspects of the project are clear.", ) ISSUE_TYPE = ActionNode( diff --git a/metagpt/configs/role_custom_config.py b/metagpt/configs/role_custom_config.py index 414c2a793..581de605e 100644 --- a/metagpt/configs/role_custom_config.py +++ b/metagpt/configs/role_custom_config.py @@ -14,6 +14,6 @@ class RoleCustomConfig(YamlModel): role: role's className or role's 
role_id To be expanded """ + role: str = "" llm: LLMConfig - diff --git a/metagpt/const.py b/metagpt/const.py index 67636fad3..eaa22434f 100644 --- a/metagpt/const.py +++ b/metagpt/const.py @@ -140,5 +140,11 @@ LLM_API_TIMEOUT = 300 # Assistant alias ASSISTANT_ALIAS = "response" +# Markdown +MARKDOWN_TITLE_PREFIX = "## " + +# Reporter +METAGPT_REPORTER_DEFAULT_URL = os.environ.get("METAGPT_REPORTER_URL", "") + # Metadata defines AGENT = "agent" diff --git a/metagpt/logs.py b/metagpt/logs.py index bfb8c9265..63c10fa2f 100644 --- a/metagpt/logs.py +++ b/metagpt/logs.py @@ -8,8 +8,10 @@ from __future__ import annotations +import asyncio import inspect import sys +from contextvars import ContextVar from datetime import datetime from functools import partial from typing import Any @@ -19,6 +21,8 @@ from pydantic import BaseModel, Field from metagpt.const import METAGPT_ROOT +LLM_STREAM_QUEUE: ContextVar[asyncio.Queue] = ContextVar("llm-stream") + class ToolLogItem(BaseModel): type_: str = Field(alias="type", default="str", description="Data type of `value` field.") @@ -47,6 +51,20 @@ logger = define_log_level() def log_llm_stream(msg): + """ + Logs a message to the LLM stream. + + Args: + msg: The message to be logged. + + Notes: + If the LLM_STREAM_QUEUE has not been set (e.g., if `create_llm_stream_queue` has not been called), + the message will not be added to the LLM stream queue. + """ + + queue = get_llm_stream_queue() + if queue: + queue.put_nowait(msg) _llm_stream_log(msg) @@ -102,4 +120,24 @@ async def _tool_output_log_async(*args, **kwargs): pass +def create_llm_stream_queue(): + """Creates a new LLM stream queue and sets it in the context variable. + + Returns: + The newly created asyncio.Queue instance. + """ + queue = asyncio.Queue() + LLM_STREAM_QUEUE.set(queue) + return queue + + +def get_llm_stream_queue(): + """Retrieves the current LLM stream queue from the context variable. + + Returns: + The asyncio.Queue instance if set, otherwise None. 
+ """ + return LLM_STREAM_QUEUE.get(None) + + _get_human_input = input # get human input from console by default diff --git a/metagpt/schema.py b/metagpt/schema.py index 9fa295211..cfe991cd9 100644 --- a/metagpt/schema.py +++ b/metagpt/schema.py @@ -47,10 +47,11 @@ from metagpt.const import ( SYSTEM_DESIGN_FILE_REPO, TASK_FILE_REPO, ) -from metagpt.logs import ToolLogItem, log_tool_output, logger +from metagpt.logs import logger from metagpt.repo_parser import DotClassInfo from metagpt.utils.common import CodeParser, any_to_str, any_to_str_set, import_class from metagpt.utils.exceptions import handle_exception +from metagpt.utils.report import TaskReporter from metagpt.utils.serialize import ( actionoutout_schema_to_mapping, actionoutput_mapping_to_str, @@ -578,14 +579,7 @@ class Plan(BaseModel): current_task_id = task.task_id break self.current_task_id = current_task_id - - log_tool_output( - [ - ToolLogItem(type="object", name="tasks", value=self.tasks), - ToolLogItem(type="object", name="current_task_id", value=self.current_task_id), - ], - tool_name="Plan", - ) + TaskReporter().report({"tasks": self.tasks, "current_task_id": current_task_id}) @property def current_task(self) -> Task: diff --git a/metagpt/software_company.py b/metagpt/software_company.py index 431a3a179..7f0c56388 100644 --- a/metagpt/software_company.py +++ b/metagpt/software_company.py @@ -8,7 +8,6 @@ import typer from metagpt.const import CONFIG_ROOT from metagpt.utils.common import any_to_str -from metagpt.utils.project_repo import ProjectRepo app = typer.Typer(add_completion=False, pretty_exceptions_show_locals=False) @@ -26,7 +25,7 @@ def generate_repo( reqa_file="", max_auto_summarize_code=0, recover_path=None, -) -> ProjectRepo: +): """Run the startup logic. 
Can be called from CLI or other Python scripts.""" from metagpt.config2 import config from metagpt.context import Context diff --git a/metagpt/tools/libs/browser.py b/metagpt/tools/libs/browser.py index b6a5b7cbf..6a7fd8e3b 100644 --- a/metagpt/tools/libs/browser.py +++ b/metagpt/tools/libs/browser.py @@ -1,8 +1,8 @@ from playwright.async_api import async_playwright from metagpt.const import DEFAULT_WORKSPACE_ROOT -from metagpt.logs import ToolLogItem, log_tool_output_async from metagpt.tools.tool_registry import register_tool +from metagpt.utils.report import BrowserReporter @register_tool() @@ -20,6 +20,7 @@ class Browser: self.pages = {} self.current_page_url = None self.current_page = None + self.reporter = BrowserReporter() async def start(self): """Starts Playwright and launches a browser""" @@ -34,21 +35,19 @@ class Browser: async def open_new_page(self, url: str): """open a new page in the browser and view the page""" - page = await self.browser.new_page() - await page.goto(url) - self.pages[url] = page - await self._set_current_page(page, url) - await log_tool_output_async( - ToolLogItem(type="object", name="open_new_page", value=self.current_page), tool_name="Browser" - ) + async with self.reporter as reporter: + page = await self.browser.new_page() + await reporter.async_report(url, "url") + await page.goto(url) + self.pages[url] = page + await self._set_current_page(page, url) + await reporter.async_report(page, "page") async def switch_page(self, url: str): """switch to an opened page in the browser and view the page""" if url in self.pages: await self._set_current_page(self.pages[url], url) - await log_tool_output_async( - ToolLogItem(type="object", name="switch_page", value=self.current_page), tool_name="Browser" - ) + await self.reporter.async_report(self.current_page, "page") else: print(f"Page not found: {url}") @@ -110,9 +109,8 @@ class Browser: index = len(search_results) - 1 element = search_results[index]["element_obj"] await 
element.scroll_into_view_if_needed() - await log_tool_output_async( - ToolLogItem(type="object", name="scroll_page", value=self.current_page), tool_name="Browser" - ) + await self.reporter.async_report(self.current_page, "page") + print(f"Successfully scrolled to the {index}-th search result") print(await self._view()) @@ -152,9 +150,8 @@ class Browser: async def scroll_current_page(self, offset: int = 500): """scroll the current page by offset pixels, negative value means scrolling up, will print out observed content after scrolling""" await self.current_page.evaluate(f"window.scrollBy(0, {offset})") - await log_tool_output_async( - ToolLogItem(type="object", name="scroll_page", value=self.current_page), tool_name="Browser" - ) + await self.reporter.async_report(self.current_page, "page") + print(f"Scrolled current page by {offset} pixels.") print(await self._view()) diff --git a/metagpt/tools/libs/deployer.py b/metagpt/tools/libs/deployer.py index 3c9c2f7e5..c30ad0176 100644 --- a/metagpt/tools/libs/deployer.py +++ b/metagpt/tools/libs/deployer.py @@ -1,5 +1,5 @@ -from metagpt.logs import ToolLogItem, log_tool_output from metagpt.tools.tool_registry import register_tool +from metagpt.utils.report import ServerReporter # An un-implemented tool reserved for deploying a local service to public @@ -8,4 +8,4 @@ class Deployer: """Deploy a local service to public. 
Used only for final deployment, you should NOT use it for development and testing.""" def deploy_to_public(self, local_url: str): - log_tool_output(ToolLogItem(name="local_url", value=local_url), tool_name="Deployer") + ServerReporter().report(local_url, "local_url") diff --git a/metagpt/tools/libs/file_manager.py b/metagpt/tools/libs/file_manager.py index 75c173d5e..588059dc5 100644 --- a/metagpt/tools/libs/file_manager.py +++ b/metagpt/tools/libs/file_manager.py @@ -4,8 +4,8 @@ import subprocess from pydantic import BaseModel, Field -from metagpt.logs import ToolLogItem, log_tool_output from metagpt.tools.tool_registry import register_tool +from metagpt.utils.report import EditorReporter class FileBlock(BaseModel): @@ -23,17 +23,20 @@ class FileBlock(BaseModel): class FileManager: """A tool for reading, understanding, writing, and editing files""" + def __init__(self) -> None: + self.resource = EditorReporter() + def write(self, path: str, content: str): """Write the whole content to a file.""" with open(path, "w") as f: f.write(content) - log_tool_output(ToolLogItem(name="write_file_path", value=path), tool_name="FileManager") + self.resource.report(path, "path") def read(self, path: str) -> str: """Read the whole content of a file.""" with open(path, "r") as f: + self.resource.report(path, "path") return f.read() - log_tool_output(ToolLogItem(name="read_file_path", value=path), tool_name="FileManager") def search_content(self, symbol: str, root_path: str = "", window: int = 20) -> FileBlock: """ @@ -78,10 +81,7 @@ class FileManager: symbol=symbol, symbol_line=i + 1, ) - log_tool_output( - ToolLogItem(type="object", name="file_block_searched", value=result), - tool_name="FileManager", - ) + self.resource.report(result.file_path, "path") return result return None @@ -124,9 +124,7 @@ class FileManager: block_start_line=start_line, block_end_line=-1 if end_line < start_line else start_line + new_block_content.count("\n"), ) - log_tool_output( - 
ToolLogItem(type="object", name="file_block_written", value=new_file_block), tool_name="FileManager" - ) + self.resource.report(new_file_block.file_path, "path") return f"Content written successfully to {file_path}" diff --git a/metagpt/tools/libs/terminal.py b/metagpt/tools/libs/terminal.py index dc395e89b..5f5989e1a 100644 --- a/metagpt/tools/libs/terminal.py +++ b/metagpt/tools/libs/terminal.py @@ -2,8 +2,8 @@ import subprocess import threading from queue import Queue -from metagpt.logs import TOOL_LOG_END_MARKER, ToolLogItem, log_tool_output from metagpt.tools.tool_registry import register_tool +from metagpt.utils.report import END_MARKER_VALUE, TerminalReporter @register_tool() @@ -28,9 +28,10 @@ class Terminal: stderr=subprocess.STDOUT, text=True, bufsize=1, # Line buffered - executable="/bin/bash" + executable="/bin/bash", ) self.stdout_queue = Queue() + self.observer = TerminalReporter() def run_command(self, cmd: str, daemon=False) -> str: """ @@ -59,7 +60,7 @@ class Terminal: # Send the command self.process.stdin.write(cmd + self.command_terminator) self.process.stdin.write( - f'echo "{TOOL_LOG_END_MARKER.value}"' + self.command_terminator # write EOF + f'echo "{END_MARKER_VALUE}"' + self.command_terminator # write EOF ) # Unique marker to signal command end self.process.stdin.flush() if daemon: @@ -93,28 +94,26 @@ class Terminal: return self.run_command(cmd, daemon=daemon) def _read_and_process_output(self, cmd): - cmd_output = [] - log_tool_output( - output=ToolLogItem(name="cmd", value=cmd + self.command_terminator), tool_name="Terminal" - ) # log the command + with self.observer as observer: + cmd_output = [] + observer.report(cmd + self.command_terminator, "cmd") + # report the command - # Read the output until the unique marker is found - while True: - line = self.process.stdout.readline() - ix = line.rfind(TOOL_LOG_END_MARKER.value) - if ix >= 0: - line = line[0:ix] - if line: - log_tool_output( - output=ToolLogItem(name="output", value=line), 
tool_name="Terminal" - ) # log stdout in real-time - cmd_output.append(line) - log_tool_output(TOOL_LOG_END_MARKER) - break - # log stdout in real-time - log_tool_output(output=ToolLogItem(name="output", value=line), tool_name="Terminal") - cmd_output.append(line) - self.stdout_queue.put(line) + # Read the output until the unique marker is found + while True: + line = self.process.stdout.readline() + ix = line.rfind(END_MARKER_VALUE) + if ix >= 0: + line = line[0:ix] + if line: + observer.report(line, "output") + # report stdout in real-time + cmd_output.append(line) + break + # log stdout in real-time + observer.report(line, "output") + cmd_output.append(line) + self.stdout_queue.put(line) return "".join(cmd_output) diff --git a/metagpt/utils/common.py b/metagpt/utils/common.py index 384d4e8ac..e2520ef13 100644 --- a/metagpt/utils/common.py +++ b/metagpt/utils/common.py @@ -37,7 +37,7 @@ from PIL import Image from pydantic_core import to_jsonable_python from tenacity import RetryCallState, RetryError, _utils -from metagpt.const import MESSAGE_ROUTE_TO_ALL +from metagpt.const import MARKDOWN_TITLE_PREFIX, MESSAGE_ROUTE_TO_ALL from metagpt.logs import logger from metagpt.utils.exceptions import handle_exception @@ -65,7 +65,7 @@ class OutputParser: @classmethod def parse_blocks(cls, text: str): # 首先根据"##"将文本分割成不同的block - blocks = text.split("##") + blocks = text.split(MARKDOWN_TITLE_PREFIX) # 创建一个字典,用于存储每个block的标题和内容 block_dict = {} diff --git a/metagpt/utils/file_repository.py b/metagpt/utils/file_repository.py index d19f2b705..dd6c0709f 100644 --- a/metagpt/utils/file_repository.py +++ b/metagpt/utils/file_repository.py @@ -198,8 +198,9 @@ class FileRepository: :type dependencies: List[str], optional """ - await self.save(filename=doc.filename, content=doc.content, dependencies=dependencies) + doc = await self.save(filename=doc.filename, content=doc.content, dependencies=dependencies) logger.debug(f"File Saved: {str(doc.filename)}") + return doc async def 
save_pdf(self, doc: Document, with_suffix: str = ".md", dependencies: List[str] = None): """Save a Document instance as a PDF file. @@ -216,8 +217,9 @@ class FileRepository: """ m = json.loads(doc.content) filename = Path(doc.filename).with_suffix(with_suffix) if with_suffix is not None else Path(doc.filename) - await self.save(filename=str(filename), content=json_to_markdown(m), dependencies=dependencies) + doc = await self.save(filename=str(filename), content=json_to_markdown(m), dependencies=dependencies) logger.debug(f"File Saved: {str(filename)}") + return doc async def delete(self, filename: Path | str): """Delete a file from the file repository. diff --git a/metagpt/utils/report.py b/metagpt/utils/report.py new file mode 100644 index 000000000..20f9ccd01 --- /dev/null +++ b/metagpt/utils/report.py @@ -0,0 +1,305 @@ +import asyncio +import os +import typing +from enum import Enum +from pathlib import Path +from typing import Any, Callable, Literal, Optional, Union +from urllib.parse import unquote, urlparse, urlunparse +from uuid import UUID, uuid4 + +from aiohttp import ClientSession, UnixConnector +from playwright.async_api import Page as AsyncPage +from playwright.sync_api import Page as SyncPage +from pydantic import BaseModel, Field, PrivateAttr + +from metagpt.const import METAGPT_REPORTER_DEFAULT_URL +from metagpt.logs import create_llm_stream_queue, get_llm_stream_queue + +if typing.TYPE_CHECKING: + from metagpt.roles.role import Role + +try: + import requests_unixsocket as requests +except ImportError: + import requests + +from contextvars import ContextVar + +CURRENT_ROLE: ContextVar["Role"] = ContextVar("role") + + +class BlockType(str, Enum): + """Enumeration for different types of blocks.""" + + TERMINAL = "Terminal" + TASK = "Task" + BROWSER = "Browser" + BROWSER_RT = "Browser-RT" + EDITOR = "Editor" + GALLERY = "Gallery" + NOTEBOOK = "Notebook" + DOCS = "Docs" + + +END_MARKER_NAME = "end_marker" +END_MARKER_VALUE = "\x18\x19\x1B\x18" + + +class 
class ResourceReporter(BaseModel):
    """Base class for reporting resource observations to a callback endpoint.

    A reporter posts JSON payloads (synchronously via ``requests`` or
    asynchronously via ``aiohttp``) to ``callback_url``. Used as a (async)
    context manager it switches into chunked/streaming mode and emits an
    end-marker report on exit.
    """

    block: BlockType = Field(description="The type of block that is reporting the resource")
    uuid: UUID = Field(default_factory=uuid4, description="The unique identifier for the resource")
    is_chunk: bool = Field(False, description="Indicates whether the report is a chunk of a stream")
    enable_llm_stream: bool = Field(False, description="Indicates whether to connect to an LLM stream for reporting")
    callback_url: str = Field(METAGPT_REPORTER_DEFAULT_URL, description="The URL to which the report should be sent")
    _llm_task: Optional[asyncio.Task] = PrivateAttr(None)

    def report(self, value: Any, name: str):
        """Synchronously report resource observation data.

        Args:
            value: The data to report.
            name: The type name of the data.
        """
        return self._report(value, name)

    async def async_report(self, value: Any, name: str):
        """Asynchronously report resource observation data.

        Args:
            value: The data to report.
            name: The type name of the data.
        """
        return await self._async_report(value, name)

    @classmethod
    def set_report_fn(cls, fn: Callable):
        """Set the synchronous report function.

        Args:
            fn: A callable function used for synchronous reporting. For example:

                >>> def _report(self, value: Any, name: str):
                ...     print(value, name)

        """
        cls._report = fn

    @classmethod
    def set_async_report_fn(cls, fn: Callable):
        """Set the asynchronous report function.

        Args:
            fn: A callable function used for asynchronous reporting. For example:

                >>> async def _report(self, value: Any, name: str):
                ...     print(value, name)

        """
        cls._async_report = fn

    def _report(self, value: Any, name: str):
        """Default synchronous transport: HTTP POST via requests."""
        if not self.callback_url:
            # Reporting is disabled when no callback endpoint is configured.
            return

        data = self._format_data(value, name)
        resp = requests.post(self.callback_url, json=data)
        resp.raise_for_status()
        return resp.text

    async def _async_report(self, value: Any, name: str):
        """Default asynchronous transport: HTTP POST via aiohttp.

        Supports ``http+unix://`` style URLs by routing the request through a
        Unix domain socket connector.
        """
        if not self.callback_url:
            return

        data = self._format_data(value, name)
        url = self.callback_url
        parsed = urlparse(url)
        session_kwargs = {}
        if parsed.scheme.endswith("+unix"):
            # Rewrite "scheme+unix://<quoted socket path>/..." into a plain URL
            # with a placeholder host; the actual routing happens over the
            # Unix socket encoded in the netloc.
            parsed_list = list(parsed)
            parsed_list[0] = parsed_list[0][:-5]  # drop the "+unix" suffix
            parsed_list[1] = "fake.org"  # placeholder host, unused by the connector
            url = urlunparse(parsed_list)
            session_kwargs["connector"] = UnixConnector(path=unquote(parsed.netloc))

        async with ClientSession(**session_kwargs) as client:
            async with client.post(url, json=data) as resp:
                resp.raise_for_status()
                return await resp.text()

    def _format_data(self, value, name):
        """Build the JSON payload: model fields + value, name, and role name."""
        # FIX: the exclude tuple previously listed "llm_stream", which is not a
        # field on this model (the field is "enable_llm_stream"), so the entry
        # was dead and the flag leaked into every payload.
        data = self.model_dump(mode="json", exclude=("callback_url", "enable_llm_stream"))
        # Paths are not JSON-serializable; send their string form.
        data["value"] = str(value) if isinstance(value, Path) else value
        data["name"] = name
        role = CURRENT_ROLE.get(None)
        if role:
            role_name = role.name
        else:
            # Fall back to the environment when no role is bound in context.
            role_name = os.environ.get("METAGPT_ROLE")
        data["role"] = role_name
        return data

    def __enter__(self):
        """Enter the synchronous streaming callback context."""
        self.is_chunk = True
        return self

    def __exit__(self, *args, **kwargs):
        """Exit the synchronous streaming callback context, emitting the end marker."""
        self.report(None, END_MARKER_NAME)
        self.is_chunk = False

    async def __aenter__(self):
        """Enter the asynchronous streaming callback context.

        When ``enable_llm_stream`` is set, a background task is started that
        forwards the LLM stream queue to the callback endpoint.
        """
        self.is_chunk = True
        if self.enable_llm_stream:
            queue = create_llm_stream_queue()
            self._llm_task = asyncio.create_task(self._llm_stream_report(queue))
        return self

    async def __aexit__(self, *args, **kwargs):
        """Exit the asynchronous streaming callback context, emitting the end marker."""
        self.is_chunk = False
        if self.enable_llm_stream:
            self._llm_task.cancel()
            self._llm_task = None
        await self.async_report(None, END_MARKER_NAME)

    async def _llm_stream_report(self, queue: asyncio.Queue):
        """Forward items from the LLM stream queue until the context closes."""
        while self.is_chunk:
            await self.async_report(await queue.get(), "content")

    async def wait_llm_stream_report(self):
        """Wait (by polling) until the LLM stream queue has been drained."""
        queue = get_llm_stream_queue()
        # FIX: guard against a missing queue — previously queue.empty() would
        # raise AttributeError when no stream queue exists in this context.
        if queue is None:
            return
        while self._llm_task:
            if queue.empty():
                break
            await asyncio.sleep(0.01)


class TerminalReporter(ResourceReporter):
    """Terminal output callback for streaming reporting of command and output.

    The terminal has state, and an agent can open multiple terminals and input
    different commands into them. To correctly display these states, each
    terminal should have its own unique ID, so in practice, each terminal
    should instantiate its own TerminalReporter object.
    """

    block: Literal[BlockType.TERMINAL] = BlockType.TERMINAL

    def report(self, value: str, name: Literal["cmd", "output"]):
        """Report terminal command or output synchronously."""
        return super().report(value, name)

    async def async_report(self, value: str, name: Literal["cmd", "output"]):
        """Report terminal command or output asynchronously."""
        return await super().async_report(value, name)
+ """ + + block: Literal[BlockType.BROWSER] = BlockType.BROWSER + + def report(self, value: Union[str, SyncPage], name: Literal["url", "page"]): + """Report browser URL or page content synchronously.""" + if name == "page": + value = {"page_url": value.url, "title": value.title(), "screenshot": str(value.screenshot())} + return super().report(value, name) + + async def async_report(self, value: Union[str, AsyncPage], name: Literal["url", "page"]): + """Report browser URL or page content asynchronously.""" + if name == "page": + value = {"page_url": value.url, "title": await value.title(), "screenshot": str(await value.screenshot())} + return await super().async_report(value, name) + + +class ServerReporter(ResourceReporter): + """Callback for server deployment reporting.""" + + block: Literal[BlockType.BROWSER_RT] = BlockType.BROWSER_RT + + def report(self, value: str, name: Literal["local_url"] = "local_url"): + """Report server deployment synchronously.""" + return super().report(value, name) + + async def async_report(self, value: str, name: Literal["local_url"] = "local_url"): + """Report server deployment asynchronously.""" + return await super().async_report(value, name) + + +class ObjectReporter(ResourceReporter): + """Callback for reporting complete object resources.""" + + def report(self, value: dict, name: Literal["object"] = "object"): + """Report object resource synchronously.""" + return super().report(value, name) + + async def async_report(self, value: dict, name: Literal["object"] = "object"): + """Report object resource asynchronously.""" + return await super().async_report(value, name) + + +class TaskReporter(ObjectReporter): + """Reporter for object resources to Task Block.""" + + block: Literal[BlockType.TASK] = BlockType.TASK + + +class FileReporter(ResourceReporter): + """File resource callback for reporting complete file paths. 
class FileReporter(ResourceReporter):
    """File resource callback for reporting complete file paths.

    Two usage patterns: report the finished file in a single non-streaming
    call, or — when partial content is worth showing early — report pieces
    through a streaming context.
    """

    def report(self, value: Union[Path, dict, Any], name: Literal["path", "meta", "content"] = "path"):
        """Report a file resource synchronously."""
        return super().report(value, name)

    async def async_report(self, value: Path, name: Literal["path", "meta", "content"] = "path"):
        """Report a file resource asynchronously."""
        return await super().async_report(value, name)


class NotebookReporter(FileReporter):
    """Equivalent to FileReporter(block=BlockType.NOTEBOOK)."""

    block: Literal[BlockType.NOTEBOOK] = BlockType.NOTEBOOK


class DocsReporter(FileReporter):
    """Equivalent to FileReporter(block=BlockType.DOCS)."""

    block: Literal[BlockType.DOCS] = BlockType.DOCS


class EditorReporter(FileReporter):
    """Equivalent to FileReporter(block=BlockType.EDITOR)."""

    block: Literal[BlockType.EDITOR] = BlockType.EDITOR
+ """ + + block: Literal[BlockType.GALLERY] = BlockType.GALLERY + + def report(self, value: Union[dict, Path], name: Literal["meta", "path"] = "path"): + """Report image resource synchronously.""" + return super().report(value, name) + + async def async_report(self, value: Union[dict, Path], name: Literal["meta", "path"] = "path"): + """Report image resource asynchronously.""" + return await super().async_report(value, name) diff --git a/tests/conftest.py b/tests/conftest.py index 8603c752a..f26ab2ef9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -247,14 +247,16 @@ def search_engine_mocker(aiohttp_mocker, curl_cffi_mocker, httplib2_mocker, sear @pytest.fixture def http_server(): - async def handler(request): - return aiohttp.web.Response( - text="""
-