resource event report

This commit is contained in:
shenchucheng 2024-04-25 20:14:18 +08:00
parent 63eb85ad61
commit 021060d526
12 changed files with 169 additions and 158 deletions

View file

@ -24,6 +24,7 @@ from metagpt.actions.design_api_an import (
)
from metagpt.const import DATA_API_DESIGN_FILE_REPO, SEQ_FLOW_FILE_REPO
from metagpt.logs import logger
from metagpt.report import DocsReporter
from metagpt.schema import Document, Documents, Message
from metagpt.utils.mermaid import mermaid_to_file
@ -82,19 +83,22 @@ class WriteDesign(Action):
async def _update_system_design(self, filename) -> Document:
prd = await self.repo.docs.prd.get(filename)
old_system_design_doc = await self.repo.docs.system_design.get(filename)
if not old_system_design_doc:
system_design = await self._new_system_design(context=prd.content)
doc = await self.repo.docs.system_design.save(
filename=filename,
content=system_design.instruct_content.model_dump_json(),
dependencies={prd.root_relative_path},
)
else:
doc = await self._merge(prd_doc=prd, system_design_doc=old_system_design_doc)
await self.repo.docs.system_design.save_doc(doc=doc, dependencies={prd.root_relative_path})
await self._save_data_api_design(doc)
await self._save_seq_flow(doc)
await self.repo.resources.system_design.save_pdf(doc=doc)
async with DocsReporter(enable_llm_stream=True) as reporter:
await reporter.async_report({"type": "design"}, "meta")
if not old_system_design_doc:
system_design = await self._new_system_design(context=prd.content)
doc = await self.repo.docs.system_design.save(
filename=filename,
content=system_design.instruct_content.model_dump_json(),
dependencies={prd.root_relative_path},
)
else:
doc = await self._merge(prd_doc=prd, system_design_doc=old_system_design_doc)
await self.repo.docs.system_design.save_doc(doc=doc, dependencies={prd.root_relative_path})
await self._save_data_api_design(doc)
await self._save_seq_flow(doc)
md = await self.repo.resources.system_design.save_pdf(doc=doc)
await reporter.async_report(self.repo.workdir / md.root_relative_path, "path")
return doc
async def _save_data_api_design(self, design_doc):

View file

