Merge branch 'feat-stream' into 'mgx_ops'

Use stream in Editor and Notebook

See merge request pub/MetaGPT!56
This commit is contained in:
林义章 2024-05-06 03:37:28 +00:00
commit f02f371af5
22 changed files with 736 additions and 158 deletions

View file

@ -2,7 +2,6 @@ import asyncio
from metagpt.roles.di.data_interpreter import DataInterpreter
USE_GOT_REPO_REQ = """
Write a service using Flask, create a conda environment and run it, and call the service's interface for validation.
Notice: Don't write all codes in one response, each time, just write code for one step.

View file

@ -17,7 +17,7 @@ from pydantic import BaseModel, Field, create_model, model_validator
from tenacity import retry, stop_after_attempt, wait_random_exponential
from metagpt.actions.action_outcls_registry import register_action_outcls
from metagpt.const import USE_CONFIG_TIMEOUT
from metagpt.const import MARKDOWN_TITLE_PREFIX, USE_CONFIG_TIMEOUT
from metagpt.llm import BaseLLM
from metagpt.logs import logger
from metagpt.provider.postprocess.llm_output_postprocess import llm_output_postprocess
@ -113,7 +113,7 @@ Follow format example's {prompt_schema} format, generate output and make sure it
"""
def dict_to_markdown(d, prefix="- ", kv_sep="\n", postfix="\n"):
def dict_to_markdown(d, prefix=MARKDOWN_TITLE_PREFIX, kv_sep="\n", postfix="\n"):
markdown_str = ""
for key, value in d.items():
markdown_str += f"{prefix}{key}{kv_sep}{value}{postfix}"

View file

@ -26,6 +26,7 @@ from metagpt.const import DATA_API_DESIGN_FILE_REPO, SEQ_FLOW_FILE_REPO
from metagpt.logs import logger
from metagpt.schema import Document, Documents, Message
from metagpt.utils.mermaid import mermaid_to_file
from metagpt.utils.report import DocsReporter
NEW_REQ_TEMPLATE = """
### Legacy Content
@ -70,31 +71,34 @@ class WriteDesign(Action):
return ActionOutput(content=changed_files.model_dump_json(), instruct_content=changed_files)
async def _new_system_design(self, context):
node = await DESIGN_API_NODE.fill(context=context, llm=self.llm)
node = await DESIGN_API_NODE.fill(context=context, llm=self.llm, schema=self.prompt_schema)
return node
async def _merge(self, prd_doc, system_design_doc):
context = NEW_REQ_TEMPLATE.format(old_design=system_design_doc.content, context=prd_doc.content)
node = await REFINED_DESIGN_NODE.fill(context=context, llm=self.llm)
node = await REFINED_DESIGN_NODE.fill(context=context, llm=self.llm, schema=self.prompt_schema)
system_design_doc.content = node.instruct_content.model_dump_json()
return system_design_doc
async def _update_system_design(self, filename) -> Document:
prd = await self.repo.docs.prd.get(filename)
old_system_design_doc = await self.repo.docs.system_design.get(filename)
if not old_system_design_doc:
system_design = await self._new_system_design(context=prd.content)
doc = await self.repo.docs.system_design.save(
filename=filename,
content=system_design.instruct_content.model_dump_json(),
dependencies={prd.root_relative_path},
)
else:
doc = await self._merge(prd_doc=prd, system_design_doc=old_system_design_doc)
await self.repo.docs.system_design.save_doc(doc=doc, dependencies={prd.root_relative_path})
await self._save_data_api_design(doc)
await self._save_seq_flow(doc)
await self.repo.resources.system_design.save_pdf(doc=doc)
async with DocsReporter(enable_llm_stream=True) as reporter:
await reporter.async_report({"type": "design"}, "meta")
if not old_system_design_doc:
system_design = await self._new_system_design(context=prd.content)
doc = await self.repo.docs.system_design.save(
filename=filename,
content=system_design.instruct_content.model_dump_json(),
dependencies={prd.root_relative_path},
)
else:
doc = await self._merge(prd_doc=prd, system_design_doc=old_system_design_doc)
await self.repo.docs.system_design.save_doc(doc=doc, dependencies={prd.root_relative_path})
await self._save_data_api_design(doc)
await self._save_seq_flow(doc)
md = await self.repo.resources.system_design.save_pdf(doc=doc)
await reporter.async_report(self.repo.workdir / md.root_relative_path, "path")
return doc
async def _save_data_api_design(self, design_doc):

View file

@ -13,9 +13,10 @@ from typing import Literal, Tuple
import nbformat
from nbclient import NotebookClient
from nbclient.exceptions import CellTimeoutError, DeadKernelError
from nbclient.exceptions import CellExecutionComplete, CellTimeoutError, DeadKernelError
from nbclient.util import ensure_async
from nbformat import NotebookNode
from nbformat.v4 import new_code_cell, new_markdown_cell, new_output
from nbformat.v4 import new_code_cell, new_markdown_cell, new_output, output_from_msg
from rich.box import MINIMAL
from rich.console import Console, Group
from rich.live import Live
@ -25,32 +26,64 @@ from rich.syntax import Syntax
from metagpt.actions import Action
from metagpt.const import DEFAULT_WORKSPACE_ROOT
from metagpt.logs import ToolLogItem, log_tool_output, logger
from metagpt.logs import logger
from metagpt.utils.report import NotebookReporter
INSTALL_KEEPLEN = 500
class RealtimeOutputNotebookClient(NotebookClient):
"""Realtime output of Notebook execution."""
def __init__(self, *args, notebook_reporter=None, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.notebook_reporter = notebook_reporter or NotebookReporter()
async def _async_poll_output_msg(self, parent_msg_id: str, cell: NotebookNode, cell_index: int) -> None:
"""Implement a feature to enable sending messages."""
assert self.kc is not None
while True:
msg = await ensure_async(self.kc.iopub_channel.get_msg(timeout=None))
await self._send_msg(msg)
if msg["parent_header"].get("msg_id") == parent_msg_id:
try:
# Will raise CellExecutionComplete when completed
self.process_message(msg, cell, cell_index)
except CellExecutionComplete:
return
async def _send_msg(self, msg: dict):
msg_type = msg.get("header", {}).get("msg_type")
if msg_type not in ["stream", "error", "execute_result"]:
return
await self.notebook_reporter.async_report(output_from_msg(msg), "content")
class ExecuteNbCode(Action):
"""execute notebook code block, return result to llm, and display it."""
nb: NotebookNode
nb_client: NotebookClient
nb_client: NotebookClient = None
console: Console
interaction: str
timeout: int = 600
def __init__(
self,
nb=nbformat.v4.new_notebook(),
timeout=600,
):
def __init__(self, nb=nbformat.v4.new_notebook(), timeout=600):
super().__init__(
nb=nb,
nb_client=NotebookClient(nb, timeout=timeout, resources={"metadata": {"path": DEFAULT_WORKSPACE_ROOT}}),
timeout=timeout,
console=Console(),
interaction=("ipython" if self.is_ipython() else "terminal"),
)
self.reporter = NotebookReporter()
self.nb_client = RealtimeOutputNotebookClient(
nb,
timeout=timeout,
resources={"metadata": {"path": DEFAULT_WORKSPACE_ROOT}},
notebook_reporter=self.reporter,
)
async def build(self):
if self.nb_client.kc is None or not await self.nb_client.kc.is_alive():
@ -175,6 +208,8 @@ class ExecuteNbCode(Action):
"""set timeout for run code.
returns the success or failure of the cell execution, and an optional error message.
"""
await self.reporter.async_report(cell, "content")
try:
await self.nb_client.async_execute_cell(cell, cell_index)
return self.parse_outputs(self.nb.cells[-1].outputs)
@ -196,35 +231,36 @@ class ExecuteNbCode(Action):
"""
self._display(code, language)
if language == "python":
# add code to the notebook
self.add_code_cell(code=code)
async with self.reporter:
if language == "python":
# add code to the notebook
self.add_code_cell(code=code)
# build code executor
await self.build()
# build code executor
await self.build()
# run code
cell_index = len(self.nb.cells) - 1
success, outputs = await self.run_cell(self.nb.cells[-1], cell_index)
# run code
cell_index = len(self.nb.cells) - 1
success, outputs = await self.run_cell(self.nb.cells[-1], cell_index)
if "!pip" in code:
success = False
outputs = outputs[-INSTALL_KEEPLEN:]
if "!pip" in code:
success = False
outputs = outputs[-INSTALL_KEEPLEN:]
elif language == "markdown":
# add markdown content to markdown cell in a notebook.
self.add_markdown_cell(code)
# return True, because there is no execution failure for markdown cell.
outputs, success = code, True
else:
raise ValueError(f"Only support for language: python, markdown, but got {language}, ")
file_path = DEFAULT_WORKSPACE_ROOT / "code.ipynb"
nbformat.write(self.nb, file_path)
log_tool_output(ToolLogItem(name="file_path", value=file_path), tool_name="ExecuteNbCode")
await self.reporter.async_report(file_path, "path")
return outputs, success
elif language == "markdown":
# add markdown content to markdown cell in a notebook.
self.add_markdown_cell(code)
# return True, because there is no execution failure for markdown cell.
return code, True
else:
raise ValueError(f"Only support for language: python, markdown, but got {language}, ")
def remove_escape_and_color_codes(input_str: str):
# 使用正则表达式去除jupyter notebook输出结果中的转义字符和颜色代码

