diff --git a/metagpt/actions/design_api.py b/metagpt/actions/design_api.py index 1a6178f5c..0525233fe 100644 --- a/metagpt/actions/design_api.py +++ b/metagpt/actions/design_api.py @@ -24,6 +24,7 @@ from metagpt.actions.design_api_an import ( ) from metagpt.const import DATA_API_DESIGN_FILE_REPO, SEQ_FLOW_FILE_REPO from metagpt.logs import logger +from metagpt.report import DocsReporter from metagpt.schema import Document, Documents, Message from metagpt.utils.mermaid import mermaid_to_file @@ -82,19 +83,22 @@ class WriteDesign(Action): async def _update_system_design(self, filename) -> Document: prd = await self.repo.docs.prd.get(filename) old_system_design_doc = await self.repo.docs.system_design.get(filename) - if not old_system_design_doc: - system_design = await self._new_system_design(context=prd.content) - doc = await self.repo.docs.system_design.save( - filename=filename, - content=system_design.instruct_content.model_dump_json(), - dependencies={prd.root_relative_path}, - ) - else: - doc = await self._merge(prd_doc=prd, system_design_doc=old_system_design_doc) - await self.repo.docs.system_design.save_doc(doc=doc, dependencies={prd.root_relative_path}) - await self._save_data_api_design(doc) - await self._save_seq_flow(doc) - await self.repo.resources.system_design.save_pdf(doc=doc) + async with DocsReporter(enable_llm_stream=True) as reporter: + await reporter.async_report({"type": "design"}, "meta") + if not old_system_design_doc: + system_design = await self._new_system_design(context=prd.content) + doc = await self.repo.docs.system_design.save( + filename=filename, + content=system_design.instruct_content.model_dump_json(), + dependencies={prd.root_relative_path}, + ) + else: + doc = await self._merge(prd_doc=prd, system_design_doc=old_system_design_doc) + await self.repo.docs.system_design.save_doc(doc=doc, dependencies={prd.root_relative_path}) + await self._save_data_api_design(doc) + await self._save_seq_flow(doc) + md = await self.repo.resources.system_design.save_pdf(doc=doc) + await reporter.async_report(self.repo.workdir / md.root_relative_path, "path") return doc async def _save_data_api_design(self, design_doc): diff --git a/metagpt/actions/di/execute_nb_code.py b/metagpt/actions/di/execute_nb_code.py index c43d8bb57..ca0efc5d0 100644 --- a/metagpt/actions/di/execute_nb_code.py +++ b/metagpt/actions/di/execute_nb_code.py @@ -26,7 +26,8 @@ from rich.syntax import Syntax from metagpt.actions import Action from metagpt.const import DEFAULT_WORKSPACE_ROOT -from metagpt.logs import ToolLogItem, log_llm_stream, log_tool_output, logger +from metagpt.logs import logger +from metagpt.report import NotebookReporter INSTALL_KEEPLEN = 500 @@ -34,12 +35,16 @@ INSTALL_KEEPLEN = 500 class RealtimeOutputNotebookClient(NotebookClient): """Realtime output of Notebook execution.""" + def __init__(self, *args, notebook_reporter=None, **kwargs) -> None: + super().__init__(*args, **kwargs) + self.notebook_reporter = notebook_reporter or NotebookReporter() + async def _async_poll_output_msg(self, parent_msg_id: str, cell: NotebookNode, cell_index: int) -> None: """Implement a feature to enable sending messages.""" assert self.kc is not None while True: msg = await ensure_async(self.kc.iopub_channel.get_msg(timeout=None)) - self._send_msg(msg) + await self._send_msg(msg) if msg["parent_header"].get("msg_id") == parent_msg_id: try: @@ -48,12 +53,12 @@ class RealtimeOutputNotebookClient(NotebookClient): except CellExecutionComplete: return - def _send_msg(self, msg: dict): + async def _send_msg(self, msg: dict): msg_type = msg.get("header", {}).get("msg_type") if msg_type not in ["stream", "error", "execute_result"]: return - log_llm_stream(output_from_msg(msg)) + await self.notebook_reporter.async_report(output_from_msg(msg), "content") class ExecuteNbCode(Action): @@ -64,18 +69,20 @@ class ExecuteNbCode(Action): console: Console interaction: str timeout: int = 600 - enable_realtime_output: bool = False - def __init__(self, nb=nbformat.v4.new_notebook(), timeout=600, enable_realtime_output=False): + def __init__(self, nb=nbformat.v4.new_notebook(), timeout=600): super().__init__( nb=nb, timeout=timeout, console=Console(), interaction=("ipython" if self.is_ipython() else "terminal"), - enable_realtime_output=enable_realtime_output, ) - self.nb_client = self._resolve_nb_client()( - nb, timeout=timeout, resources={"metadata": {"path": DEFAULT_WORKSPACE_ROOT}} + self.reporter = NotebookReporter() + self.nb_client = RealtimeOutputNotebookClient( + nb, + timeout=timeout, + resources={"metadata": {"path": DEFAULT_WORKSPACE_ROOT}}, + notebook_reporter=self.reporter, ) async def build(self): @@ -201,7 +208,7 @@ class ExecuteNbCode(Action): """set timeout for run code. returns the success or failure of the cell execution, and an optional error message. """ - self._try_realtime_output(cell) + await self.reporter.async_report(cell, "content") try: await self.nb_client.async_execute_cell(cell, cell_index) @@ -224,42 +231,36 @@ class ExecuteNbCode(Action): """ self._display(code, language) - if language == "python": - # add code to the notebook - self.add_code_cell(code=code) + async with self.reporter: + if language == "python": + # add code to the notebook + self.add_code_cell(code=code) - # build code executor - await self.build() + # build code executor + await self.build() - # run code - cell_index = len(self.nb.cells) - 1 - success, outputs = await self.run_cell(self.nb.cells[-1], cell_index) + # run code + cell_index = len(self.nb.cells) - 1 + success, outputs = await self.run_cell(self.nb.cells[-1], cell_index) - if "!pip" in code: - success = False - outputs = outputs[-INSTALL_KEEPLEN:] + if "!pip" in code: + success = False + outputs = outputs[-INSTALL_KEEPLEN:] + + elif language == "markdown": + # add markdown content to markdown cell in a notebook. + self.add_markdown_cell(code) + # return True, beacuse there is no execution failure for markdown cell. + outputs, success = code, True + else: + raise ValueError(f"Only support for language: python, markdown, but got {language}, ") file_path = DEFAULT_WORKSPACE_ROOT / "code.ipynb" nbformat.write(self.nb, file_path) - log_tool_output(ToolLogItem(name="file_path", value=file_path), tool_name="ExecuteNbCode") + await self.reporter.async_report(file_path, "path") return outputs, success - elif language == "markdown": - # add markdown content to markdown cell in a notebook. - self.add_markdown_cell(code) - # return True, beacuse there is no execution failure for markdown cell. - return code, True - else: - raise ValueError(f"Only support for language: python, markdown, but got {language}, ") - - def _resolve_nb_client(self) -> NotebookClient: - return RealtimeOutputNotebookClient if self.enable_realtime_output else NotebookClient - - def _try_realtime_output(self, cell: NotebookNode): - if self.enable_realtime_output: - log_llm_stream(cell) - def remove_escape_and_color_codes(input_str: str): # 使用正则表达式去除jupyter notebook输出结果中的转义字符和颜色代码 diff --git a/metagpt/actions/project_management.py b/metagpt/actions/project_management.py index b52616e37..fa1c0c42f 100644 --- a/metagpt/actions/project_management.py +++ b/metagpt/actions/project_management.py @@ -18,6 +18,7 @@ from metagpt.actions.action_output import ActionOutput from metagpt.actions.project_management_an import PM_NODE, REFINED_PM_NODE from metagpt.const import PACKAGE_REQUIREMENTS_FILENAME from metagpt.logs import logger +from metagpt.report import DocsReporter from metagpt.schema import Document, Documents NEW_REQ_TEMPLATE = """ @@ -59,18 +60,21 @@ class WriteTasks(Action): async def _update_tasks(self, filename): system_design_doc = await self.repo.docs.system_design.get(filename) task_doc = await self.repo.docs.task.get(filename) - if task_doc: - task_doc = await self._merge(system_design_doc=system_design_doc, task_doc=task_doc) - await self.repo.docs.task.save_doc(doc=task_doc, dependencies={system_design_doc.root_relative_path}) - else: - rsp = await self._run_new_tasks(context=system_design_doc.content) - task_doc = await self.repo.docs.task.save( - filename=filename, - content=rsp.instruct_content.model_dump_json(), - dependencies={system_design_doc.root_relative_path}, - ) - await self._update_requirements(task_doc) - await self.repo.resources.api_spec_and_task.save_pdf(doc=task_doc) + async with DocsReporter(enable_llm_stream=True) as reporter: + await reporter.async_report({"type": "task"}, "meta") + if task_doc: + task_doc = await self._merge(system_design_doc=system_design_doc, task_doc=task_doc) + await self.repo.docs.task.save_doc(doc=task_doc, dependencies={system_design_doc.root_relative_path}) + else: + rsp = await self._run_new_tasks(context=system_design_doc.content) + task_doc = await self.repo.docs.task.save( + filename=filename, + content=rsp.instruct_content.model_dump_json(), + dependencies={system_design_doc.root_relative_path}, + ) + await self._update_requirements(task_doc) + md = await self.repo.resources.api_spec_and_task.save_pdf(doc=task_doc) + await reporter.async_report(self.repo.workdir / md.root_relative_path, "path") return task_doc async def _run_new_tasks(self, context): diff --git a/metagpt/actions/write_code.py b/metagpt/actions/write_code.py index feb15657d..a44fb22a0 100644 --- a/metagpt/actions/write_code.py +++ b/metagpt/actions/write_code.py @@ -25,6 +25,7 @@ from metagpt.actions.project_management_an import REFINED_TASK_LIST, TASK_LIST from metagpt.actions.write_code_plan_and_change_an import REFINED_TEMPLATE from metagpt.const import BUGFIX_FILENAME, REQUIREMENT_FILENAME from metagpt.logs import logger +from metagpt.report import EditorReporter from metagpt.schema import CodingContext, Document, RunCodeResult from metagpt.utils.common import CodeParser from metagpt.utils.project_repo import ProjectRepo @@ -139,12 +140,15 @@ class WriteCode(Action): summary_log=summary_doc.content if summary_doc else "", ) logger.info(f"Writing {coding_context.filename}..") - code = await self.write_code(prompt) - if not coding_context.code_doc: - # avoid root_path pydantic ValidationError if use WriteCode alone - root_path = self.context.src_workspace if self.context.src_workspace else "" - coding_context.code_doc = Document(filename=coding_context.filename, root_path=str(root_path)) - coding_context.code_doc.content = code + async with EditorReporter(enable_llm_stream=True) as reporter: + await reporter.async_report({"filename": coding_context.filename}, "meta") + code = await self.write_code(prompt) + if not coding_context.code_doc: + # avoid root_path pydantic ValidationError if use WriteCode alone + root_path = self.context.src_workspace if self.context.src_workspace else "" + coding_context.code_doc = Document(filename=coding_context.filename, root_path=str(root_path)) + coding_context.code_doc.content = code + await reporter.async_report(self.repo.workdir / coding_context.code_doc.root_relative_path, "path") return coding_context @staticmethod diff --git a/metagpt/actions/write_prd.py b/metagpt/actions/write_prd.py index 220cfa52e..968f1147d 100644 --- a/metagpt/actions/write_prd.py +++ b/metagpt/actions/write_prd.py @@ -33,6 +33,7 @@ from metagpt.const import ( REQUIREMENT_FILENAME, ) from metagpt.logs import logger +from metagpt.report import DocsReporter from metagpt.schema import BugFixContext, Document, Documents, Message from metagpt.utils.common import CodeParser from metagpt.utils.file_repository import FileRepository @@ -102,19 +103,22 @@ class WritePRD(Action): async def _handle_new_requirement(self, req: Document) -> ActionOutput: """handle new requirement""" - project_name = self.project_name - context = CONTEXT_TEMPLATE.format(requirements=req, project_name=project_name) - exclude = [PROJECT_NAME.key] if project_name else [] - node = await WRITE_PRD_NODE.fill( - context=context, llm=self.llm, exclude=exclude, schema=self.prompt_schema - ) # schema=schema - await self._rename_workspace(node) - new_prd_doc = await self.repo.docs.prd.save( - filename=FileRepository.new_filename() + ".json", content=node.instruct_content.model_dump_json() - ) - await self._save_competitive_analysis(new_prd_doc) - await self.repo.resources.prd.save_pdf(doc=new_prd_doc) - return Documents.from_iterable(documents=[new_prd_doc]).to_action_output() + async with DocsReporter(enable_llm_stream=True) as reporter: + await reporter.async_report({"type": "prd"}, "meta") + project_name = self.project_name + context = CONTEXT_TEMPLATE.format(requirements=req, project_name=project_name) + exclude = [PROJECT_NAME.key] if project_name else [] + node = await WRITE_PRD_NODE.fill( + context=context, llm=self.llm, exclude=exclude, schema=self.prompt_schema + ) # schema=schema + await self._rename_workspace(node) + new_prd_doc = await self.repo.docs.prd.save( + filename=FileRepository.new_filename() + ".json", content=node.instruct_content.model_dump_json() + ) + await self._save_competitive_analysis(new_prd_doc) + md = await self.repo.resources.prd.save_pdf(doc=new_prd_doc) + await reporter.async_report(self.repo.workdir / md.root_relative_path, "path") + return Documents.from_iterable(documents=[new_prd_doc]).to_action_output() async def _handle_requirement_update(self, req: Document, related_docs: list[Document]) -> ActionOutput: # ... requirement update logic ... @@ -148,10 +152,13 @@ class WritePRD(Action): return related_doc async def _update_prd(self, req: Document, prd_doc: Document) -> Document: - new_prd_doc: Document = await self._merge(req, prd_doc) - await self.repo.docs.prd.save_doc(doc=new_prd_doc) - await self._save_competitive_analysis(new_prd_doc) - await self.repo.resources.prd.save_pdf(doc=new_prd_doc) + async with DocsReporter(enable_llm_stream=True) as reporter: + await reporter.async_report({"type": "prd"}, "meta") + new_prd_doc: Document = await self._merge(req, prd_doc) + await self.repo.docs.prd.save_doc(doc=new_prd_doc) + await self._save_competitive_analysis(new_prd_doc) + md = await self.repo.resources.prd.save_pdf(doc=new_prd_doc) + await reporter.async_report(self.repo.workdir / md.root_relative_path, "path") return new_prd_doc async def _save_competitive_analysis(self, prd_doc: Document): diff --git a/metagpt/report.py b/metagpt/report.py index ee3c1697b..5bc0ec03d 100644 --- a/metagpt/report.py +++ b/metagpt/report.py @@ -2,7 +2,7 @@ import asyncio from enum import Enum from pathlib import Path from typing import Any, Callable, Literal, Optional, Union -from urllib.parse import unquote, urlparse +from urllib.parse import unquote, urlparse, urlunparse from uuid import UUID, uuid4 from aiohttp import ClientSession, UnixConnector @@ -98,7 +98,7 @@ class ResourceReporter(BaseModel): data = self._format_data(value, name) resp = requests.post(self.callback_url, json=data) resp.raise_for_status() - return resp.text() + return resp.text async def _async_report(self, value: Any, name: str): if not self.callback_url: @@ -108,8 +108,11 @@ class ResourceReporter(BaseModel): url = self.callback_url _result = urlparse(url) sessiion_kwargs = {} - if "unix" in _result.scheme: - url = str(_result._replace(scheme="http", netloc="fake.org")) + if _result.scheme.endswith("+unix"): + parsed_list = list(_result) + parsed_list[0] = parsed_list[0][:-5] + parsed_list[1] = "fake.org" + url = urlunparse(parsed_list) sessiion_kwargs["connector"] = UnixConnector(path=unquote(_result.netloc)) async with ClientSession(**sessiion_kwargs) as client: @@ -119,7 +122,7 @@ class ResourceReporter(BaseModel): def _format_data(self, value, name): data = self.model_dump(mode="json", exclude=("callback_url", "llm_stream")) - data["value"] = value + data["value"] = str(value) if isinstance(value, Path) else value data["name"] = name return data @@ -192,15 +195,13 @@ class BrowserReporter(ResourceReporter): def report(self, value: Union[str, SyncPage], name: Literal["url", "page"]): """Report browser URL or page content synchronously.""" if name == "page": - value = value.screenshot() - value = str(value) + value = {"page_url": value.url, "title": value.title(), "screenshot": str(value.screenshot())} return super().report(value, name) async def async_report(self, value: Union[str, AsyncPage], name: Literal["url", "page"]): """Report browser URL or page content asynchronously.""" if name == "page": - value = await value.screenshot() - value = str(value) + value = {"page_url": value.url, "title": await value.title(), "screenshot": str(await value.screenshot())} return await super().async_report(value, name) diff --git a/metagpt/schema.py b/metagpt/schema.py index b24d18d09..dd81427d2 100644 --- a/metagpt/schema.py +++ b/metagpt/schema.py @@ -46,8 +46,9 @@ from metagpt.const import ( SYSTEM_DESIGN_FILE_REPO, TASK_FILE_REPO, ) -from metagpt.logs import ToolLogItem, log_tool_output, logger +from metagpt.logs import logger from metagpt.repo_parser import DotClassInfo +from metagpt.report import TaskReporter from metagpt.utils.common import any_to_str, any_to_str_set, import_class from metagpt.utils.exceptions import handle_exception from metagpt.utils.serialize import ( @@ -510,14 +511,7 @@ class Plan(BaseModel): current_task_id = task.task_id break self.current_task_id = current_task_id - - log_tool_output( - [ - ToolLogItem(type="object", name="tasks", value=self.tasks), - ToolLogItem(type="object", name="current_task_id", value=self.current_task_id), - ], - tool_name="Plan", - ) + TaskReporter().report({"tasks": self.tasks, "current_task_id": current_task_id}) @property def current_task(self) -> Task: diff --git a/metagpt/tools/libs/browser.py b/metagpt/tools/libs/browser.py index b6a5b7cbf..60129c6e3 100644 --- a/metagpt/tools/libs/browser.py +++ b/metagpt/tools/libs/browser.py @@ -1,7 +1,7 @@ from playwright.async_api import async_playwright from metagpt.const import DEFAULT_WORKSPACE_ROOT -from metagpt.logs import ToolLogItem, log_tool_output_async +from metagpt.report import BrowserReporter from metagpt.tools.tool_registry import register_tool @@ -20,6 +20,7 @@ class Browser: self.pages = {} self.current_page_url = None self.current_page = None + self.reporter = BrowserReporter() async def start(self): """Starts Playwright and launches a browser""" @@ -34,21 +35,19 @@ class Browser: async def open_new_page(self, url: str): """open a new page in the browser and view the page""" - page = await self.browser.new_page() - await page.goto(url) - self.pages[url] = page - await self._set_current_page(page, url) - await log_tool_output_async( - ToolLogItem(type="object", name="open_new_page", value=self.current_page), tool_name="Browser" - ) + async with self.reporter as reporter: + page = await self.browser.new_page() + await reporter.async_report(url, "url") + await page.goto(url) + self.pages[url] = page + await self._set_current_page(page, url) + await reporter.async_report(page, "page") async def switch_page(self, url: str): """switch to an opened page in the browser and view the page""" if url in self.pages: await self._set_current_page(self.pages[url], url) - await log_tool_output_async( - ToolLogItem(type="object", name="switch_page", value=self.current_page), tool_name="Browser" - ) + await self.reporter.async_report(self.current_page, "page") else: print(f"Page not found: {url}") @@ -110,9 +109,8 @@ class Browser: index = len(search_results) - 1 element = search_results[index]["element_obj"] await element.scroll_into_view_if_needed() - await log_tool_output_async( - ToolLogItem(type="object", name="scroll_page", value=self.current_page), tool_name="Browser" - ) + await self.reporter.async_report(self.current_page, "page") + print(f"Successfully scrolled to the {index}-th search result") print(await self._view()) @@ -152,9 +150,8 @@ class Browser: async def scroll_current_page(self, offset: int = 500): """scroll the current page by offset pixels, negative value means scrolling up, will print out observed content after scrolling""" await self.current_page.evaluate(f"window.scrollBy(0, {offset})") - await log_tool_output_async( - ToolLogItem(type="object", name="scroll_page", value=self.current_page), tool_name="Browser" - ) + await self.reporter.async_report(self.current_page, "page") + print(f"Scrolled current page by {offset} pixels.") print(await self._view()) diff --git a/metagpt/tools/libs/deployer.py b/metagpt/tools/libs/deployer.py index 3c9c2f7e5..ca6b312ca 100644 --- a/metagpt/tools/libs/deployer.py +++ b/metagpt/tools/libs/deployer.py @@ -1,4 +1,4 @@ -from metagpt.logs import ToolLogItem, log_tool_output +from metagpt.report import ServerReporter from metagpt.tools.tool_registry import register_tool @@ -8,4 +8,4 @@ class Deployer: """Deploy a local service to public. Used only for final deployment, you should NOT use it for development and testing.""" def deploy_to_public(self, local_url: str): - log_tool_output(ToolLogItem(name="local_url", value=local_url), tool_name="Deployer") + ServerReporter().report(local_url, "local_url") diff --git a/metagpt/tools/libs/file_manager.py b/metagpt/tools/libs/file_manager.py index 75c173d5e..ba06b764a 100644 --- a/metagpt/tools/libs/file_manager.py +++ b/metagpt/tools/libs/file_manager.py @@ -4,7 +4,7 @@ import subprocess from pydantic import BaseModel, Field -from metagpt.logs import ToolLogItem, log_tool_output +from metagpt.report import EditorReporter from metagpt.tools.tool_registry import register_tool @@ -23,17 +23,20 @@ class FileBlock(BaseModel): class FileManager: """A tool for reading, understanding, writing, and editing files""" + def __init__(self) -> None: + self.resource = EditorReporter() + def write(self, path: str, content: str): """Write the whole content to a file.""" with open(path, "w") as f: f.write(content) - log_tool_output(ToolLogItem(name="write_file_path", value=path), tool_name="FileManager") + self.resource.report(path, "path") def read(self, path: str) -> str: """Read the whole content of a file.""" with open(path, "r") as f: + self.resource.report(path, "path") return f.read() - log_tool_output(ToolLogItem(name="read_file_path", value=path), tool_name="FileManager") def search_content(self, symbol: str, root_path: str = "", window: int = 20) -> FileBlock: """ @@ -78,10 +81,7 @@ class FileManager: symbol=symbol, symbol_line=i + 1, ) - log_tool_output( - ToolLogItem(type="object", name="file_block_searched", value=result), - tool_name="FileManager", - ) + self.resource.report(result.file_path, "path") return result return None @@ -124,9 +124,7 @@ class FileManager: block_start_line=start_line, block_end_line=-1 if end_line < start_line else start_line + new_block_content.count("\n"), ) - log_tool_output( - ToolLogItem(type="object", name="file_block_written", value=new_file_block), tool_name="FileManager" - ) + self.resource.report(new_file_block.file_path, "path") return f"Content written successfully to {file_path}" diff --git a/metagpt/tools/libs/terminal.py b/metagpt/tools/libs/terminal.py index dc395e89b..28a6e227a 100644 --- a/metagpt/tools/libs/terminal.py +++ b/metagpt/tools/libs/terminal.py @@ -2,7 +2,7 @@ import subprocess import threading from queue import Queue -from metagpt.logs import TOOL_LOG_END_MARKER, ToolLogItem, log_tool_output +from metagpt.report import END_MARKER_VALUE, TerminalReporter from metagpt.tools.tool_registry import register_tool @@ -28,9 +28,10 @@ class Terminal: stderr=subprocess.STDOUT, text=True, bufsize=1, # Line buffered - executable="/bin/bash" + executable="/bin/bash", ) self.stdout_queue = Queue() + self.observer = TerminalReporter() def run_command(self, cmd: str, daemon=False) -> str: """ @@ -59,7 +60,7 @@ class Terminal: # Send the command self.process.stdin.write(cmd + self.command_terminator) self.process.stdin.write( - f'echo "{TOOL_LOG_END_MARKER.value}"' + self.command_terminator # write EOF + f'echo "{END_MARKER_VALUE}"' + self.command_terminator # write EOF ) # Unique marker to signal command end self.process.stdin.flush() if daemon: @@ -93,28 +94,26 @@ class Terminal: return self.run_command(cmd, daemon=daemon) def _read_and_process_output(self, cmd): - cmd_output = [] - log_tool_output( - output=ToolLogItem(name="cmd", value=cmd + self.command_terminator), tool_name="Terminal" - ) # log the command + with self.observer as observer: + cmd_output = [] + observer.report(cmd + self.command_terminator, "cmd") + # report the command - # Read the output until the unique marker is found - while True: - line = self.process.stdout.readline() - ix = line.rfind(TOOL_LOG_END_MARKER.value) - if ix >= 0: - line = line[0:ix] - if line: - log_tool_output( - output=ToolLogItem(name="output", value=line), tool_name="Terminal" - ) # log stdout in real-time - cmd_output.append(line) - log_tool_output(TOOL_LOG_END_MARKER) - break - # log stdout in real-time - log_tool_output(output=ToolLogItem(name="output", value=line), tool_name="Terminal") - cmd_output.append(line) - self.stdout_queue.put(line) + # Read the output until the unique marker is found + while True: + line = self.process.stdout.readline() + ix = line.rfind(END_MARKER_VALUE) + if ix >= 0: + line = line[0:ix] + if line: + observer.report(line, "output") + # report stdout in real-time + cmd_output.append(line) + break + # log stdout in real-time + observer.report(line, "output") + cmd_output.append(line) + self.stdout_queue.put(line) return "".join(cmd_output) diff --git a/metagpt/utils/file_repository.py b/metagpt/utils/file_repository.py index d19f2b705..dd6c0709f 100644 --- a/metagpt/utils/file_repository.py +++ b/metagpt/utils/file_repository.py @@ -198,8 +198,9 @@ class FileRepository: :type dependencies: List[str], optional """ - await self.save(filename=doc.filename, content=doc.content, dependencies=dependencies) + doc = await self.save(filename=doc.filename, content=doc.content, dependencies=dependencies) logger.debug(f"File Saved: {str(doc.filename)}") + return doc async def save_pdf(self, doc: Document, with_suffix: str = ".md", dependencies: List[str] = None): """Save a Document instance as a PDF file. @@ -216,8 +217,9 @@ class FileRepository: """ m = json.loads(doc.content) filename = Path(doc.filename).with_suffix(with_suffix) if with_suffix is not None else Path(doc.filename) - await self.save(filename=str(filename), content=json_to_markdown(m), dependencies=dependencies) + doc = await self.save(filename=str(filename), content=json_to_markdown(m), dependencies=dependencies) logger.debug(f"File Saved: {str(filename)}") + return doc async def delete(self, filename: Path | str): """Delete a file from the file repository.