Merge branch 'feature-reporter' into 'feat-stream'

Feature reporter

See merge request pub/MetaGPT!61
This commit is contained in:
洪炯腾 2024-04-20 12:47:43 +00:00
commit 63eb85ad61
5 changed files with 521 additions and 7 deletions

View file

@@ -142,3 +142,6 @@ ASSISTANT_ALIAS = "response"
# Markdown
MARKDOWN_TITLE_PREFIX = "## "
# Reporter
# Default callback endpoint for resource reporters; read once at import time.
# An empty string disables reporting (reporters return early on a falsy URL).
METAGPT_REPORTER_DEFAULT_URL = os.environ.get("METAGPT_REPORTER_URL", "")

View file

@@ -8,7 +8,9 @@
from __future__ import annotations
import asyncio
import sys
from contextvars import ContextVar
from datetime import datetime
from functools import partial
from typing import Any
@@ -18,6 +20,8 @@ from pydantic import BaseModel, Field
from metagpt.const import METAGPT_ROOT
# Context-local queue fed by log_llm_stream(); unset until
# create_llm_stream_queue() is called in the current context.
LLM_STREAM_QUEUE: ContextVar[asyncio.Queue] = ContextVar("llm-stream")
class ToolLogItem(BaseModel):
type_: str = Field(alias="type", default="str", description="Data type of `value` field.")
@@ -46,6 +50,20 @@ logger = define_log_level()
def log_llm_stream(msg):
    """Forward an LLM stream chunk to the stream log and, when set, the stream queue.

    Args:
        msg: The chunk to be logged.

    Notes:
        If no LLM stream queue exists in the current context (i.e.
        ``create_llm_stream_queue`` was never called), the chunk is only
        passed to the stream log and not enqueued.
    """
    stream_queue = get_llm_stream_queue()
    if stream_queue is not None:
        stream_queue.put_nowait(msg)
    _llm_stream_log(msg)