View file

@ -19,6 +19,7 @@ from metagpt.actions.project_management_an import PM_NODE, REFINED_PM_NODE
from metagpt.const import PACKAGE_REQUIREMENTS_FILENAME
from metagpt.logs import logger
from metagpt.schema import Document, Documents
from metagpt.utils.report import DocsReporter
NEW_REQ_TEMPLATE = """
### Legacy Content
@ -59,18 +60,21 @@ class WriteTasks(Action):
async def _update_tasks(self, filename):
system_design_doc = await self.repo.docs.system_design.get(filename)
task_doc = await self.repo.docs.task.get(filename)
if task_doc:
task_doc = await self._merge(system_design_doc=system_design_doc, task_doc=task_doc)
await self.repo.docs.task.save_doc(doc=task_doc, dependencies={system_design_doc.root_relative_path})
else:
rsp = await self._run_new_tasks(context=system_design_doc.content)
task_doc = await self.repo.docs.task.save(
filename=filename,
content=rsp.instruct_content.model_dump_json(),
dependencies={system_design_doc.root_relative_path},
)
await self._update_requirements(task_doc)
await self.repo.resources.api_spec_and_task.save_pdf(doc=task_doc)
async with DocsReporter(enable_llm_stream=True) as reporter:
await reporter.async_report({"type": "task"}, "meta")
if task_doc:
task_doc = await self._merge(system_design_doc=system_design_doc, task_doc=task_doc)
await self.repo.docs.task.save_doc(doc=task_doc, dependencies={system_design_doc.root_relative_path})
else:
rsp = await self._run_new_tasks(context=system_design_doc.content)
task_doc = await self.repo.docs.task.save(
filename=filename,
content=rsp.instruct_content.model_dump_json(),
dependencies={system_design_doc.root_relative_path},
)
await self._update_requirements(task_doc)
md = await self.repo.resources.api_spec_and_task.save_pdf(doc=task_doc)
await reporter.async_report(self.repo.workdir / md.root_relative_path, "path")
return task_doc
async def _run_new_tasks(self, context):

View file

@ -28,6 +28,7 @@ from metagpt.logs import logger
from metagpt.schema import CodingContext, Document, RunCodeResult
from metagpt.utils.common import CodeParser
from metagpt.utils.project_repo import ProjectRepo
from metagpt.utils.report import EditorReporter
PROMPT_TEMPLATE = """
NOTICE
@ -139,12 +140,15 @@ class WriteCode(Action):
summary_log=summary_doc.content if summary_doc else "",
)
logger.info(f"Writing {coding_context.filename}..")
code = await self.write_code(prompt)
if not coding_context.code_doc:
# avoid root_path pydantic ValidationError if use WriteCode alone
root_path = self.context.src_workspace if self.context.src_workspace else ""
coding_context.code_doc = Document(filename=coding_context.filename, root_path=str(root_path))
coding_context.code_doc.content = code
async with EditorReporter(enable_llm_stream=True) as reporter:
await reporter.async_report({"filename": coding_context.filename}, "meta")
code = await self.write_code(prompt)
if not coding_context.code_doc:
# avoid root_path pydantic ValidationError if use WriteCode alone
root_path = self.context.src_workspace if self.context.src_workspace else ""
coding_context.code_doc = Document(filename=coding_context.filename, root_path=str(root_path))
coding_context.code_doc.content = code
await reporter.async_report(self.repo.workdir / coding_context.code_doc.root_relative_path, "path")
return coding_context
@staticmethod

