From c47395081205d3eafa43d123bc90951846425264 Mon Sep 17 00:00:00 2001 From: shenchucheng Date: Sat, 20 Apr 2024 20:08:33 +0800 Subject: [PATCH 1/3] add reporter --- metagpt/logs.py | 38 +++++ metagpt/report.py | 291 +++++++++++++++++++++++++++++++++ tests/conftest.py | 16 +- tests/metagpt/test_reporter.py | 179 ++++++++++++++++++++ 4 files changed, 517 insertions(+), 7 deletions(-) create mode 100644 metagpt/report.py create mode 100644 tests/metagpt/test_reporter.py diff --git a/metagpt/logs.py b/metagpt/logs.py index 9a82d7b1b..663b75318 100644 --- a/metagpt/logs.py +++ b/metagpt/logs.py @@ -8,7 +8,9 @@ from __future__ import annotations +import asyncio import sys +from contextvars import ContextVar from datetime import datetime from functools import partial from typing import Any @@ -18,6 +20,8 @@ from pydantic import BaseModel, Field from metagpt.const import METAGPT_ROOT +LLM_STREAM_QUEUE: ContextVar[asyncio.Queue] = ContextVar("llm-stream") + class ToolLogItem(BaseModel): type_: str = Field(alias="type", default="str", description="Data type of `value` field.") @@ -46,6 +50,20 @@ logger = define_log_level() def log_llm_stream(msg): + """ + Logs a message to the LLM stream. + + Args: + msg: The message to be logged. + + Notes: + If the LLM_STREAM_QUEUE has not been set (e.g., if `create_llm_stream_queue` has not been called), + the message will not be added to the LLM stream queue. + """ + + queue = get_llm_stream_queue() + if queue: + queue.put_nowait(msg) _llm_stream_log(msg) @@ -86,3 +104,23 @@ _tool_output_log = ( async def _tool_output_log_async(*args, **kwargs): # async version pass + + +def create_llm_stream_queue(): + """Creates a new LLM stream queue and sets it in the context variable. + + Returns: + The newly created asyncio.Queue instance. + """ + queue = asyncio.Queue() + LLM_STREAM_QUEUE.set(queue) + return queue + + +def get_llm_stream_queue(): + """Retrieves the current LLM stream queue from the context variable. 
+ + Returns: + The asyncio.Queue instance if set, otherwise None. + """ + return LLM_STREAM_QUEUE.get(None) diff --git a/metagpt/report.py b/metagpt/report.py new file mode 100644 index 000000000..3fb49c89e --- /dev/null +++ b/metagpt/report.py @@ -0,0 +1,291 @@ +import asyncio +import os +from enum import Enum +from pathlib import Path +from typing import Any, Callable, Literal, Optional, Union +from urllib.parse import unquote, urlparse +from uuid import UUID, uuid4 + +from aiohttp import ClientSession, UnixConnector +from playwright.async_api import Page as AsyncPage +from playwright.sync_api import Page as SyncPage +from pydantic import BaseModel, Field, PrivateAttr + +from metagpt.logs import create_llm_stream_queue, get_llm_stream_queue + +try: + import requests_unixsocket as requests +except ImportError: + import requests + + +class BlockType(str, Enum): + """Enumeration for different types of blocks.""" + + TERMINAL = "Terminal" + TASK = "Task" + BROWSER = "Browser" + BROWSER_RT = "Browser-RT" + EDITOR = "Editor" + GALLERY = "Gallery" + NOTEBOOK = "Notebook" + DOCS = "Docs" + + +END_MARKER_NAME = "end_marker" +END_MARKER_VALUE = "\x18\x19\x1B\x18" + + +class ResourceReporter(BaseModel): + """Base class for resource reporting.""" + + block: BlockType = Field(description="The type of block that is reporting the resource") + uid: UUID = Field(default_factory=uuid4, description="The unique identifier for the resource") + callback_url: str = Field( + os.environ.get("METAGPT_OBSERVER_CALLBACK_URL", ""), description="The URL to which the report should be sent" + ) + is_chunk: bool = Field(False, description="Indicates whether the report is a chunk of a stream") + llm_stream: bool = Field(False, description="Indicates whether to connect to an LLM stream for reporting") + _llm_task: Optional[asyncio.Task] = PrivateAttr(None) + + def report(self, value: Any, name: str): + """Synchronously report resource observation data. + + Args: + value: The data to report. 
+ name: The type name of the data. + """ + return self._report(value, name) + + async def async_report(self, value: Any, name: str): + """Asynchronously report resource observation data. + + Args: + value: The data to report. + name: The type name of the data. + """ + return await self._async_report(value, name) + + @classmethod + def set_report_fn(cls, fn: Callable): + """Set the synchronous report function. + + Args: + fn: A callable function used for synchronous reporting. For example: + + >>> def _report(self, value: Any, name: str): + ... print(value, name) + + """ + cls._report = fn + + @classmethod + def set_async_report_fn(cls, fn: Callable): + """Set the asynchronous report function. + + Args: + fn: A callable function used for asynchronous reporting. For example: + + ```python + >>> async def _async_report(self, value: Any, name: str): + ... print(value, name) + ``` + """ + cls._async_report = fn + + def _report(self, value: Any, name: str): + if not self.callback_url: + return + + data = self._format_data(value, name) + resp = requests.post(self.callback_url, json=data) + resp.raise_for_status() + return resp.text + + async def _async_report(self, value: Any, name: str): + if not self.callback_url: + return + + data = self._format_data(value, name) + url = self.callback_url + _result = urlparse(url) + session_kwargs = {} + if "unix" in _result.scheme: + url = str(_result._replace(scheme="http", netloc="fake.org")) + session_kwargs["connector"] = UnixConnector(path=unquote(_result.netloc)) + + async with ClientSession(**session_kwargs) as client: + async with client.post(url, json=data) as resp: + resp.raise_for_status() + return await resp.text() + + def _format_data(self, value, name): + data = self.model_dump(mode="json", exclude=("callback_url", "llm_stream")) + data["value"] = value + data["name"] = name + return data + + def __enter__(self): + """Enter the synchronous streaming callback context.""" + self.is_chunk = True + return self + + def 
__exit__(self, *args, **kwargs): + """Exit the synchronous streaming callback context.""" + self.report(None, END_MARKER_NAME) + self.is_chunk = False + + async def __aenter__(self): + """Enter the asynchronous streaming callback context.""" + self.is_chunk = True + if self.llm_stream: + queue = create_llm_stream_queue() + self._llm_task = asyncio.create_task(self._llm_stream_report(queue)) + return self + + async def __aexit__(self, *args, **kwargs): + """Exit the asynchronous streaming callback context.""" + await self.async_report(None, END_MARKER_NAME) + self.is_chunk = False + if self.llm_stream: + self._llm_task.cancel() + self._llm_task = None + + async def _llm_stream_report(self, queue: asyncio.Queue): + while True: + await self.async_report(await queue.get(), "content") + + async def wait_llm_stream_report(self): + """Wait for the LLM stream report to complete.""" + queue = get_llm_stream_queue() + while self._llm_task: + if queue.empty(): + break + await asyncio.sleep(0.01) + + +class TerminalReporter(ResourceReporter): + """Terminal output callback for streaming reporting of command and output. + + The terminal has state, and an agent can open multiple terminals and input different commands into them. + To correctly display these states, each terminal should have its own unique ID, so in practice, each terminal + should instantiate its own TerminalReporter object. + """ + + block: Literal[BlockType.TERMINAL] = BlockType.TERMINAL + + def report(self, value: str, name: Literal["cmd", "output"]): + """Report terminal command or output synchronously.""" + return super().report(value, name) + + async def async_report(self, value: str, name: Literal["cmd", "output"]): + """Report terminal command or output asynchronously.""" + return await super().async_report(value, name) + + +class BrowserReporter(ResourceReporter): + """Browser output callback for streaming reporting of requested URL and page content. 
+ + The browser has state, so in practice, each browser should instantiate its own BrowserReporter object. + """ + + block: Literal[BlockType.BROWSER] = BlockType.BROWSER + + def report(self, value: Union[str, SyncPage], name: Literal["url", "page"]): + """Report browser URL or page content synchronously.""" + if name == "page": + value = value.screenshot() + value = str(value) + return super().report(value, name) + + async def async_report(self, value: Union[str, AsyncPage], name: Literal["url", "page"]): + """Report browser URL or page content asynchronously.""" + if name == "page": + value = await value.screenshot() + value = str(value) + return await super().async_report(value, name) + + +class ServerReporter(ResourceReporter): + """Callback for server deployment reporting.""" + + block: Literal[BlockType.BROWSER_RT] = BlockType.BROWSER_RT + + def report(self, value: str, name: Literal["local_url"] = "local_url"): + """Report server deployment synchronously.""" + return super().report(value, name) + + async def async_report(self, value: str, name: Literal["local_url"] = "local_url"): + """Report server deployment asynchronously.""" + return await super().async_report(value, name) + + +class ObjectReporter(ResourceReporter): + """Callback for reporting complete object resources.""" + + def report(self, value: dict, name: Literal["object"] = "object"): + """Report object resource synchronously.""" + return super().report(value, name) + + async def async_report(self, value: dict, name: Literal["object"] = "object"): + """Report object resource asynchronously.""" + return await super().async_report(value, name) + + +class TaskReporter(ObjectReporter): + """Reporter for object resources to Task Block.""" + + block: Literal[BlockType.TASK] = BlockType.TASK + + +class FileReporter(ResourceReporter): + """File resource callback for reporting complete file paths. 
+ + There are two scenarios: if the file needs to be output in its entirety at once, use non-streaming callback; + if the file can be partially output for display first, use streaming callback. + """ + + def report(self, value: Union[Path, dict, Any], name: Literal["path", "meta", "content"] = "path"): + """Report file resource synchronously.""" + return super().report(value, name) + + async def async_report(self, value: Path, name: Literal["path", "meta", "content"] = "path"): + """Report file resource asynchronously.""" + return await super().async_report(value, name) + + +class NotebookReporter(FileReporter): + """Equivalent to FileReporter(block=BlockType.NOTEBOOK).""" + + block: Literal[BlockType.NOTEBOOK] = BlockType.NOTEBOOK + + +class DocsReporter(FileReporter): + """Equivalent to FileReporter(block=BlockType.DOCS).""" + + block: Literal[BlockType.DOCS] = BlockType.DOCS + + +class EditorReporter(FileReporter): + """Equivalent to FileReporter(block=BlockType.Editor).""" + + block: Literal[BlockType.EDITOR] = BlockType.EDITOR + + +class GalleryReporter(FileReporter): + """Image resource callback for reporting complete file paths. + + Since images need to be complete before display, each callback is a complete file path. However, the Gallery + needs to display the type of image and prompt, so if there is meta information, it should be reported in a + streaming manner. 
+ """ + + block: Literal[BlockType.GALLERY] = BlockType.GALLERY + + def report(self, value: Union[dict, Path], name: Literal["meta", "path"] = "path"): + """Report image resource synchronously.""" + return super().report(value, name) + + async def async_report(self, value: Union[dict, Path], name: Literal["meta", "path"] = "path"): + """Report image resource asynchronously.""" + return await super().async_report(value, name) diff --git a/tests/conftest.py b/tests/conftest.py index 8603c752a..f26ab2ef9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -247,14 +247,16 @@ def search_engine_mocker(aiohttp_mocker, curl_cffi_mocker, httplib2_mocker, sear @pytest.fixture def http_server(): - async def handler(request): - return aiohttp.web.Response( - text=""" - MetaGPT

MetaGPT

""", - content_type="text/html", - ) + async def start(handler=None): + if handler is None: + + async def handler(request): + return aiohttp.web.Response( + text=""" + MetaGPT

MetaGPT

""", + content_type="text/html", + ) - async def start(): server = aiohttp.web.Server(handler) runner = aiohttp.web.ServerRunner(server) await runner.setup() diff --git a/tests/metagpt/test_reporter.py b/tests/metagpt/test_reporter.py new file mode 100644 index 000000000..48ed975b8 --- /dev/null +++ b/tests/metagpt/test_reporter.py @@ -0,0 +1,179 @@ +import ast +from contextlib import asynccontextmanager + +import aiohttp.web +import pytest + +from metagpt.logs import log_llm_stream +from metagpt.report import ( + END_MARKER_NAME, + BlockType, + BrowserReporter, + DocsReporter, + NotebookReporter, + ServerReporter, + TaskReporter, + TerminalReporter, +) + + +class MockFileLLM: + def __init__(self, data: str): + self.data = data + + async def aask(self, *args, **kwargs) -> str: + for i in self.data.splitlines(keepends=True): + log_llm_stream(i) + log_llm_stream("\n") + return self.data + + +@asynccontextmanager +async def callback_server(http_server): + callback_data = [] + + async def handler(request): + callback_data.append(await request.json()) + return aiohttp.web.json_response({}) + + server, url = await http_server(handler) + yield url, callback_data + await server.stop() + + +@pytest.mark.asyncio +async def test_terminal_report(http_server): + async with callback_server(http_server) as (url, callback_data): + async with TerminalReporter(callback_url=url) as reporter: + await reporter.async_report("ls -a", "cmd") + await reporter.async_report("main.py\n", "output") + await reporter.async_report("setup.py\n", "output") + assert all(BlockType.TERMINAL is BlockType(i["block"]) for i in callback_data) + assert all(i["uid"] == callback_data[0]["uid"] for i in callback_data[1:]) + assert "".join(i["value"] for i in callback_data if i["name"] != END_MARKER_NAME) == "ls -amain.py\nsetup.py\n" + + +@pytest.mark.asyncio +async def test_browser_report(http_server): + img = b"\x89PNG\r\n\x1a\n\x00\x00" + web_url = "https://docs.deepwisdom.ai" + + class AsyncPage: + async 
def screenshot(self): + return img + + async with callback_server(http_server) as (url, callback_data): + async with BrowserReporter(callback_url=url) as reporter: + await reporter.async_report(web_url, "url") + await reporter.async_report(AsyncPage(), "page") + + assert all(BlockType.BROWSER is BlockType(i["block"]) for i in callback_data) + assert all(i["uid"] == callback_data[0]["uid"] for i in callback_data[1:]) + assert len(callback_data) == 3 + assert callback_data[-1]["name"] == END_MARKER_NAME + assert callback_data[0]["name"] == "url" + assert callback_data[0]["value"] == web_url + assert callback_data[1]["name"] == "page" + assert ast.literal_eval(callback_data[1]["value"]) == img + + +@pytest.mark.asyncio +async def test_server_reporter(http_server): + local_url = "http://127.0.0.1:8080/index.html" + async with callback_server(http_server) as (url, callback_data): + reporter = ServerReporter(callback_url=url) + await reporter.async_report(local_url) + assert all(BlockType.BROWSER_RT is BlockType(i["block"]) for i in callback_data) + assert len(callback_data) == 1 + assert callback_data[0]["name"] == "local_url" + assert callback_data[0]["value"] == local_url + assert not callback_data[0]["is_chunk"] + + +@pytest.mark.asyncio +async def test_task_reporter(http_server): + task = {"current_task_id": "", "tasks": []} + async with callback_server(http_server) as (url, callback_data): + reporter = TaskReporter(callback_url=url) + await reporter.async_report(task) + + assert all(BlockType.TASK is BlockType(i["block"]) for i in callback_data) + assert len(callback_data) == 1 + assert callback_data[0]["name"] == "object" + assert callback_data[0]["value"] == task + + +@pytest.mark.asyncio +async def test_notebook_reporter(http_server): + code = { + "cell_type": "code", + "execution_count": None, + "id": "e1841c44", + "metadata": {}, + "outputs": [], + "source": ["\n", "import time\n", "print('will sleep 1s.')\n", "time.sleep(1)\n", "print('end.')\n", ""], + } + 
output1 = {"name": "stdout", "output_type": "stream", "text": ["will sleep 1s.\n"]} + output2 = {"name": "stdout", "output_type": "stream", "text": ["will sleep 1s.\n"]} + code_path = "/data/main.ipynb" + async with callback_server(http_server) as (url, callback_data): + async with NotebookReporter(callback_url=url) as reporter: + await reporter.async_report(code, "content") + await reporter.async_report(output1, "content") + await reporter.async_report(output2, "content") + await reporter.async_report(code_path, "path") + + assert all(BlockType.NOTEBOOK is BlockType(i["block"]) for i in callback_data) + assert len(callback_data) == 5 + assert callback_data[-1]["name"] == END_MARKER_NAME + assert callback_data[-2]["name"] == "path" + assert callback_data[-2]["value"] == code_path + assert all(i["uid"] == callback_data[0]["uid"] for i in callback_data[1:]) + assert [i["value"] for i in callback_data if i["name"] == "content"] == [code, output1, output2] + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + ("data", "file_path", "meta", "block"), + ( + ( + "## Language\n\nen_us\n\n## Programming Language\n\nPython\n\n## Original Requirements\n\nCreate a 2048 gam...", + "/data/prd.md", + {"type": "write_prd"}, + BlockType.DOCS, + ), + ( + "#!/usr/bin/env python\n# -*- coding: utf-8 -*-\n\nprint('Hello World')\n", + "/data/main.py", + {"type": "write_code"}, + BlockType.DOCS, + ), + ), + ids=["test_docs_reporter", "test_editor_reporter"], +) +async def test_llm_stream_reporter(data, file_path, meta, block, http_server): + async with callback_server(http_server) as (url, callback_data): + async with DocsReporter(callback_url=url, llm_stream=True) as reporter: + await reporter.async_report(meta, "meta") + await MockFileLLM(data).aask("") + await reporter.wait_llm_stream_report() + await reporter.async_report(file_path, "path") + assert callback_data + assert all(BlockType.DOCS is BlockType(i["block"]) for i in callback_data) + assert all(i["uid"] == 
callback_data[0]["uid"] for i in callback_data[1:]) + chunks, names = [], set() + for i in callback_data: + name = i["name"] + names.add(name) + if name == "meta": + assert i["value"] == meta + elif name == "path": + assert i["value"] == file_path + elif name == END_MARKER_NAME: + pass + elif name == "content": + chunks.append(i["value"]) + else: + raise ValueError + assert "".join(chunks[:-1]) == data + assert names == {"meta", "path", "content", END_MARKER_NAME} From 8f7fc9347af2f5d8041e3c31f3fe7d3ee93b24fd Mon Sep 17 00:00:00 2001 From: shenchucheng Date: Sat, 20 Apr 2024 20:10:22 +0800 Subject: [PATCH 2/3] rename uid to uuid --- metagpt/report.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metagpt/report.py b/metagpt/report.py index 3fb49c89e..e821e353a 100644 --- a/metagpt/report.py +++ b/metagpt/report.py @@ -40,7 +40,7 @@ class ResourceReporter(BaseModel): """Base class for resource reporting.""" block: BlockType = Field(description="The type of block that is reporting the resource") - uid: UUID = Field(default_factory=uuid4, description="The unique identifier for the resource") + uuid: UUID = Field(default_factory=uuid4, description="The unique identifier for the resource") callback_url: str = Field( os.environ.get("METAGPT_OBSERVER_CALLBACK_URL", ""), description="The URL to which the report should be sent" ) From 6b77fbe6c54316fb07534dda03dddfb4d2322904 Mon Sep 17 00:00:00 2001 From: shenchucheng Date: Sat, 20 Apr 2024 20:33:56 +0800 Subject: [PATCH 3/3] rename llm_stream to enable_llm_stream && fix some bugs --- metagpt/const.py | 2 ++ metagpt/report.py | 12 +++++------- tests/metagpt/test_reporter.py | 21 ++++++++++++--------- 3 files changed, 19 insertions(+), 16 deletions(-) diff --git a/metagpt/const.py b/metagpt/const.py index c01f92adc..213cd4f1e 100644 --- a/metagpt/const.py +++ b/metagpt/const.py @@ -139,3 +139,5 @@ LLM_API_TIMEOUT = 300 # Assistant alias ASSISTANT_ALIAS = "response" + +METAGPT_REPORTER_DEFAULT_URL = 
os.environ.get("METAGPT_REPORTER_URL", "") diff --git a/metagpt/report.py b/metagpt/report.py index e821e353a..ee3c1697b 100644 --- a/metagpt/report.py +++ b/metagpt/report.py @@ -1,5 +1,4 @@ import asyncio -import os from enum import Enum from pathlib import Path from typing import Any, Callable, Literal, Optional, Union @@ -11,6 +10,7 @@ from playwright.async_api import Page as AsyncPage from playwright.sync_api import Page as SyncPage from pydantic import BaseModel, Field, PrivateAttr +from metagpt.const import METAGPT_REPORTER_DEFAULT_URL from metagpt.logs import create_llm_stream_queue, get_llm_stream_queue try: @@ -41,11 +41,9 @@ class ResourceReporter(BaseModel): block: BlockType = Field(description="The type of block that is reporting the resource") uuid: UUID = Field(default_factory=uuid4, description="The unique identifier for the resource") - callback_url: str = Field( - os.environ.get("METAGPT_OBSERVER_CALLBACK_URL", ""), description="The URL to which the report should be sent" - ) is_chunk: bool = Field(False, description="Indicates whether the report is a chunk of a stream") - llm_stream: bool = Field(False, description="Indicates whether to connect to an LLM stream for reporting") + enable_llm_stream: bool = Field(False, description="Indicates whether to connect to an LLM stream for reporting") + callback_url: str = Field(METAGPT_REPORTER_DEFAULT_URL, description="The URL to which the report should be sent") _llm_task: Optional[asyncio.Task] = PrivateAttr(None) def report(self, value: Any, name: str): @@ -138,7 +136,7 @@ class ResourceReporter(BaseModel): async def __aenter__(self): """Enter the asynchronous streaming callback context.""" self.is_chunk = True - if self.llm_stream: + if self.enable_llm_stream: queue = create_llm_stream_queue() self._llm_task = asyncio.create_task(self._llm_stream_report(queue)) return self @@ -147,7 +145,7 @@ class ResourceReporter(BaseModel): """Exit the asynchronous streaming callback context.""" await 
self.async_report(None, END_MARKER_NAME) self.is_chunk = False - if self.llm_stream: + if self.enable_llm_stream: self._llm_task.cancel() self._llm_task = None diff --git a/tests/metagpt/test_reporter.py b/tests/metagpt/test_reporter.py index 48ed975b8..f7dadf0f0 100644 --- a/tests/metagpt/test_reporter.py +++ b/tests/metagpt/test_reporter.py @@ -10,6 +10,7 @@ from metagpt.report import ( BlockType, BrowserReporter, DocsReporter, + EditorReporter, NotebookReporter, ServerReporter, TaskReporter, @@ -49,7 +50,7 @@ async def test_terminal_report(http_server): await reporter.async_report("main.py\n", "output") await reporter.async_report("setup.py\n", "output") assert all(BlockType.TERMINAL is BlockType(i["block"]) for i in callback_data) - assert all(i["uid"] == callback_data[0]["uid"] for i in callback_data[1:]) + assert all(i["uuid"] == callback_data[0]["uuid"] for i in callback_data[1:]) assert "".join(i["value"] for i in callback_data if i["name"] != END_MARKER_NAME) == "ls -amain.py\nsetup.py\n" @@ -68,7 +69,7 @@ async def test_browser_report(http_server): await reporter.async_report(AsyncPage(), "page") assert all(BlockType.BROWSER is BlockType(i["block"]) for i in callback_data) - assert all(i["uid"] == callback_data[0]["uid"] for i in callback_data[1:]) + assert all(i["uuid"] == callback_data[0]["uuid"] for i in callback_data[1:]) assert len(callback_data) == 3 assert callback_data[-1]["name"] == END_MARKER_NAME assert callback_data[0]["name"] == "url" @@ -128,39 +129,41 @@ async def test_notebook_reporter(http_server): assert callback_data[-1]["name"] == END_MARKER_NAME assert callback_data[-2]["name"] == "path" assert callback_data[-2]["value"] == code_path - assert all(i["uid"] == callback_data[0]["uid"] for i in callback_data[1:]) + assert all(i["uuid"] == callback_data[0]["uuid"] for i in callback_data[1:]) assert [i["value"] for i in callback_data if i["name"] == "content"] == [code, output1, output2] @pytest.mark.asyncio @pytest.mark.parametrize( - 
("data", "file_path", "meta", "block"), + ("data", "file_path", "meta", "block", "report_cls"), ( ( "## Language\n\nen_us\n\n## Programming Language\n\nPython\n\n## Original Requirements\n\nCreate a 2048 gam...", "/data/prd.md", {"type": "write_prd"}, BlockType.DOCS, + DocsReporter, ), ( "#!/usr/bin/env python\n# -*- coding: utf-8 -*-\n\nprint('Hello World')\n", "/data/main.py", {"type": "write_code"}, - BlockType.DOCS, + BlockType.EDITOR, + EditorReporter, ), ), ids=["test_docs_reporter", "test_editor_reporter"], ) -async def test_llm_stream_reporter(data, file_path, meta, block, http_server): +async def test_llm_stream_reporter(data, file_path, meta, block, report_cls, http_server): async with callback_server(http_server) as (url, callback_data): - async with DocsReporter(callback_url=url, llm_stream=True) as reporter: + async with report_cls(callback_url=url, enable_llm_stream=True) as reporter: await reporter.async_report(meta, "meta") await MockFileLLM(data).aask("") await reporter.wait_llm_stream_report() await reporter.async_report(file_path, "path") assert callback_data - assert all(BlockType.DOCS is BlockType(i["block"]) for i in callback_data) - assert all(i["uid"] == callback_data[0]["uid"] for i in callback_data[1:]) + assert all(block is BlockType(i["block"]) for i in callback_data) + assert all(i["uuid"] == callback_data[0]["uuid"] for i in callback_data[1:]) chunks, names = [], set() for i in callback_data: name = i["name"]