Merge branch 'feat-stream' into feature-reporter

This commit is contained in:
shenchucheng 2024-04-20 20:44:45 +08:00
commit fe3010e965
7 changed files with 59 additions and 19 deletions

View file

@ -17,7 +17,7 @@ from pydantic import BaseModel, Field, create_model, model_validator
from tenacity import retry, stop_after_attempt, wait_random_exponential
from metagpt.actions.action_outcls_registry import register_action_outcls
from metagpt.const import USE_CONFIG_TIMEOUT
from metagpt.const import MARKDOWN_TITLE_PREFIX, USE_CONFIG_TIMEOUT
from metagpt.llm import BaseLLM
from metagpt.logs import logger
from metagpt.provider.postprocess.llm_output_postprocess import llm_output_postprocess
@ -113,7 +113,7 @@ Follow format example's {prompt_schema} format, generate output and make sure it
"""
def dict_to_markdown(d, prefix="- ", kv_sep="\n", postfix="\n"):
def dict_to_markdown(d, prefix=MARKDOWN_TITLE_PREFIX, kv_sep="\n", postfix="\n"):
markdown_str = ""
for key, value in d.items():
markdown_str += f"{prefix}{key}{kv_sep}{value}{postfix}"

View file

@ -70,12 +70,12 @@ class WriteDesign(Action):
return ActionOutput(content=changed_files.model_dump_json(), instruct_content=changed_files)
async def _new_system_design(self, context):
node = await DESIGN_API_NODE.fill(context=context, llm=self.llm)
node = await DESIGN_API_NODE.fill(context=context, llm=self.llm, schema=self.prompt_schema)
return node
async def _merge(self, prd_doc, system_design_doc):
context = NEW_REQ_TEMPLATE.format(old_design=system_design_doc.content, context=prd_doc.content)
node = await REFINED_DESIGN_NODE.fill(context=context, llm=self.llm)
node = await REFINED_DESIGN_NODE.fill(context=context, llm=self.llm, schema=self.prompt_schema)
system_design_doc.content = node.instruct_content.model_dump_json()
return system_design_doc

View file

@ -13,9 +13,10 @@ from typing import Literal, Tuple
import nbformat
from nbclient import NotebookClient
from nbclient.exceptions import CellTimeoutError, DeadKernelError
from nbclient.exceptions import CellExecutionComplete, CellTimeoutError, DeadKernelError
from nbclient.util import ensure_async
from nbformat import NotebookNode
from nbformat.v4 import new_code_cell, new_markdown_cell, new_output
from nbformat.v4 import new_code_cell, new_markdown_cell, new_output, output_from_msg
from rich.box import MINIMAL
from rich.console import Console, Group
from rich.live import Live
@ -25,31 +26,56 @@ from rich.syntax import Syntax
from metagpt.actions import Action
from metagpt.const import DEFAULT_WORKSPACE_ROOT
from metagpt.logs import ToolLogItem, log_tool_output, logger
from metagpt.logs import ToolLogItem, log_llm_stream, log_tool_output, logger
INSTALL_KEEPLEN = 500
class RealtimeOutputNotebookClient(NotebookClient):
"""Realtime output of Notebook execution."""
async def _async_poll_output_msg(self, parent_msg_id: str, cell: NotebookNode, cell_index: int) -> None:
"""Implement a feature to enable sending messages."""
assert self.kc is not None
while True:
msg = await ensure_async(self.kc.iopub_channel.get_msg(timeout=None))
self._send_msg(msg)
if msg["parent_header"].get("msg_id") == parent_msg_id:
try:
# Will raise CellExecutionComplete when completed
self.process_message(msg, cell, cell_index)
except CellExecutionComplete:
return
def _send_msg(self, msg: dict):
msg_type = msg.get("header", {}).get("msg_type")
if msg_type not in ["stream", "error", "execute_result"]:
return
log_llm_stream(output_from_msg(msg))
class ExecuteNbCode(Action):
"""execute notebook code block, return result to llm, and display it."""
nb: NotebookNode
nb_client: NotebookClient
nb_client: NotebookClient = None
console: Console
interaction: str
timeout: int = 600
enable_realtime_output: bool = False
def __init__(
self,
nb=nbformat.v4.new_notebook(),
timeout=600,
):
def __init__(self, nb=nbformat.v4.new_notebook(), timeout=600, enable_realtime_output=False):
super().__init__(
nb=nb,
nb_client=NotebookClient(nb, timeout=timeout, resources={"metadata": {"path": DEFAULT_WORKSPACE_ROOT}}),
timeout=timeout,
console=Console(),
interaction=("ipython" if self.is_ipython() else "terminal"),
enable_realtime_output=enable_realtime_output,
)
self.nb_client = self._resolve_nb_client()(
nb, timeout=timeout, resources={"metadata": {"path": DEFAULT_WORKSPACE_ROOT}}
)
async def build(self):
@ -175,6 +201,8 @@ class ExecuteNbCode(Action):
"""set timeout for run code.
returns the success or failure of the cell execution, and an optional error message.
"""
self._try_realtime_output(cell)
try:
await self.nb_client.async_execute_cell(cell, cell_index)
return self.parse_outputs(self.nb.cells[-1].outputs)
@ -225,6 +253,13 @@ class ExecuteNbCode(Action):
else:
raise ValueError(f"Only support for language: python, markdown, but got {language}, ")
def _resolve_nb_client(self) -> NotebookClient:
return RealtimeOutputNotebookClient if self.enable_realtime_output else NotebookClient
def _try_realtime_output(self, cell: NotebookNode):
if self.enable_realtime_output:
log_llm_stream(cell)
def remove_escape_and_color_codes(input_str: str):
# 使用正则表达式去除jupyter notebook输出结果中的转义字符和颜色代码

View file

@ -105,7 +105,9 @@ class WritePRD(Action):
project_name = self.project_name
context = CONTEXT_TEMPLATE.format(requirements=req, project_name=project_name)
exclude = [PROJECT_NAME.key] if project_name else []
node = await WRITE_PRD_NODE.fill(context=context, llm=self.llm, exclude=exclude) # schema=schema
node = await WRITE_PRD_NODE.fill(
context=context, llm=self.llm, exclude=exclude, schema=self.prompt_schema
) # schema=schema
await self._rename_workspace(node)
new_prd_doc = await self.repo.docs.prd.save(
filename=FileRepository.new_filename() + ".json", content=node.instruct_content.model_dump_json()

View file

@ -140,4 +140,8 @@ LLM_API_TIMEOUT = 300
# Assistant alias
ASSISTANT_ALIAS = "response"
# Markdown
MARKDOWN_TITLE_PREFIX = "## "
# Reporter
METAGPT_REPORTER_DEFAULT_URL = os.environ.get("METAGPT_REPORTER_URL", "")

View file

@ -7,7 +7,6 @@ from pathlib import Path
import typer
from metagpt.const import CONFIG_ROOT
from metagpt.utils.project_repo import ProjectRepo
app = typer.Typer(add_completion=False, pretty_exceptions_show_locals=False)
@ -25,7 +24,7 @@ def generate_repo(
reqa_file="",
max_auto_summarize_code=0,
recover_path=None,
) -> ProjectRepo:
):
"""Run the startup logic. Can be called from CLI or other Python scripts."""
from metagpt.config2 import config
from metagpt.context import Context

View file

@ -37,7 +37,7 @@ from PIL import Image
from pydantic_core import to_jsonable_python
from tenacity import RetryCallState, RetryError, _utils
from metagpt.const import MESSAGE_ROUTE_TO_ALL
from metagpt.const import MARKDOWN_TITLE_PREFIX, MESSAGE_ROUTE_TO_ALL
from metagpt.logs import logger
from metagpt.utils.exceptions import handle_exception
@ -65,7 +65,7 @@ class OutputParser:
@classmethod
def parse_blocks(cls, text: str):
# 首先根据"##"将文本分割成不同的block
blocks = text.split("##")
blocks = text.split(MARKDOWN_TITLE_PREFIX)
# 创建一个字典用于存储每个block的标题和内容
block_dict = {}