View file

@ -37,6 +37,7 @@ from metagpt.schema import BugFixContext, Document, Documents, Message
from metagpt.utils.common import CodeParser
from metagpt.utils.file_repository import FileRepository
from metagpt.utils.mermaid import mermaid_to_file
from metagpt.utils.report import DocsReporter
CONTEXT_TEMPLATE = """
### Project Name
@ -102,17 +103,22 @@ class WritePRD(Action):
async def _handle_new_requirement(self, req: Document) -> ActionOutput:
"""handle new requirement"""
project_name = self.project_name
context = CONTEXT_TEMPLATE.format(requirements=req, project_name=project_name)
exclude = [PROJECT_NAME.key] if project_name else []
node = await WRITE_PRD_NODE.fill(context=context, llm=self.llm, exclude=exclude) # schema=schema
await self._rename_workspace(node)
new_prd_doc = await self.repo.docs.prd.save(
filename=FileRepository.new_filename() + ".json", content=node.instruct_content.model_dump_json()
)
await self._save_competitive_analysis(new_prd_doc)
await self.repo.resources.prd.save_pdf(doc=new_prd_doc)
return Documents.from_iterable(documents=[new_prd_doc]).to_action_output()
async with DocsReporter(enable_llm_stream=True) as reporter:
await reporter.async_report({"type": "prd"}, "meta")
project_name = self.project_name
context = CONTEXT_TEMPLATE.format(requirements=req, project_name=project_name)
exclude = [PROJECT_NAME.key] if project_name else []
node = await WRITE_PRD_NODE.fill(
context=context, llm=self.llm, exclude=exclude, schema=self.prompt_schema
) # schema=schema
await self._rename_workspace(node)
new_prd_doc = await self.repo.docs.prd.save(
filename=FileRepository.new_filename() + ".json", content=node.instruct_content.model_dump_json()
)
await self._save_competitive_analysis(new_prd_doc)
md = await self.repo.resources.prd.save_pdf(doc=new_prd_doc)
await reporter.async_report(self.repo.workdir / md.root_relative_path, "path")
return Documents.from_iterable(documents=[new_prd_doc]).to_action_output()
async def _handle_requirement_update(self, req: Document, related_docs: list[Document]) -> ActionOutput:
# ... requirement update logic ...
@ -146,10 +152,13 @@ class WritePRD(Action):
return related_doc
async def _update_prd(self, req: Document, prd_doc: Document) -> Document:
new_prd_doc: Document = await self._merge(req, prd_doc)
await self.repo.docs.prd.save_doc(doc=new_prd_doc)
await self._save_competitive_analysis(new_prd_doc)
await self.repo.resources.prd.save_pdf(doc=new_prd_doc)
async with DocsReporter(enable_llm_stream=True) as reporter:
await reporter.async_report({"type": "prd"}, "meta")
new_prd_doc: Document = await self._merge(req, prd_doc)
await self.repo.docs.prd.save_doc(doc=new_prd_doc)
await self._save_competitive_analysis(new_prd_doc)
md = await self.repo.resources.prd.save_pdf(doc=new_prd_doc)
await reporter.async_report(self.repo.workdir / md.root_relative_path, "path")
return new_prd_doc
async def _save_competitive_analysis(self, prd_doc: Document):

View file

@ -165,7 +165,7 @@ ANYTHING_UNCLEAR = ActionNode(
key="Anything UNCLEAR",
expected_type=str,
instruction="Mention any aspects of the project that are unclear and try to clarify them.",
example="",
example="Currently, all aspects of the project are clear.",
)
ISSUE_TYPE = ActionNode(

View file

@ -14,6 +14,6 @@ class RoleCustomConfig(YamlModel):
role: role's className or role's role_id
To be expanded
"""
role: str = ""
llm: LLMConfig

View file

@ -140,5 +140,11 @@ LLM_API_TIMEOUT = 300
# Assistant alias
ASSISTANT_ALIAS = "response"
# Markdown
MARKDOWN_TITLE_PREFIX = "## "
# Reporter
METAGPT_REPORTER_DEFAULT_URL = os.environ.get("METAGPT_REPORTER_URL", "")
# Metadata defines
AGENT = "agent"

View file

@ -8,8 +8,10 @@
from __future__ import annotations
import asyncio
import inspect
import sys
from contextvars import ContextVar
from datetime import datetime
from functools import partial
from typing import Any
@ -19,6 +21,8 @@ from pydantic import BaseModel, Field
from metagpt.const import METAGPT_ROOT
LLM_STREAM_QUEUE: ContextVar[asyncio.Queue] = ContextVar("llm-stream")
class ToolLogItem(BaseModel):
type_: str = Field(alias="type", default="str", description="Data type of `value` field.")
@ -47,6 +51,20 @@ logger = define_log_level()
def log_llm_stream(msg):
"""
Logs a message to the LLM stream.
Args:
msg: The message to be logged.
Notes:
If the LLM_STREAM_QUEUE has not been set (e.g., if `create_llm_stream_queue` has not been called),
the message will not be added to the LLM stream queue.
"""
queue = get_llm_stream_queue()
if queue:
queue.put_nowait(msg)
_llm_stream_log(msg)
@ -102,4 +120,24 @@ async def _tool_output_log_async(*args, **kwargs):
pass
def create_llm_stream_queue():
"""Creates a new LLM stream queue and sets it in the context variable.
Returns:
The newly created asyncio.Queue instance.
"""
queue = asyncio.Queue()
LLM_STREAM_QUEUE.set(queue)
return queue
def get_llm_stream_queue():
"""Retrieves the current LLM stream queue from the context variable.
Returns:
The asyncio.Queue instance if set, otherwise None.
"""
return LLM_STREAM_QUEUE.get(None)
_get_human_input = input # get human input from console by default

