rename llm_stream to enable_llm_stream && fix some bugs

This commit is contained in:
shenchucheng 2024-04-20 20:33:56 +08:00
parent 8f7fc9347a
commit 6b77fbe6c5
3 changed files with 19 additions and 16 deletions

View file

@ -139,3 +139,5 @@ LLM_API_TIMEOUT = 300
# Assistant alias
ASSISTANT_ALIAS = "response"
METAGPT_REPORTER_DEFAULT_URL = os.environ.get("METAGPT_REPORTER_URL", "")

View file

@ -1,5 +1,4 @@
import asyncio
import os
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Literal, Optional, Union
@ -11,6 +10,7 @@ from playwright.async_api import Page as AsyncPage
from playwright.sync_api import Page as SyncPage
from pydantic import BaseModel, Field, PrivateAttr
from metagpt.const import METAGPT_REPORTER_DEFAULT_URL
from metagpt.logs import create_llm_stream_queue, get_llm_stream_queue
try:
@ -41,11 +41,9 @@ class ResourceReporter(BaseModel):
block: BlockType = Field(description="The type of block that is reporting the resource")
uuid: UUID = Field(default_factory=uuid4, description="The unique identifier for the resource")
callback_url: str = Field(
os.environ.get("METAGPT_OBSERVER_CALLBACK_URL", ""), description="The URL to which the report should be sent"
)
is_chunk: bool = Field(False, description="Indicates whether the report is a chunk of a stream")
llm_stream: bool = Field(False, description="Indicates whether to connect to an LLM stream for reporting")
enable_llm_stream: bool = Field(False, description="Indicates whether to connect to an LLM stream for reporting")
callback_url: str = Field(METAGPT_REPORTER_DEFAULT_URL, description="The URL to which the report should be sent")
_llm_task: Optional[asyncio.Task] = PrivateAttr(None)
def report(self, value: Any, name: str):
@ -138,7 +136,7 @@ class ResourceReporter(BaseModel):
async def __aenter__(self):
"""Enter the asynchronous streaming callback context."""
self.is_chunk = True
if self.llm_stream:
if self.enable_llm_stream:
queue = create_llm_stream_queue()
self._llm_task = asyncio.create_task(self._llm_stream_report(queue))
return self
@ -147,7 +145,7 @@ class ResourceReporter(BaseModel):
"""Exit the asynchronous streaming callback context."""
await self.async_report(None, END_MARKER_NAME)
self.is_chunk = False
if self.llm_stream:
if self.enable_llm_stream:
self._llm_task.cancel()
self._llm_task = None

View file

@ -10,6 +10,7 @@ from metagpt.report import (
BlockType,
BrowserReporter,
DocsReporter,
EditorReporter,
NotebookReporter,
ServerReporter,
TaskReporter,
@ -49,7 +50,7 @@ async def test_terminal_report(http_server):
await reporter.async_report("main.py\n", "output")
await reporter.async_report("setup.py\n", "output")
assert all(BlockType.TERMINAL is BlockType(i["block"]) for i in callback_data)
assert all(i["uid"] == callback_data[0]["uid"] for i in callback_data[1:])
assert all(i["uuid"] == callback_data[0]["uuid"] for i in callback_data[1:])
assert "".join(i["value"] for i in callback_data if i["name"] != END_MARKER_NAME) == "ls -amain.py\nsetup.py\n"
@ -68,7 +69,7 @@ async def test_browser_report(http_server):
await reporter.async_report(AsyncPage(), "page")
assert all(BlockType.BROWSER is BlockType(i["block"]) for i in callback_data)
assert all(i["uid"] == callback_data[0]["uid"] for i in callback_data[1:])
assert all(i["uuid"] == callback_data[0]["uuid"] for i in callback_data[1:])
assert len(callback_data) == 3
assert callback_data[-1]["name"] == END_MARKER_NAME
assert callback_data[0]["name"] == "url"
@ -128,39 +129,41 @@ async def test_notebook_reporter(http_server):
assert callback_data[-1]["name"] == END_MARKER_NAME
assert callback_data[-2]["name"] == "path"
assert callback_data[-2]["value"] == code_path
assert all(i["uid"] == callback_data[0]["uid"] for i in callback_data[1:])
assert all(i["uuid"] == callback_data[0]["uuid"] for i in callback_data[1:])
assert [i["value"] for i in callback_data if i["name"] == "content"] == [code, output1, output2]
@pytest.mark.asyncio
@pytest.mark.parametrize(
("data", "file_path", "meta", "block"),
("data", "file_path", "meta", "block", "report_cls"),
(
(
"## Language\n\nen_us\n\n## Programming Language\n\nPython\n\n## Original Requirements\n\nCreate a 2048 gam...",
"/data/prd.md",
{"type": "write_prd"},
BlockType.DOCS,
DocsReporter,
),
(
"#!/usr/bin/env python\n# -*- coding: utf-8 -*-\n\nprint('Hello World')\n",
"/data/main.py",
{"type": "write_code"},
BlockType.DOCS,
BlockType.EDITOR,
EditorReporter,
),
),
ids=["test_docs_reporter", "test_editor_reporter"],
)
async def test_llm_stream_reporter(data, file_path, meta, block, http_server):
async def test_llm_stream_reporter(data, file_path, meta, block, report_cls, http_server):
async with callback_server(http_server) as (url, callback_data):
async with DocsReporter(callback_url=url, llm_stream=True) as reporter:
async with report_cls(callback_url=url, enable_llm_stream=True) as reporter:
await reporter.async_report(meta, "meta")
await MockFileLLM(data).aask("")
await reporter.wait_llm_stream_report()
await reporter.async_report(file_path, "path")
assert callback_data
assert all(BlockType.DOCS is BlockType(i["block"]) for i in callback_data)
assert all(i["uid"] == callback_data[0]["uid"] for i in callback_data[1:])
assert all(block is BlockType(i["block"]) for i in callback_data)
assert all(i["uuid"] == callback_data[0]["uuid"] for i in callback_data[1:])
chunks, names = [], set()
for i in callback_data:
name = i["name"]