@ -26,7 +26,8 @@ from rich.syntax import Syntax
from metagpt.actions import Action
from metagpt.const import DEFAULT_WORKSPACE_ROOT
from metagpt.logs import ToolLogItem, log_llm_stream, log_tool_output, logger
from metagpt.logs import logger
from metagpt.report import NotebookReporter
INSTALL_KEEPLEN = 500
@ -34,12 +35,16 @@ INSTALL_KEEPLEN = 500
class RealtimeOutputNotebookClient(NotebookClient):
"""Realtime output of Notebook execution."""
def __init__(self, *args, notebook_reporter=None, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.notebook_reporter = notebook_reporter or NotebookReporter()
async def _async_poll_output_msg(self, parent_msg_id: str, cell: NotebookNode, cell_index: int) -> None:
"""Implement a feature to enable sending messages."""
assert self.kc is not None
while True:
msg = await ensure_async(self.kc.iopub_channel.get_msg(timeout=None))
self._send_msg(msg)
await self._send_msg(msg)
if msg["parent_header"].get("msg_id") == parent_msg_id:
try:
@ -48,12 +53,12 @@ class RealtimeOutputNotebookClient(NotebookClient):
except CellExecutionComplete:
return
def _send_msg(self, msg: dict):
async def _send_msg(self, msg: dict):
msg_type = msg.get("header", {}).get("msg_type")
if msg_type not in ["stream", "error", "execute_result"]:
return
log_llm_stream(output_from_msg(msg))
await self.notebook_reporter.async_report(output_from_msg(msg), "content")
class ExecuteNbCode(Action):
@ -64,18 +69,20 @@ class ExecuteNbCode(Action):
console: Console
interaction: str
timeout: int = 600
enable_realtime_output: bool = False
def __init__(self, nb=nbformat.v4.new_notebook(), timeout=600, enable_realtime_output=False):
def __init__(self, nb=nbformat.v4.new_notebook(), timeout=600):
super().__init__(
nb=nb,
timeout=timeout,
console=Console(),
interaction=("ipython" if self.is_ipython() else "terminal"),
enable_realtime_output=enable_realtime_output,
)
self.nb_client = self._resolve_nb_client()(
nb, timeout=timeout, resources={"metadata": {"path": DEFAULT_WORKSPACE_ROOT}}
self.reporter = NotebookReporter()
self.nb_client = RealtimeOutputNotebookClient(
nb,
timeout=timeout,
resources={"metadata": {"path": DEFAULT_WORKSPACE_ROOT}},
notebook_reporter=self.reporter,
)
async def build(self):
@ -201,7 +208,7 @@ class ExecuteNbCode(Action):
"""set timeout for run code.
returns the success or failure of the cell execution, and an optional error message.
"""
self._try_realtime_output(cell)
await self.reporter.async_report(cell, "content")
try:
await self.nb_client.async_execute_cell(cell, cell_index)
@ -224,42 +231,36 @@ class ExecuteNbCode(Action):
"""
self._display(code, language)
if language == "python":
# add code to the notebook
self.add_code_cell(code=code)
async with self.reporter:
if language == "python":
# add code to the notebook
self.add_code_cell(code=code)
# build code executor
await self.build()
# build code executor
await self.build()
# run code
cell_index = len(self.nb.cells) - 1
success, outputs = await self.run_cell(self.nb.cells[-1], cell_index)
# run code
cell_index = len(self.nb.cells) - 1
success, outputs = await self.run_cell(self.nb.cells[-1], cell_index)
if "!pip" in code:
success = False
outputs = outputs[-INSTALL_KEEPLEN:]
if "!pip" in code:
success = False
outputs = outputs[-INSTALL_KEEPLEN:]
elif language == "markdown":
# add markdown content to markdown cell in a notebook.
self.add_markdown_cell(code)
# return True, beacuse there is no execution failure for markdown cell.
outputs, success = code, True
else:
raise ValueError(f"Only support for language: python, markdown, but got {language}, ")
file_path = DEFAULT_WORKSPACE_ROOT / "code.ipynb"
nbformat.write(self.nb, file_path)
log_tool_output(ToolLogItem(name="file_path", value=file_path), tool_name="ExecuteNbCode")
await self.reporter.async_report(file_path, "path")
return outputs, success
elif language == "markdown":
# add markdown content to markdown cell in a notebook.
self.add_markdown_cell(code)
# return True, beacuse there is no execution failure for markdown cell.
return code, True
else:
raise ValueError(f"Only support for language: python, markdown, but got {language}, ")
def _resolve_nb_client(self) -> NotebookClient:
return RealtimeOutputNotebookClient if self.enable_realtime_output else NotebookClient
def _try_realtime_output(self, cell: NotebookNode):
if self.enable_realtime_output:
log_llm_stream(cell)
def remove_escape_and_color_codes(input_str: str):
# 使用正则表达式去除jupyter notebook输出结果中的转义字符和颜色代码

View file

@ -18,6 +18,7 @@ from metagpt.actions.action_output import ActionOutput
from metagpt.actions.project_management_an import PM_NODE, REFINED_PM_NODE
from metagpt.const import PACKAGE_REQUIREMENTS_FILENAME
from metagpt.logs import logger
from metagpt.report import DocsReporter
from metagpt.schema import Document, Documents
NEW_REQ_TEMPLATE = """
@ -59,18 +60,21 @@ class WriteTasks(Action):
async def _update_tasks(self, filename):
system_design_doc = await self.repo.docs.system_design.get(filename)
task_doc = await self.repo.docs.task.get(filename)
if task_doc:
task_doc = await self._merge(system_design_doc=system_design_doc, task_doc=task_doc)
await self.repo.docs.task.save_doc(doc=task_doc, dependencies={system_design_doc.root_relative_path})
else:
rsp = await self._run_new_tasks(context=system_design_doc.content)
task_doc = await self.repo.docs.task.save(
filename=filename,
content=rsp.instruct_content.model_dump_json(),
dependencies={system_design_doc.root_relative_path},
)
await self._update_requirements(task_doc)
await self.repo.resources.api_spec_and_task.save_pdf(doc=task_doc)
async with DocsReporter(enable_llm_stream=True) as reporter:
await reporter.async_report({"type": "task"}, "meta")
if task_doc:
task_doc = await self._merge(system_design_doc=system_design_doc, task_doc=task_doc)
await self.repo.docs.task.save_doc(doc=task_doc, dependencies={system_design_doc.root_relative_path})
else:
rsp = await self._run_new_tasks(context=system_design_doc.content)
task_doc = await self.repo.docs.task.save(
filename=filename,
content=rsp.instruct_content.model_dump_json(),
dependencies={system_design_doc.root_relative_path},
)
await self._update_requirements(task_doc)
md = await self.repo.resources.api_spec_and_task.save_pdf(doc=task_doc)
await reporter.async_report(self.repo.workdir / md.root_relative_path, "path")
return task_doc
async def _run_new_tasks(self, context):

View file

@ -25,6 +25,7 @@ from metagpt.actions.project_management_an import REFINED_TASK_LIST, TASK_LIST
from metagpt.actions.write_code_plan_and_change_an import REFINED_TEMPLATE
from metagpt.const import BUGFIX_FILENAME, REQUIREMENT_FILENAME
from metagpt.logs import logger
from metagpt.report import EditorReporter
from metagpt.schema import CodingContext, Document, RunCodeResult
from metagpt.utils.common import CodeParser
from metagpt.utils.project_repo import ProjectRepo
@ -139,12 +140,15 @@ class WriteCode(Action):
summary_log=summary_doc.content if summary_doc else "",
)
logger.info(f"Writing {coding_context.filename}..")
code = await self.write_code(prompt)
if not coding_context.code_doc:
# avoid root_path pydantic ValidationError if use WriteCode alone
root_path = self.context.src_workspace if self.context.src_workspace else ""
coding_context.code_doc = Document(filename=coding_context.filename, root_path=str(root_path))
coding_context.code_doc.content = code
async with EditorReporter(enable_llm_stream=True) as reporter:
await reporter.async_report({"filename": coding_context.filename}, "meta")
code = await self.write_code(prompt)
if not coding_context.code_doc:
# avoid root_path pydantic ValidationError if use WriteCode alone
root_path = self.context.src_workspace if self.context.src_workspace else ""
coding_context.code_doc = Document(filename=coding_context.filename, root_path=str(root_path))
coding_context.code_doc.content = code
await reporter.async_report(self.repo.workdir / coding_context.code_doc.root_relative_path, "path")
return coding_context
@staticmethod

View file

@ -33,6 +33,7 @@ from metagpt.const import (
REQUIREMENT_FILENAME,
)
from metagpt.logs import logger
from metagpt.report import DocsReporter
from metagpt.schema import BugFixContext, Document, Documents, Message
from metagpt.utils.common import CodeParser
from metagpt.utils.file_repository import FileRepository
@ -102,19 +103,22 @@ class WritePRD(Action):
async def _handle_new_requirement(self, req: Document) -> ActionOutput:
"""handle new requirement"""
project_name = self.project_name
context = CONTEXT_TEMPLATE.format(requirements=req, project_name=project_name)
exclude = [PROJECT_NAME.key] if project_name else []
node = await WRITE_PRD_NODE.fill(
context=context, llm=self.llm, exclude=exclude, schema=self.prompt_schema
) # schema=schema
await self._rename_workspace(node)
new_prd_doc = await self.repo.docs.prd.save(
filename=FileRepository.new_filename() + ".json", content=node.instruct_content.model_dump_json()
)
await self._save_competitive_analysis(new_prd_doc)
await self.repo.resources.prd.save_pdf(doc=new_prd_doc)
return Documents.from_iterable(documents=[new_prd_doc]).to_action_output()
async with DocsReporter(enable_llm_stream=True) as reporter:
await reporter.async_report({"type": "prd"}, "meta")
project_name = self.project_name
context = CONTEXT_TEMPLATE.format(requirements=req, project_name=project_name)
exclude = [PROJECT_NAME.key] if project_name else []
node = await WRITE_PRD_NODE.fill(
context=context, llm=self.llm, exclude=exclude, schema=self.prompt_schema
) # schema=schema
await self._rename_workspace(node)
new_prd_doc = await self.repo.docs.prd.save(
filename=FileRepository.new_filename() + ".json", content=node.instruct_content.model_dump_json()
)
await self._save_competitive_analysis(new_prd_doc)
md = await self.repo.resources.prd.save_pdf(doc=new_prd_doc)
await reporter.async_report(self.repo.workdir / md.root_relative_path, "path")
return Documents.from_iterable(documents=[new_prd_doc]).to_action_output()
async def _handle_requirement_update(self, req: Document, related_docs: list[Document]) -> ActionOutput:
# ... requirement update logic ...
@ -148,10 +152,13 @@ class WritePRD(Action):
return related_doc
async def _update_prd(self, req: Document, prd_doc: Document) -> Document:
new_prd_doc: Document = await self._merge(req, prd_doc)
await self.repo.docs.prd.save_doc(doc=new_prd_doc)
await self._save_competitive_analysis(new_prd_doc)
await self.repo.resources.prd.save_pdf(doc=new_prd_doc)
async with DocsReporter(enable_llm_stream=True) as reporter:
await reporter.async_report({"type": "prd"}, "meta")
new_prd_doc: Document = await self._merge(req, prd_doc)
await self.repo.docs.prd.save_doc(doc=new_prd_doc)
await self._save_competitive_analysis(new_prd_doc)
md = await self.repo.resources.prd.save_pdf(doc=new_prd_doc)
await reporter.async_report(self.repo.workdir / md.root_relative_path, "path")
return new_prd_doc
async def _save_competitive_analysis(self, prd_doc: Document):