View file

@ -47,10 +47,11 @@ from metagpt.const import (
SYSTEM_DESIGN_FILE_REPO,
TASK_FILE_REPO,
)
from metagpt.logs import ToolLogItem, log_tool_output, logger
from metagpt.logs import logger
from metagpt.repo_parser import DotClassInfo
from metagpt.utils.common import CodeParser, any_to_str, any_to_str_set, import_class
from metagpt.utils.exceptions import handle_exception
from metagpt.utils.report import TaskReporter
from metagpt.utils.serialize import (
actionoutout_schema_to_mapping,
actionoutput_mapping_to_str,
@ -578,14 +579,7 @@ class Plan(BaseModel):
current_task_id = task.task_id
break
self.current_task_id = current_task_id
log_tool_output(
[
ToolLogItem(type="object", name="tasks", value=self.tasks),
ToolLogItem(type="object", name="current_task_id", value=self.current_task_id),
],
tool_name="Plan",
)
TaskReporter().report({"tasks": self.tasks, "current_task_id": current_task_id})
@property
def current_task(self) -> Task:

View file

@ -8,7 +8,6 @@ import typer
from metagpt.const import CONFIG_ROOT
from metagpt.utils.common import any_to_str
from metagpt.utils.project_repo import ProjectRepo
app = typer.Typer(add_completion=False, pretty_exceptions_show_locals=False)
@ -26,7 +25,7 @@ def generate_repo(
reqa_file="",
max_auto_summarize_code=0,
recover_path=None,
) -> ProjectRepo:
):
"""Run the startup logic. Can be called from CLI or other Python scripts."""
from metagpt.config2 import config
from metagpt.context import Context

View file

@ -1,8 +1,8 @@
from playwright.async_api import async_playwright
from metagpt.const import DEFAULT_WORKSPACE_ROOT
from metagpt.logs import ToolLogItem, log_tool_output_async
from metagpt.tools.tool_registry import register_tool
from metagpt.utils.report import BrowserReporter
@register_tool()
@ -20,6 +20,7 @@ class Browser:
self.pages = {}
self.current_page_url = None
self.current_page = None
self.reporter = BrowserReporter()
async def start(self):
"""Starts Playwright and launches a browser"""
@ -34,21 +35,19 @@ class Browser:
async def open_new_page(self, url: str):
"""open a new page in the browser and view the page"""
page = await self.browser.new_page()
await page.goto(url)
self.pages[url] = page
await self._set_current_page(page, url)
await log_tool_output_async(
ToolLogItem(type="object", name="open_new_page", value=self.current_page), tool_name="Browser"
)
async with self.reporter as reporter:
page = await self.browser.new_page()
await reporter.async_report(url, "url")
await page.goto(url)
self.pages[url] = page
await self._set_current_page(page, url)
await reporter.async_report(page, "page")
async def switch_page(self, url: str):
"""switch to an opened page in the browser and view the page"""
if url in self.pages:
await self._set_current_page(self.pages[url], url)
await log_tool_output_async(
ToolLogItem(type="object", name="switch_page", value=self.current_page), tool_name="Browser"
)
await self.reporter.async_report(self.current_page, "page")
else:
print(f"Page not found: {url}")
@ -110,9 +109,8 @@ class Browser:
index = len(search_results) - 1
element = search_results[index]["element_obj"]
await element.scroll_into_view_if_needed()
await log_tool_output_async(
ToolLogItem(type="object", name="scroll_page", value=self.current_page), tool_name="Browser"
)
await self.reporter.async_report(self.current_page, "page")
print(f"Successfully scrolled to the {index}-th search result")
print(await self._view())
@ -152,9 +150,8 @@ class Browser:
async def scroll_current_page(self, offset: int = 500):
"""scroll the current page by offset pixels, negative value means scrolling up, will print out observed content after scrolling"""
await self.current_page.evaluate(f"window.scrollBy(0, {offset})")
await log_tool_output_async(
ToolLogItem(type="object", name="scroll_page", value=self.current_page), tool_name="Browser"
)
await self.reporter.async_report(self.current_page, "page")
print(f"Scrolled current page by {offset} pixels.")
print(await self._view())

View file

@ -1,5 +1,5 @@
from metagpt.logs import ToolLogItem, log_tool_output
from metagpt.tools.tool_registry import register_tool
from metagpt.utils.report import ServerReporter
# An un-implemented tool reserved for deploying a local service to public
@ -8,4 +8,4 @@ class Deployer:
"""Deploy a local service to public. Used only for final deployment, you should NOT use it for development and testing."""
def deploy_to_public(self, local_url: str):
log_tool_output(ToolLogItem(name="local_url", value=local_url), tool_name="Deployer")
ServerReporter().report(local_url, "local_url")

View file

