diff --git a/metagpt/roles/di/data_analyst.py b/metagpt/roles/di/data_analyst.py
index e489c9d98..ff985a9ef 100644
--- a/metagpt/roles/di/data_analyst.py
+++ b/metagpt/roles/di/data_analyst.py
@@ -82,7 +82,7 @@ class DataAnalyst(DataInterpreter):
             available_commands=prepare_command_prompt(self.available_commands),
         )
         context = self.llm.format_msg(self.working_memory.get() + [Message(content=prompt, role="user")])
-        async with ThoughtReporter():
+        async with ThoughtReporter(enable_llm_stream=True):
             rsp = await self.llm.aask(context)
         self.commands = json.loads(CodeParser.parse_code(block=None, text=rsp))
         self.rc.memory.add(Message(content=rsp, role="assistant"))
diff --git a/metagpt/roles/di/data_interpreter.py b/metagpt/roles/di/data_interpreter.py
index bdfc0e294..f90a928df 100644
--- a/metagpt/roles/di/data_interpreter.py
+++ b/metagpt/roles/di/data_interpreter.py
@@ -74,7 +74,7 @@ class DataInterpreter(Role):
             return True
 
         prompt = REACT_THINK_PROMPT.format(user_requirement=self.user_requirement, context=context)
-        async with ThoughtReporter():
+        async with ThoughtReporter(enable_llm_stream=True):
             rsp = await self.llm.aask(prompt)
         rsp_dict = json.loads(CodeParser.parse_code(text=rsp))
         self.working_memory.add(Message(content=rsp_dict["thoughts"], role="assistant"))
diff --git a/metagpt/roles/di/role_zero.py b/metagpt/roles/di/role_zero.py
index 44e658ed4..4f18060ee 100644
--- a/metagpt/roles/di/role_zero.py
+++ b/metagpt/roles/di/role_zero.py
@@ -129,7 +129,7 @@ class RoleZero(Role):
         )
         context = self.llm.format_msg(self.rc.memory.get(self.memory_k) + [UserMessage(content=prompt)])
         # print(*context, sep="\n" + "*" * 5 + "\n")
-        async with ThoughtReporter():
+        async with ThoughtReporter(enable_llm_stream=True):
             self.command_rsp = await self.llm.aask(context, system_msgs=self.system_msg)
         self.rc.memory.add(AIMessage(content=self.command_rsp))
 
diff --git a/metagpt/tools/libs/editor.py b/metagpt/tools/libs/editor.py
index eba1a1eac..d145fdabe 100644
--- a/metagpt/tools/libs/editor.py
+++ b/metagpt/tools/libs/editor.py
@@ -35,7 +35,7 @@ class Editor:
             os.makedirs(directory)
         with open(path, "w", encoding="utf-8") as f:
             f.write(content)
-        self.resource.report(path, "path")
+        # self.resource.report(path, "path")
 
     def read(self, path: str) -> FileBlock:
         """Read the whole content of a file."""
diff --git a/metagpt/utils/report.py b/metagpt/utils/report.py
index 2d72af111..ed40e5dde 100644
--- a/metagpt/utils/report.py
+++ b/metagpt/utils/report.py
@@ -51,7 +51,6 @@ class ResourceReporter(BaseModel):
 
     block: BlockType = Field(description="The type of block that is reporting the resource")
     uuid: UUID = Field(default_factory=uuid4, description="The unique identifier for the resource")
-    is_chunk: bool = Field(False, description="Indicates whether the report is a chunk of a stream")
     enable_llm_stream: bool = Field(False, description="Indicates whether to connect to an LLM stream for reporting")
     callback_url: str = Field(METAGPT_REPORTER_DEFAULT_URL, description="The URL to which the report should be sent")
     _llm_task: Optional[asyncio.Task] = PrivateAttr(None)
@@ -153,17 +152,14 @@ class ResourceReporter(BaseModel):
 
     def __enter__(self):
         """Enter the synchronous streaming callback context."""
-        self.is_chunk = True
         return self
 
     def __exit__(self, *args, **kwargs):
         """Exit the synchronous streaming callback context."""
         self.report(None, END_MARKER_NAME)
-        self.is_chunk = False
 
     async def __aenter__(self):
         """Enter the asynchronous streaming callback context."""
-        self.is_chunk = True
         if self.enable_llm_stream:
             queue = create_llm_stream_queue()
             self._llm_task = asyncio.create_task(self._llm_stream_report(queue))
@@ -171,15 +167,18 @@ class ResourceReporter(BaseModel):
 
     async def __aexit__(self, *args, **kwargs):
         """Exit the asynchronous streaming callback context."""
-        self.is_chunk = False
         if self.enable_llm_stream:
-            self._llm_task.cancel()
+            await get_llm_stream_queue().put(None)
+            await self._llm_task
             self._llm_task = None
         await self.async_report(None, END_MARKER_NAME)
 
     async def _llm_stream_report(self, queue: asyncio.Queue):
-        while self.is_chunk:
-            await self.async_report(await queue.get(), "content")
+        while True:
+            data = await queue.get()
+            if data is None:
+                return
+            await self.async_report(data, "content")
 
     async def wait_llm_stream_report(self):
         """Wait for the LLM stream report to complete."""
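Note on the shutdown change in `__aexit__`: cancelling `_llm_task` races the consumer and can drop chunks still sitting in the queue, while putting a `None` sentinel and awaiting the task lets `_llm_stream_report` drain every pending chunk before the final `END_MARKER_NAME` report is sent. Below is a minimal standalone sketch of this sentinel pattern, not MetaGPT code; `report_chunk`, `consume`, and `main` are illustrative names standing in for `async_report` and the reporter plumbing:

```python
import asyncio


async def report_chunk(data: str) -> None:
    # Hypothetical stand-in for ResourceReporter.async_report(data, "content").
    print(f"reported: {data}")


async def consume(queue: asyncio.Queue) -> None:
    # Mirrors the new _llm_stream_report loop: exit on the None sentinel
    # instead of relying on an external flag or task cancellation.
    while True:
        data = await queue.get()
        if data is None:
            return
        await report_chunk(data)


async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue()
    task = asyncio.create_task(consume(queue))
    for chunk in ("Hello", ", ", "stream"):
        await queue.put(chunk)
    # Signal completion instead of task.cancel(): the sentinel is ordered
    # behind all prior puts, so every queued chunk is reported first.
    await queue.put(None)
    await task


asyncio.run(main())
```

The trade-off is latency versus completeness: `task.cancel()` returns immediately but may lose in-flight chunks, whereas the sentinel handshake makes `await self._llm_task` resolve only after the last chunk has been reported.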