diff --git a/metagpt/const.py b/metagpt/const.py index 0b04e9320..979f00cf4 100644 --- a/metagpt/const.py +++ b/metagpt/const.py @@ -142,3 +142,6 @@ ASSISTANT_ALIAS = "response" # Markdown MARKDOWN_TITLE_PREFIX = "## " + +# Reporter +METAGPT_REPORTER_DEFAULT_URL = os.environ.get("METAGPT_REPORTER_URL", "") diff --git a/metagpt/logs.py b/metagpt/logs.py index 9a82d7b1b..663b75318 100644 --- a/metagpt/logs.py +++ b/metagpt/logs.py @@ -8,7 +8,9 @@ from __future__ import annotations +import asyncio import sys +from contextvars import ContextVar from datetime import datetime from functools import partial from typing import Any @@ -18,6 +20,8 @@ from pydantic import BaseModel, Field from metagpt.const import METAGPT_ROOT +LLM_STREAM_QUEUE: ContextVar[asyncio.Queue] = ContextVar("llm-stream") + class ToolLogItem(BaseModel): type_: str = Field(alias="type", default="str", description="Data type of `value` field.") @@ -46,6 +50,20 @@ logger = define_log_level() def log_llm_stream(msg): + """ + Logs a message to the LLM stream. + + Args: + msg: The message to be logged. + + Notes: + If the LLM_STREAM_QUEUE has not been set (e.g., if `create_llm_stream_queue` has not been called), + the message will not be added to the LLM stream queue. + """ + + queue = get_llm_stream_queue() + if queue: + queue.put_nowait(msg) _llm_stream_log(msg) @@ -86,3 +104,23 @@ _tool_output_log = ( async def _tool_output_log_async(*args, **kwargs): # async version pass + + +def create_llm_stream_queue(): + """Creates a new LLM stream queue and sets it in the context variable. + + Returns: + The newly created asyncio.Queue instance. + """ + queue = asyncio.Queue() + LLM_STREAM_QUEUE.set(queue) + return queue + + +def get_llm_stream_queue(): + """Retrieves the current LLM stream queue from the context variable. + + Returns: + The asyncio.Queue instance if set, otherwise None. 
+ """ + return LLM_STREAM_QUEUE.get(None) diff --git a/metagpt/report.py b/metagpt/report.py new file mode 100644 index 000000000..ee3c1697b --- /dev/null +++ b/metagpt/report.py @@ -0,0 +1,289 @@ +import asyncio +from enum import Enum +from pathlib import Path +from typing import Any, Callable, Literal, Optional, Union +from urllib.parse import unquote, urlparse +from uuid import UUID, uuid4 + +from aiohttp import ClientSession, UnixConnector +from playwright.async_api import Page as AsyncPage +from playwright.sync_api import Page as SyncPage +from pydantic import BaseModel, Field, PrivateAttr + +from metagpt.const import METAGPT_REPORTER_DEFAULT_URL +from metagpt.logs import create_llm_stream_queue, get_llm_stream_queue + +try: + import requests_unixsocket as requests +except ImportError: + import requests + + +class BlockType(str, Enum): + """Enumeration for different types of blocks.""" + + TERMINAL = "Terminal" + TASK = "Task" + BROWSER = "Browser" + BROWSER_RT = "Browser-RT" + EDITOR = "Editor" + GALLERY = "Gallery" + NOTEBOOK = "Notebook" + DOCS = "Docs" + + +END_MARKER_NAME = "end_marker" +END_MARKER_VALUE = "\x18\x19\x1B\x18" + + +class ResourceReporter(BaseModel): + """Base class for resource reporting.""" + + block: BlockType = Field(description="The type of block that is reporting the resource") + uuid: UUID = Field(default_factory=uuid4, description="The unique identifier for the resource") + is_chunk: bool = Field(False, description="Indicates whether the report is a chunk of a stream") + enable_llm_stream: bool = Field(False, description="Indicates whether to connect to an LLM stream for reporting") + callback_url: str = Field(METAGPT_REPORTER_DEFAULT_URL, description="The URL to which the report should be sent") + _llm_task: Optional[asyncio.Task] = PrivateAttr(None) + + def report(self, value: Any, name: str): + """Synchronously report resource observation data. + + Args: + value: The data to report. + name: The type name of the data. 
+ """ + return self._report(value, name) + + async def async_report(self, value: Any, name: str): + """Asynchronously report resource observation data. + + Args: + value: The data to report. + name: The type name of the data. + """ + return await self._async_report(value, name) + + @classmethod + def set_report_fn(cls, fn: Callable): + """Set the synchronous report function. + + Args: + fn: A callable function used for synchronous reporting. For example: + + >>> def _report(self, value: Any, name: str): + ... print(value, name) + + """ + cls._report = fn + + @classmethod + def set_async_report_fn(cls, fn: Callable): + """Set the asynchronous report function. + + Args: + fn: A callable function used for asynchronous reporting. For example: + + ```python + >>> async def _async_report(self, value: Any, name: str): + ... print(value, name) + ``` + """ + cls._async_report = fn + + def _report(self, value: Any, name: str): + if not self.callback_url: + return + + data = self._format_data(value, name) + resp = requests.post(self.callback_url, json=data) + resp.raise_for_status() + return resp.text + + async def _async_report(self, value: Any, name: str): + if not self.callback_url: + return + + data = self._format_data(value, name) + url = self.callback_url + _result = urlparse(url) + session_kwargs = {} + if "unix" in _result.scheme: + url = str(_result._replace(scheme="http", netloc="fake.org")) + session_kwargs["connector"] = UnixConnector(path=unquote(_result.netloc)) + + async with ClientSession(**session_kwargs) as client: + async with client.post(url, json=data) as resp: + resp.raise_for_status() + return await resp.text() + + def _format_data(self, value, name): + data = self.model_dump(mode="json", exclude=("callback_url", "enable_llm_stream")) + data["value"] = value + data["name"] = name + return data + + def __enter__(self): + """Enter the synchronous streaming callback context.""" + self.is_chunk = True + return self + + def __exit__(self, *args, **kwargs): + """Exit the
synchronous streaming callback context.""" + self.report(None, END_MARKER_NAME) + self.is_chunk = False + + async def __aenter__(self): + """Enter the asynchronous streaming callback context.""" + self.is_chunk = True + if self.enable_llm_stream: + queue = create_llm_stream_queue() + self._llm_task = asyncio.create_task(self._llm_stream_report(queue)) + return self + + async def __aexit__(self, *args, **kwargs): + """Exit the asynchronous streaming callback context.""" + await self.async_report(None, END_MARKER_NAME) + self.is_chunk = False + if self.enable_llm_stream: + self._llm_task.cancel() + self._llm_task = None + + async def _llm_stream_report(self, queue: asyncio.Queue): + while True: + await self.async_report(await queue.get(), "content") + + async def wait_llm_stream_report(self): + """Wait for the LLM stream report to complete.""" + queue = get_llm_stream_queue() + while self._llm_task: + if queue.empty(): + break + await asyncio.sleep(0.01) + + +class TerminalReporter(ResourceReporter): + """Terminal output callback for streaming reporting of command and output. + + The terminal has state, and an agent can open multiple terminals and input different commands into them. + To correctly display these states, each terminal should have its own unique ID, so in practice, each terminal + should instantiate its own TerminalReporter object. + """ + + block: Literal[BlockType.TERMINAL] = BlockType.TERMINAL + + def report(self, value: str, name: Literal["cmd", "output"]): + """Report terminal command or output synchronously.""" + return super().report(value, name) + + async def async_report(self, value: str, name: Literal["cmd", "output"]): + """Report terminal command or output asynchronously.""" + return await super().async_report(value, name) + + +class BrowserReporter(ResourceReporter): + """Browser output callback for streaming reporting of requested URL and page content. 
+ + The browser has state, so in practice, each browser should instantiate its own BrowserReporter object. + """ + + block: Literal[BlockType.BROWSER] = BlockType.BROWSER + + def report(self, value: Union[str, SyncPage], name: Literal["url", "page"]): + """Report browser URL or page content synchronously.""" + if name == "page": + value = value.screenshot() + value = str(value) + return super().report(value, name) + + async def async_report(self, value: Union[str, AsyncPage], name: Literal["url", "page"]): + """Report browser URL or page content asynchronously.""" + if name == "page": + value = await value.screenshot() + value = str(value) + return await super().async_report(value, name) + + +class ServerReporter(ResourceReporter): + """Callback for server deployment reporting.""" + + block: Literal[BlockType.BROWSER_RT] = BlockType.BROWSER_RT + + def report(self, value: str, name: Literal["local_url"] = "local_url"): + """Report server deployment synchronously.""" + return super().report(value, name) + + async def async_report(self, value: str, name: Literal["local_url"] = "local_url"): + """Report server deployment asynchronously.""" + return await super().async_report(value, name) + + +class ObjectReporter(ResourceReporter): + """Callback for reporting complete object resources.""" + + def report(self, value: dict, name: Literal["object"] = "object"): + """Report object resource synchronously.""" + return super().report(value, name) + + async def async_report(self, value: dict, name: Literal["object"] = "object"): + """Report object resource asynchronously.""" + return await super().async_report(value, name) + + +class TaskReporter(ObjectReporter): + """Reporter for object resources to Task Block.""" + + block: Literal[BlockType.TASK] = BlockType.TASK + + +class FileReporter(ResourceReporter): + """File resource callback for reporting complete file paths. 
+ + There are two scenarios: if the file needs to be output in its entirety at once, use non-streaming callback; + if the file can be partially output for display first, use streaming callback. + """ + + def report(self, value: Union[Path, dict, Any], name: Literal["path", "meta", "content"] = "path"): + """Report file resource synchronously.""" + return super().report(value, name) + + async def async_report(self, value: Path, name: Literal["path", "meta", "content"] = "path"): + """Report file resource asynchronously.""" + return await super().async_report(value, name) + + +class NotebookReporter(FileReporter): + """Equivalent to FileReporter(block=BlockType.NOTEBOOK).""" + + block: Literal[BlockType.NOTEBOOK] = BlockType.NOTEBOOK + + +class DocsReporter(FileReporter): + """Equivalent to FileReporter(block=BlockType.DOCS).""" + + block: Literal[BlockType.DOCS] = BlockType.DOCS + + +class EditorReporter(FileReporter): + """Equivalent to FileReporter(block=BlockType.Editor).""" + + block: Literal[BlockType.EDITOR] = BlockType.EDITOR + + +class GalleryReporter(FileReporter): + """Image resource callback for reporting complete file paths. + + Since images need to be complete before display, each callback is a complete file path. However, the Gallery + needs to display the type of image and prompt, so if there is meta information, it should be reported in a + streaming manner. 
+ """ + + block: Literal[BlockType.GALLERY] = BlockType.GALLERY + + def report(self, value: Union[dict, Path], name: Literal["meta", "path"] = "path"): + """Report image resource synchronously.""" + return super().report(value, name) + + async def async_report(self, value: Union[dict, Path], name: Literal["meta", "path"] = "path"): + """Report image resource asynchronously.""" + return await super().async_report(value, name) diff --git a/tests/conftest.py b/tests/conftest.py index 8603c752a..f26ab2ef9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -247,14 +247,16 @@ def search_engine_mocker(aiohttp_mocker, curl_cffi_mocker, httplib2_mocker, sear @pytest.fixture def http_server(): - async def handler(request): - return aiohttp.web.Response( - text=""" - MetaGPT