@ -4,8 +4,8 @@ import subprocess
from pydantic import BaseModel, Field
from metagpt.logs import ToolLogItem, log_tool_output
from metagpt.tools.tool_registry import register_tool
from metagpt.utils.report import EditorReporter
class FileBlock(BaseModel):
@ -23,17 +23,20 @@ class FileBlock(BaseModel):
class FileManager:
"""A tool for reading, understanding, writing, and editing files"""
def __init__(self) -> None:
self.resource = EditorReporter()
def write(self, path: str, content: str):
"""Write the whole content to a file."""
with open(path, "w") as f:
f.write(content)
log_tool_output(ToolLogItem(name="write_file_path", value=path), tool_name="FileManager")
self.resource.report(path, "path")
def read(self, path: str) -> str:
"""Read the whole content of a file."""
with open(path, "r") as f:
self.resource.report(path, "path")
return f.read()
log_tool_output(ToolLogItem(name="read_file_path", value=path), tool_name="FileManager")
def search_content(self, symbol: str, root_path: str = "", window: int = 20) -> FileBlock:
"""
@ -78,10 +81,7 @@ class FileManager:
symbol=symbol,
symbol_line=i + 1,
)
log_tool_output(
ToolLogItem(type="object", name="file_block_searched", value=result),
tool_name="FileManager",
)
self.resource.report(result.file_path, "path")
return result
return None
@ -124,9 +124,7 @@ class FileManager:
block_start_line=start_line,
block_end_line=-1 if end_line < start_line else start_line + new_block_content.count("\n"),
)
log_tool_output(
ToolLogItem(type="object", name="file_block_written", value=new_file_block), tool_name="FileManager"
)
self.resource.report(new_file_block.file_path, "path")
return f"Content written successfully to {file_path}"

View file

@ -2,8 +2,8 @@ import subprocess
import threading
from queue import Queue
from metagpt.logs import TOOL_LOG_END_MARKER, ToolLogItem, log_tool_output
from metagpt.tools.tool_registry import register_tool
from metagpt.utils.report import END_MARKER_VALUE, TerminalReporter
@register_tool()
@ -28,9 +28,10 @@ class Terminal:
stderr=subprocess.STDOUT,
text=True,
bufsize=1, # Line buffered
executable="/bin/bash"
executable="/bin/bash",
)
self.stdout_queue = Queue()
self.observer = TerminalReporter()
def run_command(self, cmd: str, daemon=False) -> str:
"""
@ -59,7 +60,7 @@ class Terminal:
# Send the command
self.process.stdin.write(cmd + self.command_terminator)
self.process.stdin.write(
f'echo "{TOOL_LOG_END_MARKER.value}"' + self.command_terminator # write EOF
f'echo "{END_MARKER_VALUE}"' + self.command_terminator # write EOF
) # Unique marker to signal command end
self.process.stdin.flush()
if daemon:
@ -93,28 +94,26 @@ class Terminal:
return self.run_command(cmd, daemon=daemon)
def _read_and_process_output(self, cmd):
cmd_output = []
log_tool_output(
output=ToolLogItem(name="cmd", value=cmd + self.command_terminator), tool_name="Terminal"
) # log the command
with self.observer as observer:
cmd_output = []
observer.report(cmd + self.command_terminator, "cmd")
# report the command
# Read the output until the unique marker is found
while True:
line = self.process.stdout.readline()
ix = line.rfind(TOOL_LOG_END_MARKER.value)
if ix >= 0:
line = line[0:ix]
if line:
log_tool_output(
output=ToolLogItem(name="output", value=line), tool_name="Terminal"
) # log stdout in real-time
cmd_output.append(line)
log_tool_output(TOOL_LOG_END_MARKER)
break
# log stdout in real-time
log_tool_output(output=ToolLogItem(name="output", value=line), tool_name="Terminal")
cmd_output.append(line)
self.stdout_queue.put(line)
# Read the output until the unique marker is found
while True:
line = self.process.stdout.readline()
ix = line.rfind(END_MARKER_VALUE)
if ix >= 0:
line = line[0:ix]
if line:
observer.report(line, "output")
# report stdout in real-time
cmd_output.append(line)
break
# log stdout in real-time
observer.report(line, "output")
cmd_output.append(line)
self.stdout_queue.put(line)
return "".join(cmd_output)

View file

@ -37,7 +37,7 @@ from PIL import Image
from pydantic_core import to_jsonable_python
from tenacity import RetryCallState, RetryError, _utils
from metagpt.const import MESSAGE_ROUTE_TO_ALL
from metagpt.const import MARKDOWN_TITLE_PREFIX, MESSAGE_ROUTE_TO_ALL
from metagpt.logs import logger
from metagpt.utils.exceptions import handle_exception
@ -65,7 +65,7 @@ class OutputParser:
@classmethod
def parse_blocks(cls, text: str):
# 首先根据"##"将文本分割成不同的block
blocks = text.split("##")
blocks = text.split(MARKDOWN_TITLE_PREFIX)
# 创建一个字典用于存储每个block的标题和内容
block_dict = {}

View file

@ -198,8 +198,9 @@ class FileRepository:
:type dependencies: List[str], optional
"""
await self.save(filename=doc.filename, content=doc.content, dependencies=dependencies)
doc = await self.save(filename=doc.filename, content=doc.content, dependencies=dependencies)
logger.debug(f"File Saved: {str(doc.filename)}")
return doc
async def save_pdf(self, doc: Document, with_suffix: str = ".md", dependencies: List[str] = None):
"""Save a Document instance as a PDF file.
@ -216,8 +217,9 @@ class FileRepository:
"""
m = json.loads(doc.content)
filename = Path(doc.filename).with_suffix(with_suffix) if with_suffix is not None else Path(doc.filename)
await self.save(filename=str(filename), content=json_to_markdown(m), dependencies=dependencies)
doc = await self.save(filename=str(filename), content=json_to_markdown(m), dependencies=dependencies)
logger.debug(f"File Saved: {str(filename)}")
return doc
async def delete(self, filename: Path | str):
"""Delete a file from the file repository.

305
metagpt/utils/report.py Normal file
View file

@ -0,0 +1,305 @@
import asyncio
import os
import typing
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Literal, Optional, Union
from urllib.parse import unquote, urlparse, urlunparse
from uuid import UUID, uuid4
from aiohttp import ClientSession, UnixConnector
from playwright.async_api import Page as AsyncPage
from playwright.sync_api import Page as SyncPage
from pydantic import BaseModel, Field, PrivateAttr
from metagpt.const import METAGPT_REPORTER_DEFAULT_URL
from metagpt.logs import create_llm_stream_queue, get_llm_stream_queue
if typing.TYPE_CHECKING:
from metagpt.roles.role import Role
try:
import requests_unixsocket as requests
except ImportError:
import requests
from contextvars import ContextVar
CURRENT_ROLE: ContextVar["Role"] = ContextVar("role")
class BlockType(str, Enum):
    """Enumeration for different types of blocks.

    Each member's value is the human-readable block name; because the enum
    subclasses ``str`` it serializes directly into the reporter's JSON payload.
    """

    TERMINAL = "Terminal"  # shell/terminal command execution
    TASK = "Task"  # task/plan progress
    BROWSER = "Browser"
    BROWSER_RT = "Browser-RT"  # presumably a real-time browser stream — TODO confirm
    EDITOR = "Editor"  # file/code editing
    GALLERY = "Gallery"
    NOTEBOOK = "Notebook"  # Jupyter notebook execution
    DOCS = "Docs"  # generated documents (PRD, design, tasks)
