diff --git a/metagpt/environment/mgx/mgx_env.py b/metagpt/environment/mgx/mgx_env.py index ab515e070..33160cfd8 100644 --- a/metagpt/environment/mgx/mgx_env.py +++ b/metagpt/environment/mgx/mgx_env.py @@ -31,11 +31,11 @@ class MGXEnv(Environment): if user_defined_recipient: # human user's direct chat message to a certain role - - if self.get_role(user_defined_recipient).is_idle: - # User starts a new direct chat with a certain role, expecting a direct chat response from the role; Other roles including TL should not be involved. - # If the role is not idle, it means the user helps the role with its current work, in this case, we handle the role's response message as usual. - self.direct_chat_roles.add(user_defined_recipient) + for role_name in message.send_to: + if self.get_role(role_name).is_idle: + # User starts a new direct chat with a certain role, expecting a direct chat response from the role; Other roles including TL should not be involved. + # If the role is not idle, it means the user helps the role with its current work, in this case, we handle the role's response message as usual. + self.direct_chat_roles.add(role_name) self._publish_message(message) # # bypass team leader, team leader only needs to know but not to react (commented out because TL doesn't understand the message well in actual experiments) diff --git a/metagpt/roles/di/data_analyst.py b/metagpt/roles/di/data_analyst.py index e489c9d98..ff985a9ef 100644 --- a/metagpt/roles/di/data_analyst.py +++ b/metagpt/roles/di/data_analyst.py @@ -82,7 +82,7 @@ class DataAnalyst(DataInterpreter): available_commands=prepare_command_prompt(self.available_commands), ) context = self.llm.format_msg(self.working_memory.get() + [Message(content=prompt, role="user")]) - async with ThoughtReporter(): + async with ThoughtReporter(enable_llm_stream=True): rsp = await self.llm.aask(context) self.commands = json.loads(CodeParser.parse_code(block=None, text=rsp)) self.rc.memory.add(Message(content=rsp, role="assistant")) diff --git a/metagpt/roles/di/data_interpreter.py b/metagpt/roles/di/data_interpreter.py index bdfc0e294..f90a928df 100644 --- a/metagpt/roles/di/data_interpreter.py +++ b/metagpt/roles/di/data_interpreter.py @@ -74,7 +74,7 @@ class DataInterpreter(Role): return True prompt = REACT_THINK_PROMPT.format(user_requirement=self.user_requirement, context=context) - async with ThoughtReporter(): + async with ThoughtReporter(enable_llm_stream=True): rsp = await self.llm.aask(prompt) rsp_dict = json.loads(CodeParser.parse_code(text=rsp)) self.working_memory.add(Message(content=rsp_dict["thoughts"], role="assistant")) diff --git a/metagpt/roles/di/role_zero.py b/metagpt/roles/di/role_zero.py index 44e658ed4..4f18060ee 100644 --- a/metagpt/roles/di/role_zero.py +++ b/metagpt/roles/di/role_zero.py @@ -129,7 +129,7 @@ class RoleZero(Role): ) context = self.llm.format_msg(self.rc.memory.get(self.memory_k) + [UserMessage(content=prompt)]) # print(*context, sep="\n" + "*" * 5 + "\n") - async with ThoughtReporter(): + async with ThoughtReporter(enable_llm_stream=True): self.command_rsp = await self.llm.aask(context, system_msgs=self.system_msg) self.rc.memory.add(AIMessage(content=self.command_rsp)) diff --git a/metagpt/tools/libs/editor.py b/metagpt/tools/libs/editor.py index eba1a1eac..c9f02c4f1 100644 --- a/metagpt/tools/libs/editor.py +++ b/metagpt/tools/libs/editor.py @@ -1,5 +1,4 @@ import os -import re import shutil import subprocess @@ -27,15 +26,16 @@ class Editor: def write(self, path: str, content: str): """Write the whole content to a file. When used, make sure content arg contains the full content of the file.""" - if len(re.findall(r"\\n", content)) >= 5: - # A very raw rule to correct the content: Many \\n suggests all new line characters are mistaken as \\n whereas the correct one should be \n + if "\n" not in content and "\\n" in content: + # A very raw rule to correct the content: If 'content' lacks actual newlines ('\n') but includes '\\n', consider + # replacing them with '\n' to potentially correct mistaken representations of newline characters. content = content.replace("\\n", "\n") directory = os.path.dirname(path) if directory and not os.path.exists(directory): os.makedirs(directory) with open(path, "w", encoding="utf-8") as f: f.write(content) - self.resource.report(path, "path") + # self.resource.report(path, "path") def read(self, path: str) -> FileBlock: """Read the whole content of a file.""" diff --git a/metagpt/utils/common.py b/metagpt/utils/common.py index b58ac99da..72dc6ab94 100644 --- a/metagpt/utils/common.py +++ b/metagpt/utils/common.py @@ -275,7 +275,7 @@ class CodeParser: def parse_code(cls, text: str, lang: str = "", block: Optional[str] = None) -> str: if block: text = cls.parse_block(block, text) - pattern = rf"```{lang}.*?\s+(.*?)```" + pattern = rf"```{lang}.*?\s+(.*?)\n```" match = re.search(pattern, text, re.DOTALL) if match: code = match.group(1) diff --git a/metagpt/utils/report.py b/metagpt/utils/report.py index 2d72af111..ed40e5dde 100644 --- a/metagpt/utils/report.py +++ b/metagpt/utils/report.py @@ -51,7 +51,6 @@ class ResourceReporter(BaseModel): block: BlockType = Field(description="The type of block that is reporting the resource") uuid: UUID = Field(default_factory=uuid4, description="The unique identifier for the resource") - is_chunk: bool = Field(False, description="Indicates whether the report is a chunk of a stream") enable_llm_stream: bool = Field(False, description="Indicates whether to connect to an LLM stream for reporting") callback_url: str = Field(METAGPT_REPORTER_DEFAULT_URL, description="The URL to which the report should be sent") _llm_task: Optional[asyncio.Task] = PrivateAttr(None) @@ -153,17 +152,14 @@ class ResourceReporter(BaseModel): def __enter__(self): """Enter the synchronous streaming callback context.""" - self.is_chunk = True return self def __exit__(self, *args, **kwargs): """Exit the synchronous streaming callback context.""" self.report(None, END_MARKER_NAME) - self.is_chunk = False async def __aenter__(self): """Enter the asynchronous streaming callback context.""" - self.is_chunk = True if self.enable_llm_stream: queue = create_llm_stream_queue() self._llm_task = asyncio.create_task(self._llm_stream_report(queue)) @@ -171,15 +167,18 @@ class ResourceReporter(BaseModel): async def __aexit__(self, *args, **kwargs): """Exit the asynchronous streaming callback context.""" - self.is_chunk = False if self.enable_llm_stream: - self._llm_task.cancel() + await get_llm_stream_queue().put(None) + await self._llm_task self._llm_task = None await self.async_report(None, END_MARKER_NAME) async def _llm_stream_report(self, queue: asyncio.Queue): - while self.is_chunk: - await self.async_report(await queue.get(), "content") + while True: + data = await queue.get() + if data is None: + return + await self.async_report(data, "content") async def wait_llm_stream_report(self): """Wait for the LLM stream report to complete."""