Merge branch 'feature-stream-parse' into 'mgx_ops'

Feature stream parse

See merge request pub/MetaGPT!174
This commit is contained in:
林义章 2024-06-18 06:56:29 +00:00
commit 70b69717f9
7 changed files with 20 additions and 21 deletions

View file

@ -31,11 +31,11 @@ class MGXEnv(Environment):
if user_defined_recipient:
# human user's direct chat message to a certain role
if self.get_role(user_defined_recipient).is_idle:
# User starts a new direct chat with a certain role, expecting a direct chat response from the role; Other roles including TL should not be involved.
# If the role is not idle, it means the user helps the role with its current work, in this case, we handle the role's response message as usual.
self.direct_chat_roles.add(user_defined_recipient)
for role_name in message.send_to:
if self.get_role(role_name).is_idle:
# User starts a new direct chat with a certain role, expecting a direct chat response from the role; Other roles including TL should not be involved.
# If the role is not idle, it means the user helps the role with its current work, in this case, we handle the role's response message as usual.
self.direct_chat_roles.add(role_name)
self._publish_message(message)
# # bypass team leader, team leader only needs to know but not to react (commented out because TL doesn't understand the message well in actual experiments)

View file

@ -82,7 +82,7 @@ class DataAnalyst(DataInterpreter):
available_commands=prepare_command_prompt(self.available_commands),
)
context = self.llm.format_msg(self.working_memory.get() + [Message(content=prompt, role="user")])
async with ThoughtReporter():
async with ThoughtReporter(enable_llm_stream=True):
rsp = await self.llm.aask(context)
self.commands = json.loads(CodeParser.parse_code(block=None, text=rsp))
self.rc.memory.add(Message(content=rsp, role="assistant"))

View file

@ -74,7 +74,7 @@ class DataInterpreter(Role):
return True
prompt = REACT_THINK_PROMPT.format(user_requirement=self.user_requirement, context=context)
async with ThoughtReporter():
async with ThoughtReporter(enable_llm_stream=True):
rsp = await self.llm.aask(prompt)
rsp_dict = json.loads(CodeParser.parse_code(text=rsp))
self.working_memory.add(Message(content=rsp_dict["thoughts"], role="assistant"))

View file

@ -129,7 +129,7 @@ class RoleZero(Role):
)
context = self.llm.format_msg(self.rc.memory.get(self.memory_k) + [UserMessage(content=prompt)])
# print(*context, sep="\n" + "*" * 5 + "\n")
async with ThoughtReporter():
async with ThoughtReporter(enable_llm_stream=True):
self.command_rsp = await self.llm.aask(context, system_msgs=self.system_msg)
self.rc.memory.add(AIMessage(content=self.command_rsp))

View file

@ -1,5 +1,4 @@
import os
import re
import shutil
import subprocess
@ -27,15 +26,16 @@ class Editor:
def write(self, path: str, content: str):
"""Write the whole content to a file. When used, make sure content arg contains the full content of the file."""
if len(re.findall(r"\\n", content)) >= 5:
# A very raw rule to correct the content: Many \\n suggests all new line characters are mistaken as \\n whereas the correct one should be \n
if "\n" not in content and "\\n" in content:
# A very raw rule to correct the content: If 'content' lacks actual newlines ('\n') but includes '\\n', consider
# replacing them with '\n' to potentially correct mistaken representations of newline characters.
content = content.replace("\\n", "\n")
directory = os.path.dirname(path)
if directory and not os.path.exists(directory):
os.makedirs(directory)
with open(path, "w", encoding="utf-8") as f:
f.write(content)
self.resource.report(path, "path")
# self.resource.report(path, "path")
def read(self, path: str) -> FileBlock:
"""Read the whole content of a file."""

View file

@ -275,7 +275,7 @@ class CodeParser:
def parse_code(cls, text: str, lang: str = "", block: Optional[str] = None) -> str:
if block:
text = cls.parse_block(block, text)
pattern = rf"```{lang}.*?\s+(.*?)```"
pattern = rf"```{lang}.*?\s+(.*?)\n```"
match = re.search(pattern, text, re.DOTALL)
if match:
code = match.group(1)

View file

@ -51,7 +51,6 @@ class ResourceReporter(BaseModel):
block: BlockType = Field(description="The type of block that is reporting the resource")
uuid: UUID = Field(default_factory=uuid4, description="The unique identifier for the resource")
is_chunk: bool = Field(False, description="Indicates whether the report is a chunk of a stream")
enable_llm_stream: bool = Field(False, description="Indicates whether to connect to an LLM stream for reporting")
callback_url: str = Field(METAGPT_REPORTER_DEFAULT_URL, description="The URL to which the report should be sent")
_llm_task: Optional[asyncio.Task] = PrivateAttr(None)
@ -153,17 +152,14 @@ class ResourceReporter(BaseModel):
def __enter__(self):
"""Enter the synchronous streaming callback context."""
self.is_chunk = True
return self
def __exit__(self, *args, **kwargs):
"""Exit the synchronous streaming callback context."""
self.report(None, END_MARKER_NAME)
self.is_chunk = False
async def __aenter__(self):
"""Enter the asynchronous streaming callback context."""
self.is_chunk = True
if self.enable_llm_stream:
queue = create_llm_stream_queue()
self._llm_task = asyncio.create_task(self._llm_stream_report(queue))
@ -171,15 +167,18 @@ class ResourceReporter(BaseModel):
async def __aexit__(self, *args, **kwargs):
"""Exit the asynchronous streaming callback context."""
self.is_chunk = False
if self.enable_llm_stream:
self._llm_task.cancel()
await get_llm_stream_queue().put(None)
await self._llm_task
self._llm_task = None
await self.async_report(None, END_MARKER_NAME)
async def _llm_stream_report(self, queue: asyncio.Queue):
while self.is_chunk:
await self.async_report(await queue.get(), "content")
while True:
data = await queue.get()
if data is None:
return
await self.async_report(data, "content")
async def wait_llm_stream_report(self):
"""Wait for the LLM stream report to complete."""