END_MARKER_NAME = "end_marker"
END_MARKER_VALUE = "\x18\x19\x1B\x18"
class ResourceReporter(BaseModel):
"""Base class for resource reporting."""
block: BlockType = Field(description="The type of block that is reporting the resource")
uuid: UUID = Field(default_factory=uuid4, description="The unique identifier for the resource")
is_chunk: bool = Field(False, description="Indicates whether the report is a chunk of a stream")
enable_llm_stream: bool = Field(False, description="Indicates whether to connect to an LLM stream for reporting")
callback_url: str = Field(METAGPT_REPORTER_DEFAULT_URL, description="The URL to which the report should be sent")
_llm_task: Optional[asyncio.Task] = PrivateAttr(None)
def report(self, value: Any, name: str):
"""Synchronously report resource observation data.
Args:
value: The data to report.
name: The type name of the data.
"""
return self._report(value, name)
async def async_report(self, value: Any, name: str):
"""Asynchronously report resource observation data.
Args:
value: The data to report.
name: The type name of the data.
"""
return await self._async_report(value, name)
@classmethod
def set_report_fn(cls, fn: Callable):
"""Set the synchronous report function.
Args:
fn: A callable function used for synchronous reporting. For example:
>>> def _report(self, value: Any, name: str):
... print(value, name)
"""
cls._report = fn
@classmethod
def set_async_report_fn(cls, fn: Callable):
"""Set the asynchronous report function.
Args:
fn: A callable function used for asynchronous reporting. For example:
```python
>>> async def _report(self, value: Any, name: str):
... print(value, name)
```
"""
cls._async_report = fn
def _report(self, value: Any, name: str):
if not self.callback_url:
return
data = self._format_data(value, name)
resp = requests.post(self.callback_url, json=data)
resp.raise_for_status()
return resp.text
async def _async_report(self, value: Any, name: str):
if not self.callback_url:
return
data = self._format_data(value, name)
url = self.callback_url
_result = urlparse(url)
sessiion_kwargs = {}
if _result.scheme.endswith("+unix"):
parsed_list = list(_result)
parsed_list[0] = parsed_list[0][:-5]
parsed_list[1] = "fake.org"
url = urlunparse(parsed_list)
sessiion_kwargs["connector"] = UnixConnector(path=unquote(_result.netloc))
async with ClientSession(**sessiion_kwargs) as client:
async with client.post(url, json=data) as resp:
resp.raise_for_status()
return await resp.text()
def _format_data(self, value, name):
data = self.model_dump(mode="json", exclude=("callback_url", "llm_stream"))
data["value"] = str(value) if isinstance(value, Path) else value
data["name"] = name
role = CURRENT_ROLE.get(None)
if role:
role_name = role.name
else:
role_name = os.environ.get("METAGPT_ROLE")
data["role"] = role_name
return data
def __enter__(self):
"""Enter the synchronous streaming callback context."""
self.is_chunk = True
return self
def __exit__(self, *args, **kwargs):
"""Exit the synchronous streaming callback context."""
self.report(None, END_MARKER_NAME)
self.is_chunk = False
async def __aenter__(self):
"""Enter the asynchronous streaming callback context."""
self.is_chunk = True
if self.enable_llm_stream:
queue = create_llm_stream_queue()
self._llm_task = asyncio.create_task(self._llm_stream_report(queue))
return self
async def __aexit__(self, *args, **kwargs):
"""Exit the asynchronous streaming callback context."""
self.is_chunk = False
if self.enable_llm_stream:
self._llm_task.cancel()
self._llm_task = None
await self.async_report(None, END_MARKER_NAME)
async def _llm_stream_report(self, queue: asyncio.Queue):
    """Relay LLM stream chunks from `queue` to the callback while streaming is active."""
    while self.is_chunk:
        chunk = await queue.get()
        await self.async_report(chunk, "content")
async def wait_llm_stream_report(self):
    """Poll until the relay task is gone or the LLM stream queue is drained."""
    queue = get_llm_stream_queue()
    # Equivalent to: loop while the task exists, breaking once the queue empties.
    while self._llm_task and not queue.empty():
        await asyncio.sleep(0.01)
class TerminalReporter(ResourceReporter):
    """Streams terminal commands and their output to the TERMINAL block.

    Terminals are stateful and an agent may drive several at once; giving each
    terminal its own TerminalReporter instance keeps their event streams
    distinguishable.
    """

    block: Literal[BlockType.TERMINAL] = BlockType.TERMINAL

    def report(self, value: str, name: Literal["cmd", "output"]):
        """Synchronously report a terminal command or its output."""
        return super().report(value, name)

    async def async_report(self, value: str, name: Literal["cmd", "output"]):
        """Asynchronously report a terminal command or its output."""
        return await super().async_report(value, name)
class BrowserReporter(ResourceReporter):
    """Streams visited URLs and page snapshots to the BROWSER block.

    Browsers are stateful, so each browser should own its own BrowserReporter
    instance.
    """

    block: Literal[BlockType.BROWSER] = BlockType.BROWSER

    def report(self, value: Union[str, SyncPage], name: Literal["url", "page"]):
        """Synchronously report a URL or a sync page snapshot (url/title/screenshot)."""
        if name == "page":
            value = {"page_url": value.url, "title": value.title(), "screenshot": str(value.screenshot())}
        return super().report(value, name)

    async def async_report(self, value: Union[str, AsyncPage], name: Literal["url", "page"]):
        """Asynchronously report a URL or an async page snapshot (url/title/screenshot)."""
        if name == "page":
            value = {"page_url": value.url, "title": await value.title(), "screenshot": str(await value.screenshot())}
        return await super().async_report(value, name)