MetaGPT

""", - content_type="text/html", - ) + async def start(handler=None): + if handler is None: + + async def handler(request): + return aiohttp.web.Response( + text=""" + MetaGPT

MetaGPT

""", + content_type="text/html", + ) - async def start(): server = aiohttp.web.Server(handler) runner = aiohttp.web.ServerRunner(server) await runner.setup() diff --git a/tests/metagpt/test_reporter.py b/tests/metagpt/test_reporter.py new file mode 100644 index 000000000..f7dadf0f0 --- /dev/null +++ b/tests/metagpt/test_reporter.py @@ -0,0 +1,182 @@ +import ast +from contextlib import asynccontextmanager + +import aiohttp.web +import pytest + +from metagpt.logs import log_llm_stream +from metagpt.report import ( + END_MARKER_NAME, + BlockType, + BrowserReporter, + DocsReporter, + EditorReporter, + NotebookReporter, + ServerReporter, + TaskReporter, + TerminalReporter, +) + + +class MockFileLLM: + def __init__(self, data: str): + self.data = data + + async def aask(self, *args, **kwargs) -> str: + for i in self.data.splitlines(keepends=True): + log_llm_stream(i) + log_llm_stream("\n") + return self.data + + +@asynccontextmanager +async def callback_server(http_server): + callback_data = [] + + async def handler(request): + callback_data.append(await request.json()) + return aiohttp.web.json_response({}) + + server, url = await http_server(handler) + yield url, callback_data + await server.stop() + + +@pytest.mark.asyncio +async def test_terminal_report(http_server): + async with callback_server(http_server) as (url, callback_data): + async with TerminalReporter(callback_url=url) as reporter: + await reporter.async_report("ls -a", "cmd") + await reporter.async_report("main.py\n", "output") + await reporter.async_report("setup.py\n", "output") + assert all(BlockType.TERMINAL is BlockType(i["block"]) for i in callback_data) + assert all(i["uuid"] == callback_data[0]["uuid"] for i in callback_data[1:]) + assert "".join(i["value"] for i in callback_data if i["name"] != END_MARKER_NAME) == "ls -amain.py\nsetup.py\n" + + +@pytest.mark.asyncio +async def test_browser_report(http_server): + img = b"\x89PNG\r\n\x1a\n\x00\x00" + web_url = "https://docs.deepwisdom.ai" + + class 
AsyncPage: + async def screenshot(self): + return img + + async with callback_server(http_server) as (url, callback_data): + async with BrowserReporter(callback_url=url) as reporter: + await reporter.async_report(web_url, "url") + await reporter.async_report(AsyncPage(), "page") + + assert all(BlockType.BROWSER is BlockType(i["block"]) for i in callback_data) + assert all(i["uuid"] == callback_data[0]["uuid"] for i in callback_data[1:]) + assert len(callback_data) == 3 + assert callback_data[-1]["name"] == END_MARKER_NAME + assert callback_data[0]["name"] == "url" + assert callback_data[0]["value"] == web_url + assert callback_data[1]["name"] == "page" + assert ast.literal_eval(callback_data[1]["value"]) == img + + +@pytest.mark.asyncio +async def test_server_reporter(http_server): + local_url = "http://127.0.0.1:8080/index.html" + async with callback_server(http_server) as (url, callback_data): + reporter = ServerReporter(callback_url=url) + await reporter.async_report(local_url) + assert all(BlockType.BROWSER_RT is BlockType(i["block"]) for i in callback_data) + assert len(callback_data) == 1 + assert callback_data[0]["name"] == "local_url" + assert callback_data[0]["value"] == local_url + assert not callback_data[0]["is_chunk"] + + +@pytest.mark.asyncio +async def test_task_reporter(http_server): + task = {"current_task_id": "", "tasks": []} + async with callback_server(http_server) as (url, callback_data): + reporter = TaskReporter(callback_url=url) + await reporter.async_report(task) + + assert all(BlockType.TASK is BlockType(i["block"]) for i in callback_data) + assert len(callback_data) == 1 + assert callback_data[0]["name"] == "object" + assert callback_data[0]["value"] == task + + +@pytest.mark.asyncio +async def test_notebook_reporter(http_server): + code = { + "cell_type": "code", + "execution_count": None, + "id": "e1841c44", + "metadata": {}, + "outputs": [], + "source": ["\n", "import time\n", "print('will sleep 1s.')\n", "time.sleep(1)\n", 
"print('end.')\n", ""], + } + output1 = {"name": "stdout", "output_type": "stream", "text": ["will sleep 1s.\n"]} + output2 = {"name": "stdout", "output_type": "stream", "text": ["will sleep 1s.\n"]} + code_path = "/data/main.ipynb" + async with callback_server(http_server) as (url, callback_data): + async with NotebookReporter(callback_url=url) as reporter: + await reporter.async_report(code, "content") + await reporter.async_report(output1, "content") + await reporter.async_report(output2, "content") + await reporter.async_report(code_path, "path") + + assert all(BlockType.NOTEBOOK is BlockType(i["block"]) for i in callback_data) + assert len(callback_data) == 5 + assert callback_data[-1]["name"] == END_MARKER_NAME + assert callback_data[-2]["name"] == "path" + assert callback_data[-2]["value"] == code_path + assert all(i["uuid"] == callback_data[0]["uuid"] for i in callback_data[1:]) + assert [i["value"] for i in callback_data if i["name"] == "content"] == [code, output1, output2] + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + ("data", "file_path", "meta", "block", "report_cls"), + ( + ( + "## Language\n\nen_us\n\n## Programming Language\n\nPython\n\n## Original Requirements\n\nCreate a 2048 gam...", + "/data/prd.md", + {"type": "write_prd"}, + BlockType.DOCS, + DocsReporter, + ), + ( + "#!/usr/bin/env python\n# -*- coding: utf-8 -*-\n\nprint('Hello World')\n", + "/data/main.py", + {"type": "write_code"}, + BlockType.EDITOR, + EditorReporter, + ), + ), + ids=["test_docs_reporter", "test_editor_reporter"], +) +async def test_llm_stream_reporter(data, file_path, meta, block, report_cls, http_server): + async with callback_server(http_server) as (url, callback_data): + async with report_cls(callback_url=url, enable_llm_stream=True) as reporter: + await reporter.async_report(meta, "meta") + await MockFileLLM(data).aask("") + await reporter.wait_llm_stream_report() + await reporter.async_report(file_path, "path") + assert callback_data + assert all(block is 
BlockType(i["block"]) for i in callback_data) + assert all(i["uuid"] == callback_data[0]["uuid"] for i in callback_data[1:]) + chunks, names = [], set() + for i in callback_data: + name = i["name"] + names.add(name) + if name == "meta": + assert i["value"] == meta + elif name == "path": + assert i["value"] == file_path + elif name == END_MARKER_NAME: + pass + elif name == "content": + chunks.append(i["value"]) + else: + raise ValueError + assert "".join(chunks[:-1]) == data + assert names == {"meta", "path", "content", END_MARKER_NAME}