@@ -86,3 +104,23 @@ _tool_output_log = (
async def _tool_output_log_async(*args, **kwargs):
# async version
pass
def create_llm_stream_queue():
    """Create a fresh queue and install it as the context's LLM stream queue.

    Returns:
        The newly created asyncio.Queue instance.
    """
    stream_queue = asyncio.Queue()
    LLM_STREAM_QUEUE.set(stream_queue)
    return stream_queue
def get_llm_stream_queue():
    """Fetch the context's current LLM stream queue.

    Returns:
        The asyncio.Queue instance if set, otherwise None.
    """
    return LLM_STREAM_QUEUE.get(None)

289
metagpt/report.py Normal file
View file

@@ -0,0 +1,289 @@
import asyncio
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Literal, Optional, Union
from urllib.parse import unquote, urlparse
from uuid import UUID, uuid4
from aiohttp import ClientSession, UnixConnector
from playwright.async_api import Page as AsyncPage
from playwright.sync_api import Page as SyncPage
from pydantic import BaseModel, Field, PrivateAttr
from metagpt.const import METAGPT_REPORTER_DEFAULT_URL
from metagpt.logs import create_llm_stream_queue, get_llm_stream_queue
# Prefer requests_unixsocket so synchronous callbacks can target unix-domain
# socket URLs; fall back to plain requests when it is not installed.
try:
    import requests_unixsocket as requests
except ImportError:
    import requests
class BlockType(str, Enum):
    """Enumeration for different types of blocks."""

    TERMINAL = "Terminal"  # terminal commands and output (TerminalReporter)
    TASK = "Task"  # task/plan objects (TaskReporter)
    BROWSER = "Browser"  # browser URLs and page screenshots (BrowserReporter)
    BROWSER_RT = "Browser-RT"  # real-time view of a deployed server (ServerReporter)
    EDITOR = "Editor"  # editor files (EditorReporter)
    GALLERY = "Gallery"  # image resources (GalleryReporter)
    NOTEBOOK = "Notebook"  # notebook cells/outputs (NotebookReporter)
    DOCS = "Docs"  # document files (DocsReporter)
# Name of the sentinel report sent when a streaming context exits.
END_MARKER_NAME = "end_marker"
# Sentinel payload; not referenced in this module — presumably recognized by
# the callback consumer (TODO confirm).
END_MARKER_VALUE = "\x18\x19\x1B\x18"
class ResourceReporter(BaseModel):
    """Base class for resource reporting.

    Sends observation data (terminal output, browser pages, files, ...) to
    ``callback_url``, either as one-shot reports or — when used as a sync or
    async context manager — as chunks of a stream terminated by an
    ``END_MARKER_NAME`` report.
    """

    block: BlockType = Field(description="The type of block that is reporting the resource")
    uuid: UUID = Field(default_factory=uuid4, description="The unique identifier for the resource")
    is_chunk: bool = Field(False, description="Indicates whether the report is a chunk of a stream")
    enable_llm_stream: bool = Field(False, description="Indicates whether to connect to an LLM stream for reporting")
    callback_url: str = Field(METAGPT_REPORTER_DEFAULT_URL, description="The URL to which the report should be sent")
    _llm_task: Optional[asyncio.Task] = PrivateAttr(None)

    def report(self, value: Any, name: str):
        """Synchronously report resource observation data.

        Args:
            value: The data to report.
            name: The type name of the data.
        """
        return self._report(value, name)

    async def async_report(self, value: Any, name: str):
        """Asynchronously report resource observation data.

        Args:
            value: The data to report.
            name: The type name of the data.
        """
        return await self._async_report(value, name)

    @classmethod
    def set_report_fn(cls, fn: Callable):
        """Set the synchronous report function.

        Args:
            fn: A callable function used for synchronous reporting. For example:

                >>> def _report(self, value: Any, name: str):
                ...     print(value, name)
        """
        cls._report = fn

    @classmethod
    def set_async_report_fn(cls, fn: Callable):
        """Set the asynchronous report function.

        Args:
            fn: A callable function used for asynchronous reporting. For example:

                ```python
                >>> async def _async_report(self, value: Any, name: str):
                ...     print(value, name)
                ```
        """
        # BUG FIX: this previously assigned ``cls._report``, clobbering the
        # synchronous hook while leaving the async path unchanged.
        cls._async_report = fn

    def _report(self, value: Any, name: str):
        # Default sync transport: HTTP POST via requests (or requests_unixsocket).
        if not self.callback_url:
            return
        data = self._format_data(value, name)
        resp = requests.post(self.callback_url, json=data)
        resp.raise_for_status()
        # BUG FIX: ``requests.Response.text`` is a property, not a method;
        # ``resp.text()`` raised ``TypeError: 'str' object is not callable``.
        return resp.text

    async def _async_report(self, value: Any, name: str):
        # Default async transport: aiohttp POST, with unix-domain-socket
        # support when the callback URL scheme contains "unix".
        if not self.callback_url:
            return
        data = self._format_data(value, name)
        url = self.callback_url
        _result = urlparse(url)
        session_kwargs = {}
        if "unix" in _result.scheme:
            # aiohttp needs a routable http URL; the socket path is carried in netloc.
            url = str(_result._replace(scheme="http", netloc="fake.org"))
            session_kwargs["connector"] = UnixConnector(path=unquote(_result.netloc))
        async with ClientSession(**session_kwargs) as client:
            async with client.post(url, json=data) as resp:
                resp.raise_for_status()
                return await resp.text()

    def _format_data(self, value, name):
        # NOTE(review): "llm_stream" matches no model field (the field is
        # ``enable_llm_stream``), so pydantic silently ignores it and the flag
        # ends up in the payload — confirm whether the intent was to exclude
        # ``enable_llm_stream``.
        data = self.model_dump(mode="json", exclude=("callback_url", "llm_stream"))
        data["value"] = value
        data["name"] = name
        return data

    def __enter__(self):
        """Enter the synchronous streaming callback context."""
        self.is_chunk = True
        return self

    def __exit__(self, *args, **kwargs):
        """Exit the synchronous streaming callback context."""
        self.report(None, END_MARKER_NAME)
        self.is_chunk = False

    async def __aenter__(self):
        """Enter the asynchronous streaming callback context."""
        self.is_chunk = True
        if self.enable_llm_stream:
            queue = create_llm_stream_queue()
            self._llm_task = asyncio.create_task(self._llm_stream_report(queue))
        return self

    async def __aexit__(self, *args, **kwargs):
        """Exit the asynchronous streaming callback context."""
        await self.async_report(None, END_MARKER_NAME)
        self.is_chunk = False
        if self.enable_llm_stream:
            self._llm_task.cancel()
            self._llm_task = None

    async def _llm_stream_report(self, queue: asyncio.Queue):
        # Forward every chunk from the LLM stream queue until cancelled.
        while True:
            await self.async_report(await queue.get(), "content")

    async def wait_llm_stream_report(self):
        """Wait for the LLM stream report to complete (queue drained)."""
        queue = get_llm_stream_queue()
        while self._llm_task:
            if queue.empty():
                break
            await asyncio.sleep(0.01)
class TerminalReporter(ResourceReporter):
    """Streams terminal commands and their output to the Terminal block.

    Terminals are stateful: an agent may open several of them and feed each a
    different command stream. Rendering those states correctly requires a
    unique ID per terminal, so every terminal should get its own
    TerminalReporter instance.
    """

    block: Literal[BlockType.TERMINAL] = BlockType.TERMINAL

    def report(self, value: str, name: Literal["cmd", "output"]):
        """Synchronously report a terminal command or its output."""
        return super().report(value, name)

    async def async_report(self, value: str, name: Literal["cmd", "output"]):
        """Asynchronously report a terminal command or its output."""
        return await super().async_report(value, name)
class BrowserReporter(ResourceReporter):
    """Streams the requested URL and page screenshots to the Browser block.

    Browsers are stateful, so each browser should get its own BrowserReporter
    instance.
    """

    block: Literal[BlockType.BROWSER] = BlockType.BROWSER

    def report(self, value: Union[str, SyncPage], name: Literal["url", "page"]):
        """Synchronously report a browser URL or page content."""
        if name == "page":
            # Pages are reported as a str() of the screenshot bytes.
            value = str(value.screenshot())
        return super().report(value, name)

    async def async_report(self, value: Union[str, AsyncPage], name: Literal["url", "page"]):
        """Asynchronously report a browser URL or page content."""
        if name == "page":
            value = str(await value.screenshot())
        return await super().async_report(value, name)
class ServerReporter(ResourceReporter):
    """Reports a deployed server's URL to the real-time browser block."""

    block: Literal[BlockType.BROWSER_RT] = BlockType.BROWSER_RT

    def report(self, value: str, name: Literal["local_url"] = "local_url"):
        """Synchronously report a server deployment."""
        return super().report(value, name)

    async def async_report(self, value: str, name: Literal["local_url"] = "local_url"):
        """Asynchronously report a server deployment."""
        return await super().async_report(value, name)
class ObjectReporter(ResourceReporter):
    """Reports complete object resources (dicts) under the "object" name."""

    def report(self, value: dict, name: Literal["object"] = "object"):
        """Synchronously report an object resource."""
        return super().report(value, name)

    async def async_report(self, value: dict, name: Literal["object"] = "object"):
        """Asynchronously report an object resource."""
        return await super().async_report(value, name)
class TaskReporter(ObjectReporter):
    """Reporter for object resources to Task Block."""

    # Only the target block differs from ObjectReporter; payloads are dicts
    # reported under the "object" name.
    block: Literal[BlockType.TASK] = BlockType.TASK
class FileReporter(ResourceReporter):
    """File resource callback for reporting complete file paths.

    There are two scenarios: if the file needs to be output in its entirety at
    once, use non-streaming callback; if the file can be partially output for
    display first, use streaming callback.
    """

    def report(self, value: Union[Path, dict, Any], name: Literal["path", "meta", "content"] = "path"):
        """Report file resource synchronously."""
        return super().report(value, name)

    async def async_report(self, value: Union[Path, dict, Any], name: Literal["path", "meta", "content"] = "path"):
        """Report file resource asynchronously.

        FIX: the annotation previously allowed only ``Path`` even though
        "meta"/"content" reports pass dicts and strings (see :meth:`report`
        and the tests); widened to match the sync variant. Annotation-only,
        backward-compatible change.
        """
        return await super().async_report(value, name)
class NotebookReporter(FileReporter):
    """Equivalent to FileReporter(block=BlockType.NOTEBOOK)."""

    # Routes file reports to the Notebook block; all logic lives in FileReporter.
    block: Literal[BlockType.NOTEBOOK] = BlockType.NOTEBOOK
class DocsReporter(FileReporter):
    """Equivalent to FileReporter(block=BlockType.DOCS)."""

    # Routes file reports to the Docs block; all logic lives in FileReporter.
    block: Literal[BlockType.DOCS] = BlockType.DOCS
class EditorReporter(FileReporter):
    """Equivalent to FileReporter(block=BlockType.EDITOR)."""

    # Routes file reports to the Editor block; all logic lives in FileReporter.
    # (Docstring typo fixed: was "BlockType.Editor".)
    block: Literal[BlockType.EDITOR] = BlockType.EDITOR
class GalleryReporter(FileReporter):
    """Reports finished image files to the Gallery block.

    Images are only displayed once complete, so each callback carries a full
    file path. The Gallery also shows the image's type and prompt, so meta
    information, when present, should be reported in a streaming manner.
    """

    block: Literal[BlockType.GALLERY] = BlockType.GALLERY

    def report(self, value: Union[dict, Path], name: Literal["meta", "path"] = "path"):
        """Synchronously report an image resource."""
        return super().report(value, name)

    async def async_report(self, value: Union[dict, Path], name: Literal["meta", "path"] = "path"):
        """Asynchronously report an image resource."""
        return await super().async_report(value, name)

View file

@@ -247,14 +247,16 @@ def search_engine_mocker(aiohttp_mocker, curl_cffi_mocker, httplib2_mocker, sear
@pytest.fixture
def http_server():
async def handler(request):
return aiohttp.web.Response(
text="""<!DOCTYPE html><html lang="en"><head><meta charset="UTF-8">
<title>MetaGPT</title></head><body><h1>MetaGPT</h1></body></html>""",
content_type="text/html",
)
async def start(handler=None):
if handler is None:
async def handler(request):
return aiohttp.web.Response(
text="""<!DOCTYPE html><html lang="en"><head><meta charset="UTF-8">
<title>MetaGPT</title></head><body><h1>MetaGPT</h1></body></html>""",
content_type="text/html",
)
async def start():
server = aiohttp.web.Server(handler)
runner = aiohttp.web.ServerRunner(server)
await runner.setup()

View file

@@ -0,0 +1,182 @@
import ast
from contextlib import asynccontextmanager
import aiohttp.web
import pytest
from metagpt.logs import log_llm_stream
from metagpt.report import (
END_MARKER_NAME,
BlockType,
BrowserReporter,
DocsReporter,
EditorReporter,
NotebookReporter,
ServerReporter,
TaskReporter,
TerminalReporter,
)
class MockFileLLM:
    """Fake LLM that streams a fixed payload line by line via log_llm_stream."""

    def __init__(self, data: str):
        self.data = data

    async def aask(self, *args, **kwargs) -> str:
        # Emit the payload one line at a time, then a trailing newline chunk.
        for line in self.data.splitlines(keepends=True):
            log_llm_stream(line)
        log_llm_stream("\n")
        return self.data
@asynccontextmanager
async def callback_server(http_server):
    """Start an HTTP endpoint that records every JSON callback it receives."""
    received = []

    async def handler(request):
        received.append(await request.json())
        return aiohttp.web.json_response({})

    server, url = await http_server(handler)
    yield url, received
    await server.stop()
@pytest.mark.asyncio
async def test_terminal_report(http_server):
    """Commands and outputs stream to the Terminal block under one uuid."""
    async with callback_server(http_server) as (url, callback_data):
        async with TerminalReporter(callback_url=url) as reporter:
            await reporter.async_report("ls -a", "cmd")
            await reporter.async_report("main.py\n", "output")
            await reporter.async_report("setup.py\n", "output")
        assert all(BlockType(item["block"]) is BlockType.TERMINAL for item in callback_data)
        assert all(item["uuid"] == callback_data[0]["uuid"] for item in callback_data[1:])
        streamed = "".join(item["value"] for item in callback_data if item["name"] != END_MARKER_NAME)
        assert streamed == "ls -amain.py\nsetup.py\n"
@pytest.mark.asyncio
async def test_browser_report(http_server):
    """URL then page screenshot are reported to the Browser block in order."""
    img = b"\x89PNG\r\n\x1a\n\x00\x00"
    web_url = "https://docs.deepwisdom.ai"

    class AsyncPage:
        async def screenshot(self):
            return img

    async with callback_server(http_server) as (url, callback_data):
        async with BrowserReporter(callback_url=url) as reporter:
            await reporter.async_report(web_url, "url")
            await reporter.async_report(AsyncPage(), "page")
        assert all(BlockType(item["block"]) is BlockType.BROWSER for item in callback_data)
        assert all(item["uuid"] == callback_data[0]["uuid"] for item in callback_data[1:])
        assert len(callback_data) == 3
        assert callback_data[-1]["name"] == END_MARKER_NAME
        assert callback_data[0]["name"] == "url"
        assert callback_data[0]["value"] == web_url
        assert callback_data[1]["name"] == "page"
        # The page value is str(bytes); literal_eval recovers the raw bytes.
        assert ast.literal_eval(callback_data[1]["value"]) == img
@pytest.mark.asyncio
async def test_server_reporter(http_server):
    """A one-shot (non-chunk) local_url report lands in the Browser-RT block."""
    local_url = "http://127.0.0.1:8080/index.html"
    async with callback_server(http_server) as (url, callback_data):
        reporter = ServerReporter(callback_url=url)
        await reporter.async_report(local_url)
        assert all(BlockType(item["block"]) is BlockType.BROWSER_RT for item in callback_data)
        assert len(callback_data) == 1
        assert callback_data[0]["name"] == "local_url"
        assert callback_data[0]["value"] == local_url
        assert not callback_data[0]["is_chunk"]
@pytest.mark.asyncio
async def test_task_reporter(http_server):
    """A task dict is reported once, under "object", to the Task block."""
    task = {"current_task_id": "", "tasks": []}
    async with callback_server(http_server) as (url, callback_data):
        reporter = TaskReporter(callback_url=url)
        await reporter.async_report(task)
        assert all(BlockType(item["block"]) is BlockType.TASK for item in callback_data)
        assert len(callback_data) == 1
        assert callback_data[0]["name"] == "object"
        assert callback_data[0]["value"] == task
@pytest.mark.asyncio
async def test_notebook_reporter(http_server):
    """Stream notebook cell, outputs, then the final path; verify callbacks."""
    code = {
        "cell_type": "code",
        "execution_count": None,
        "id": "e1841c44",
        "metadata": {},
        "outputs": [],
        "source": ["\n", "import time\n", "print('will sleep 1s.')\n", "time.sleep(1)\n", "print('end.')\n", ""],
    }
    output1 = {"name": "stdout", "output_type": "stream", "text": ["will sleep 1s.\n"]}
    # FIX: output2 was a copy-paste duplicate of output1 ("will sleep 1s.\n");
    # the cell's second print emits 'end.'.
    output2 = {"name": "stdout", "output_type": "stream", "text": ["end.\n"]}
    code_path = "/data/main.ipynb"
    async with callback_server(http_server) as (url, callback_data):
        async with NotebookReporter(callback_url=url) as reporter:
            await reporter.async_report(code, "content")
            await reporter.async_report(output1, "content")
            await reporter.async_report(output2, "content")
            await reporter.async_report(code_path, "path")
        assert all(BlockType(item["block"]) is BlockType.NOTEBOOK for item in callback_data)
        assert len(callback_data) == 5
        assert callback_data[-1]["name"] == END_MARKER_NAME
        assert callback_data[-2]["name"] == "path"
        assert callback_data[-2]["value"] == code_path
        assert all(item["uuid"] == callback_data[0]["uuid"] for item in callback_data[1:])
        assert [item["value"] for item in callback_data if item["name"] == "content"] == [code, output1, output2]
@pytest.mark.asyncio
@pytest.mark.parametrize(
    ("data", "file_path", "meta", "block", "report_cls"),
    (
        (
            "## Language\n\nen_us\n\n## Programming Language\n\nPython\n\n## Original Requirements\n\nCreate a 2048 gam...",
            "/data/prd.md",
            {"type": "write_prd"},
            BlockType.DOCS,
            DocsReporter,
        ),
        (
            "#!/usr/bin/env python\n# -*- coding: utf-8 -*-\n\nprint('Hello World')\n",
            "/data/main.py",
            {"type": "write_code"},
            BlockType.EDITOR,
            EditorReporter,
        ),
    ),
    ids=["test_docs_reporter", "test_editor_reporter"],
)
async def test_llm_stream_reporter(data, file_path, meta, block, report_cls, http_server):
    """Meta, streamed LLM content, and the final path all arrive under one uuid."""
    async with callback_server(http_server) as (url, callback_data):
        async with report_cls(callback_url=url, enable_llm_stream=True) as reporter:
            await reporter.async_report(meta, "meta")
            await MockFileLLM(data).aask("")
            await reporter.wait_llm_stream_report()
            await reporter.async_report(file_path, "path")
    assert callback_data
    assert all(BlockType(record["block"]) is block for record in callback_data)
    assert all(record["uuid"] == callback_data[0]["uuid"] for record in callback_data[1:])
    chunks, names = [], set()
    for record in callback_data:
        name = record["name"]
        names.add(name)
        if name == "meta":
            assert record["value"] == meta
        elif name == "path":
            assert record["value"] == file_path
        elif name == END_MARKER_NAME:
            pass
        elif name == "content":
            chunks.append(record["value"])
        else:
            raise ValueError
    # MockFileLLM appends a trailing "\n" chunk; the rest reassemble the data.
    assert "".join(chunks[:-1]) == data
    assert names == {"meta", "path", "content", END_MARKER_NAME}