class ServerReporter(ResourceReporter):
    """Reports a deployed server's local URL to the BROWSER_RT block."""

    block: Literal[BlockType.BROWSER_RT] = BlockType.BROWSER_RT

    def report(self, value: str, name: Literal["local_url"] = "local_url"):
        """Synchronously report the deployment URL."""
        return super().report(value, name)

    async def async_report(self, value: str, name: Literal["local_url"] = "local_url"):
        """Asynchronously report the deployment URL."""
        return await super().async_report(value, name)
class ObjectReporter(ResourceReporter):
    """Reports a complete object resource (a dict) in a single event."""

    def report(self, value: dict, name: Literal["object"] = "object"):
        """Synchronously report the object."""
        return super().report(value, name)

    async def async_report(self, value: dict, name: Literal["object"] = "object"):
        """Asynchronously report the object."""
        return await super().async_report(value, name)
class TaskReporter(ObjectReporter):
    """ObjectReporter bound to the TASK block."""

    block: Literal[BlockType.TASK] = BlockType.TASK
class FileReporter(ResourceReporter):
    """File resource callback reporting paths, metadata, or streamed content.

    Two usage patterns: report the finished file once with a single "path"
    event (non-streaming), or stream partial "meta"/"content" events first
    and finish with the "path".
    """

    def report(self, value: Union[Path, dict, Any], name: Literal["path", "meta", "content"] = "path"):
        """Report file resource synchronously."""
        return super().report(value, name)

    async def async_report(self, value: Union[Path, dict, Any], name: Literal["path", "meta", "content"] = "path"):
        """Report file resource asynchronously.

        Annotation widened from `Path` to match the sync variant: the
        "meta"/"content" names carry dicts or arbitrary chunks, not paths.
        """
        return await super().async_report(value, name)
class NotebookReporter(FileReporter):
    """FileReporter bound to the NOTEBOOK block; same as FileReporter(block=BlockType.NOTEBOOK)."""

    block: Literal[BlockType.NOTEBOOK] = BlockType.NOTEBOOK
class DocsReporter(FileReporter):
    """FileReporter bound to the DOCS block; same as FileReporter(block=BlockType.DOCS)."""

    block: Literal[BlockType.DOCS] = BlockType.DOCS
class EditorReporter(FileReporter):
    """Equivalent to FileReporter(block=BlockType.EDITOR)."""

    block: Literal[BlockType.EDITOR] = BlockType.EDITOR
class GalleryReporter(FileReporter):
    """Reports image resources to the GALLERY block.

    Images must be complete before display, so each "path" event is a full
    file path. The gallery also shows image type and prompt, so any "meta"
    information is reported as a streamed event beforehand.
    """

    block: Literal[BlockType.GALLERY] = BlockType.GALLERY

    def report(self, value: Union[dict, Path], name: Literal["meta", "path"] = "path"):
        """Synchronously report image metadata or a finished image path."""
        return super().report(value, name)

    async def async_report(self, value: Union[dict, Path], name: Literal["meta", "path"] = "path"):
        """Asynchronously report image metadata or a finished image path."""
        return await super().async_report(value, name)

View file

@ -247,14 +247,16 @@ def search_engine_mocker(aiohttp_mocker, curl_cffi_mocker, httplib2_mocker, sear
@pytest.fixture
def http_server():
async def handler(request):
return aiohttp.web.Response(
text="""<!DOCTYPE html><html lang="en"><head><meta charset="UTF-8">
<title>MetaGPT</title></head><body><h1>MetaGPT</h1></body></html>""",
content_type="text/html",
)
async def start(handler=None):
if handler is None:
async def handler(request):
return aiohttp.web.Response(
text="""<!DOCTYPE html><html lang="en"><head><meta charset="UTF-8">
<title>MetaGPT</title></head><body><h1>MetaGPT</h1></body></html>""",
content_type="text/html",
)
async def start():
server = aiohttp.web.Server(handler)
runner = aiohttp.web.ServerRunner(server)
await runner.setup()

View file

@ -0,0 +1,182 @@
import ast
from contextlib import asynccontextmanager
import aiohttp.web
import pytest
from metagpt.logs import log_llm_stream
from metagpt.utils.report import (
END_MARKER_NAME,
BlockType,
BrowserReporter,
DocsReporter,
EditorReporter,
NotebookReporter,
ServerReporter,
TaskReporter,
TerminalReporter,
)
class MockFileLLM:
    """Fake LLM that streams its fixed text line-by-line, then returns it whole."""

    def __init__(self, data: str):
        self.data = data

    async def aask(self, *args, **kwargs) -> str:
        # Emit each line (newline kept) as a stream chunk, plus one trailing "\n".
        for line in self.data.splitlines(keepends=True):
            log_llm_stream(line)
        log_llm_stream("\n")
        return self.data
@asynccontextmanager
async def callback_server(http_server):
    """Spin up an HTTP endpoint that records every JSON callback it receives.

    Yields (url, received) where `received` accumulates the decoded JSON
    bodies in arrival order; the server is stopped after the context exits.
    """
    received = []

    async def handler(request):
        received.append(await request.json())
        return aiohttp.web.json_response({})

    server, url = await http_server(handler)
    yield url, received
    await server.stop()
@pytest.mark.asyncio
async def test_terminal_report(http_server):
    """Commands and outputs stream to one TERMINAL block and concatenate in order."""
    async with callback_server(http_server) as (url, events):
        async with TerminalReporter(callback_url=url) as reporter:
            await reporter.async_report("ls -a", "cmd")
            await reporter.async_report("main.py\n", "output")
            await reporter.async_report("setup.py\n", "output")
    assert all(BlockType(event["block"]) is BlockType.TERMINAL for event in events)
    assert all(event["uuid"] == events[0]["uuid"] for event in events[1:])
    joined = "".join(event["value"] for event in events if event["name"] != END_MARKER_NAME)
    assert joined == "ls -amain.py\nsetup.py\n"
@pytest.mark.asyncio
async def test_browser_report(http_server):
    """URL and page-screenshot events stream to a single BROWSER block."""
    screenshot = b"\x89PNG\r\n\x1a\n\x00\x00"
    web_url = "https://docs.deepwisdom.ai"

    # NOTE(review): this stub only defines screenshot(); confirm the reporter's
    # "page" path does not also read value.url / value.title() in this commit.
    class AsyncPage:
        async def screenshot(self):
            return screenshot

    async with callback_server(http_server) as (url, events):
        async with BrowserReporter(callback_url=url) as reporter:
            await reporter.async_report(web_url, "url")
            await reporter.async_report(AsyncPage(), "page")
    assert all(BlockType(event["block"]) is BlockType.BROWSER for event in events)
    assert all(event["uuid"] == events[0]["uuid"] for event in events[1:])
    assert len(events) == 3
    assert events[-1]["name"] == END_MARKER_NAME
    assert events[0]["name"] == "url"
    assert events[0]["value"] == web_url
    assert events[1]["name"] == "page"
    # The screenshot bytes are sent as repr text; literal_eval recovers them.
    assert ast.literal_eval(events[1]["value"]) == screenshot
@pytest.mark.asyncio
async def test_server_reporter(http_server):
    """A single non-chunk local_url event lands in the BROWSER_RT block."""
    deployed_url = "http://127.0.0.1:8080/index.html"
    async with callback_server(http_server) as (url, events):
        reporter = ServerReporter(callback_url=url)
        await reporter.async_report(deployed_url)
    assert all(BlockType(event["block"]) is BlockType.BROWSER_RT for event in events)
    assert len(events) == 1
    event = events[0]
    assert event["name"] == "local_url"
    assert event["value"] == deployed_url
    assert not event["is_chunk"]
@pytest.mark.asyncio
async def test_task_reporter(http_server):
    """A single object event carrying the task dict lands in the TASK block."""
    task = {"current_task_id": "", "tasks": []}
    async with callback_server(http_server) as (url, events):
        reporter = TaskReporter(callback_url=url)
        await reporter.async_report(task)
    assert all(BlockType(event["block"]) is BlockType.TASK for event in events)
    assert len(events) == 1
    event = events[0]
    assert event["name"] == "object"
    assert event["value"] == task
@pytest.mark.asyncio
async def test_notebook_reporter(http_server):
    """Streamed notebook cells/outputs arrive in order, finished by a path event."""
    code = {
        "cell_type": "code",
        "execution_count": None,
        "id": "e1841c44",
        "metadata": {},
        "outputs": [],
        "source": ["\n", "import time\n", "print('will sleep 1s.')\n", "time.sleep(1)\n", "print('end.')\n", ""],
    }
    output1 = {"name": "stdout", "output_type": "stream", "text": ["will sleep 1s.\n"]}
    output2 = {"name": "stdout", "output_type": "stream", "text": ["will sleep 1s.\n"]}
    code_path = "/data/main.ipynb"
    async with callback_server(http_server) as (url, events):
        async with NotebookReporter(callback_url=url) as reporter:
            await reporter.async_report(code, "content")
            await reporter.async_report(output1, "content")
            await reporter.async_report(output2, "content")
            await reporter.async_report(code_path, "path")
    assert all(BlockType(event["block"]) is BlockType.NOTEBOOK for event in events)
    # 3 content events + 1 path event + the end marker appended on context exit.
    assert len(events) == 5
    assert events[-1]["name"] == END_MARKER_NAME
    assert events[-2]["name"] == "path"
    assert events[-2]["value"] == code_path
    assert all(event["uuid"] == events[0]["uuid"] for event in events[1:])
    assert [event["value"] for event in events if event["name"] == "content"] == [code, output1, output2]
@pytest.mark.asyncio
@pytest.mark.parametrize(
    ("data", "file_path", "meta", "block", "report_cls"),
    (
        (
            "## Language\n\nen_us\n\n## Programming Language\n\nPython\n\n## Original Requirements\n\nCreate a 2048 gam...",
            "/data/prd.md",
            {"type": "write_prd"},
            BlockType.DOCS,
            DocsReporter,
        ),
        (
            "#!/usr/bin/env python\n# -*- coding: utf-8 -*-\n\nprint('Hello World')\n",
            "/data/main.py",
            {"type": "write_code"},
            BlockType.EDITOR,
            EditorReporter,
        ),
    ),
    ids=["test_docs_reporter", "test_editor_reporter"],
)
async def test_llm_stream_reporter(data, file_path, meta, block, report_cls, http_server):
    # End-to-end check of LLM-stream reporting: meta event first, then streamed
    # "content" chunks relayed from the LLM, then the final "path" event.
    async with callback_server(http_server) as (url, callback_data):
        async with report_cls(callback_url=url, enable_llm_stream=True) as reporter:
            await reporter.async_report(meta, "meta")
            # MockFileLLM pushes `data` through the LLM stream line-by-line.
            await MockFileLLM(data).aask("")
            # Ensure the background relay has drained the queue before reporting the path.
            await reporter.wait_llm_stream_report()
            await reporter.async_report(file_path, "path")
    assert callback_data
    # Every event belongs to the expected block and shares one report uuid.
    assert all(block is BlockType(i["block"]) for i in callback_data)
    assert all(i["uuid"] == callback_data[0]["uuid"] for i in callback_data[1:])
    chunks, names = [], set()
    for i in callback_data:
        name = i["name"]
        names.add(name)
        if name == "meta":
            assert i["value"] == meta
        elif name == "path":
            assert i["value"] == file_path
        elif name == END_MARKER_NAME:
            pass
        elif name == "content":
            chunks.append(i["value"])
        else:
            raise ValueError
    # The last chunk is the extra trailing "\n" logged by MockFileLLM.aask,
    # so the payload reassembles from all chunks but the last.
    assert "".join(chunks[:-1]) == data
    assert names == {"meta", "path", "content", END_MARKER_NAME}