mirror of
https://github.com/FoundationAgents/MetaGPT.git
synced 2026-06-08 15:05:17 +02:00
Merge branch 'mgx_ops' into feat-opt-engineer2
This commit is contained in:
commit
cc500edb37
11 changed files with 204 additions and 173 deletions
|
|
@ -159,7 +159,7 @@ class Environment(ExtEnv):
|
|||
"""增加一个在当前环境的角色
|
||||
Add a role in the current environment
|
||||
"""
|
||||
self.roles[role.profile] = role
|
||||
self.roles[role.name] = role
|
||||
role.set_env(self)
|
||||
role.context = self.context
|
||||
|
||||
|
|
@ -168,7 +168,7 @@ class Environment(ExtEnv):
|
|||
Add a batch of characters in the current environment
|
||||
"""
|
||||
for role in roles:
|
||||
self.roles[role.profile] = role
|
||||
self.roles[role.name] = role
|
||||
|
||||
for role in roles: # setup system message with roles
|
||||
role.context = self.context
|
||||
|
|
|
|||
|
|
@ -17,20 +17,33 @@ from metagpt.utils.common import any_to_str, any_to_str_set
|
|||
class MGXEnv(Environment):
|
||||
"""MGX Environment"""
|
||||
|
||||
# Before enabling TL to fully take over the routing, all software company roles need to be able to handle TL messages, which requires restructuring.
|
||||
allow_bypass_team_leader: bool = True
|
||||
# If True, fixed software sop bypassing TL is allowed, otherwise, TL will fully take over the routing
|
||||
allow_bypass_team_leader: bool = False
|
||||
|
||||
direct_chat_roles: set[str] = set() # record direct chat: @role_name
|
||||
|
||||
def _publish_message(self, message: Message, peekable: bool = True) -> bool:
|
||||
return super().publish_message(message, peekable)
|
||||
|
||||
def publish_message(self, message: Message, user_defined_recipient: str = "", publicer: str = "") -> bool:
|
||||
"""let the team leader take over message publishing"""
|
||||
tl = self.get_role("Team Leader")
|
||||
tl = self.get_role("Tim") # TeamLeader's name is Tim
|
||||
|
||||
if user_defined_recipient:
|
||||
# human user's direct chat message to a certain role
|
||||
for role_name in message.send_to:
|
||||
if self.get_role(role_name).is_idle:
|
||||
# User starts a new direct chat with a certain role, expecting a direct chat response from the role; Other roles including TL should not be involved.
|
||||
# If the role is not idle, it means the user helps the role with its current work, in this case, we handle the role's response message as usual.
|
||||
self.direct_chat_roles.add(role_name)
|
||||
|
||||
self._publish_message(message)
|
||||
# bypass team leader, team leader only needs to know but not to react
|
||||
tl.rc.memory.add(self.move_message_info_to_content(message))
|
||||
# # bypass team leader, team leader only needs to know but not to react (commented out because TL doesn't understand the message well in actual experiments)
|
||||
# tl.rc.memory.add(self.move_message_info_to_content(message))
|
||||
|
||||
elif message.sent_from in self.direct_chat_roles:
|
||||
# direct chat response from a certain role to human user, team leader and other roles in the env should not be involved, no need to publish
|
||||
self.direct_chat_roles.remove(message.sent_from)
|
||||
|
||||
elif (
|
||||
self.allow_bypass_team_leader
|
||||
|
|
@ -106,3 +119,6 @@ class MGXEnv(Environment):
|
|||
sent_from = converted_msg.metadata[AGENT] if AGENT in converted_msg.metadata else converted_msg.sent_from
|
||||
converted_msg.content = f"from {sent_from} to {converted_msg.send_to}: {converted_msg.content}"
|
||||
return converted_msg
|
||||
|
||||
def __repr__(self):
|
||||
return "MGXEnv()"
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ class Task(BaseModel):
|
|||
|
||||
# Available Commands
|
||||
{available_commands}
|
||||
Special Command: Use {{"command_name": "pass"}} to do nothing and {{"command_name": "end"}} to indicate completion of all requirements and the end of actions.
|
||||
Special Command: Use {{"command_name": "end"}} to do nothing or indicate completion of all requirements and the end of actions.
|
||||
|
||||
# Current Plan
|
||||
{plan_status}
|
||||
|
|
|
|||
|
|
@ -30,5 +30,6 @@ FINISH_CURRENT_TASK_CMD = """
|
|||
"command_name": "Plan.finish_current_task",
|
||||
"args": {{}}
|
||||
}
|
||||
]
|
||||
```
|
||||
"""
|
||||
|
|
|
|||
|
|
@ -41,6 +41,7 @@ class DataAnalyst(DataInterpreter):
|
|||
# Command.PASS,
|
||||
]
|
||||
commands: list[dict] = [] # issued commands to be executed
|
||||
user_requirement: str = ""
|
||||
|
||||
@model_validator(mode="after")
|
||||
def set_plan_and_tool(self) -> "DataInterpreter":
|
||||
|
|
@ -68,9 +69,6 @@ class DataAnalyst(DataInterpreter):
|
|||
self.user_requirement = self.get_memories()[-1].content
|
||||
self.planner.plan.goal = self.user_requirement
|
||||
example = KeywordExpRetriever().retrieve(self.user_requirement)
|
||||
else:
|
||||
self.working_memory.add_batch(self.rc.news)
|
||||
# TODO: implement experience retrieval in multi-round setting
|
||||
|
||||
plan_status = self.planner.plan.model_dump(include=["goal", "tasks"])
|
||||
# for task in plan_status["tasks"]:
|
||||
|
|
@ -82,10 +80,11 @@ class DataAnalyst(DataInterpreter):
|
|||
available_commands=prepare_command_prompt(self.available_commands),
|
||||
)
|
||||
context = self.llm.format_msg(self.working_memory.get() + [Message(content=prompt, role="user")])
|
||||
async with ThoughtReporter():
|
||||
# print(*context, sep="\n" + "*" * 5 + "\n")
|
||||
async with ThoughtReporter(enable_llm_stream=True):
|
||||
rsp = await self.llm.aask(context)
|
||||
self.commands = json.loads(CodeParser.parse_code(block=None, text=rsp))
|
||||
self.rc.memory.add(Message(content=rsp, role="assistant"))
|
||||
self.rc.working_memory.add(Message(content=rsp, role="assistant"))
|
||||
|
||||
await run_commands(self, self.commands, self.rc.working_memory)
|
||||
|
||||
|
|
@ -118,7 +117,11 @@ class DataAnalyst(DataInterpreter):
|
|||
rsp = Message(content="No actions taken yet", cause_by=Action) # will be overwritten after Role _act
|
||||
while actions_taken < self.rc.max_react_loop:
|
||||
# NOTE: Diff 2: Keep observing within _react, news will go into memory, allowing adapting to new info
|
||||
# add news from self._observe, the one called in self.run, consider removing when switching from working_memory to memory
|
||||
self.working_memory.add_batch(self.rc.news)
|
||||
await self._observe()
|
||||
# add news from this self._observe, we need twice because _observe rewrites rc.news
|
||||
self.working_memory.add_batch(self.rc.news)
|
||||
|
||||
# think
|
||||
has_todo = await self._think()
|
||||
|
|
|
|||
|
|
@ -74,7 +74,7 @@ class DataInterpreter(Role):
|
|||
return True
|
||||
|
||||
prompt = REACT_THINK_PROMPT.format(user_requirement=self.user_requirement, context=context)
|
||||
async with ThoughtReporter():
|
||||
async with ThoughtReporter(enable_llm_stream=True):
|
||||
rsp = await self.llm.aask(prompt)
|
||||
rsp_dict = json.loads(CodeParser.parse_code(text=rsp))
|
||||
self.working_memory.add(Message(content=rsp_dict["thoughts"], role="assistant"))
|
||||
|
|
|
|||
|
|
@ -53,7 +53,6 @@ class RoleZero(Role):
|
|||
experience_retriever: ExpRetriever = DummyExpRetriever()
|
||||
|
||||
# Others
|
||||
user_requirement: str = ""
|
||||
command_rsp: str = "" # the raw string containing the commands
|
||||
commands: list[dict] = [] # commands to be executed
|
||||
memory_k: int = 20 # number of memories (messages) to use as historical context
|
||||
|
|
@ -106,8 +105,7 @@ class RoleZero(Role):
|
|||
return False
|
||||
|
||||
if not self.planner.plan.goal:
|
||||
self.user_requirement = self.get_memories()[-1].content
|
||||
self.planner.plan.goal = self.user_requirement
|
||||
self.planner.plan.goal = self.get_memories()[-1].content
|
||||
|
||||
### 1. Experience ###
|
||||
example = self._retrieve_experience()
|
||||
|
|
@ -129,7 +127,7 @@ class RoleZero(Role):
|
|||
)
|
||||
context = self.llm.format_msg(self.rc.memory.get(self.memory_k) + [UserMessage(content=prompt)])
|
||||
# print(*context, sep="\n" + "*" * 5 + "\n")
|
||||
async with ThoughtReporter():
|
||||
async with ThoughtReporter(enable_llm_stream=True):
|
||||
self.command_rsp = await self.llm.aask(context, system_msgs=self.system_msg)
|
||||
self.rc.memory.add(AIMessage(content=self.command_rsp))
|
||||
|
||||
|
|
|
|||
|
|
@ -17,152 +17,165 @@ class DummyExpRetriever(ExpRetriever):
|
|||
return ""
|
||||
|
||||
|
||||
TL_EXAMPLE = """
|
||||
## example 1
|
||||
User Requirement: Create a cli snake game using Python.
|
||||
Explanation: The requirement is about software development. Assign each tasks to a different team member based on their expertise. When publishing message to Product Manager, we copy original user requirement directly to ensure no information loss.
|
||||
```json
|
||||
[
|
||||
{
|
||||
"command_name": "Plan.append_task",
|
||||
"args": {
|
||||
"task_id": "1",
|
||||
"dependent_task_ids": [],
|
||||
"instruction": "Create a product requirement document (PRD) outlining the features, user interface, and user experience of the CLI python snake game.",
|
||||
"assignee": "Alice"
|
||||
}
|
||||
},
|
||||
{
|
||||
"command_name": "Plan.append_task",
|
||||
"args": {
|
||||
"task_id": "2",
|
||||
"dependent_task_ids": ["1"],
|
||||
"instruction": "Design the software architecture for the CLI snake game, including the choice of programming language, libraries, and data flow.",
|
||||
"assignee": "Bob"
|
||||
}
|
||||
},
|
||||
{
|
||||
"command_name": "Plan.append_task",
|
||||
"args": {
|
||||
"task_id": "3",
|
||||
"dependent_task_ids": ["2"],
|
||||
"instruction": "Break down the architecture into manageable tasks, identify task dependencies, and prepare a detailed task list for implementation.",
|
||||
"assignee": "Eve"
|
||||
}
|
||||
},
|
||||
{
|
||||
"command_name": "Plan.append_task",
|
||||
"args": {
|
||||
"task_id": "4",
|
||||
"dependent_task_ids": ["3"],
|
||||
"instruction": "Implement the core game logic for the CLI snake game, including snake movement, food generation, and score tracking.",
|
||||
"assignee": "Alex"
|
||||
}
|
||||
},
|
||||
{
|
||||
"command_name": "Plan.append_task",
|
||||
"args": {
|
||||
"task_id": "5",
|
||||
"dependent_task_ids": ["4"],
|
||||
"instruction": "Write comprehensive tests for the game logic and user interface to ensure functionality and reliability.",
|
||||
"assignee": "Edward"
|
||||
}
|
||||
},
|
||||
{
|
||||
"command_name": "TeamLeader.publish_message",
|
||||
"args": {
|
||||
"content": "Create a cli snake game using Python",
|
||||
"send_to": "Alice"
|
||||
}
|
||||
},
|
||||
{
|
||||
"command_name": "RoleZero.reply_to_human",
|
||||
"args": {
|
||||
"content": "I have assigned the tasks to the team members. Alice will create the PRD, Bob will design the software architecture, Eve will break down the architecture into tasks, Alex will implement the core game logic, and Edward will write comprehensive tests. The team will work on the project accordingly",
|
||||
}
|
||||
},
|
||||
{
|
||||
"command_name": "end"
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
## example 2
|
||||
User Requirement: Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy.
|
||||
Explanation: DON'T decompose requirement if it is a DATA-RELATED task, assign a single task directly to Data Analyst David. He will manage the decomposition and implementation.
|
||||
```json
|
||||
[
|
||||
{
|
||||
"command_name": "Plan.append_task",
|
||||
"args": {
|
||||
"task_id": "1",
|
||||
"dependent_task_ids": [],
|
||||
"instruction": "Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy.",
|
||||
"assignee": "David"
|
||||
}
|
||||
},
|
||||
{
|
||||
"command_name": "TeamLeader.publish_message",
|
||||
"args": {
|
||||
"content": "Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy.",
|
||||
"send_to": "David"
|
||||
}
|
||||
},
|
||||
{
|
||||
"command_name": "RoleZero.reply_to_human",
|
||||
"args": {
|
||||
"content": "I have assigned the task to David. He will break down the task further by himself and starts solving it.",
|
||||
}
|
||||
},
|
||||
{
|
||||
"command_name": "end"
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
## example 3
|
||||
Conversation History:
|
||||
[
|
||||
...,
|
||||
{'role': 'assistant', 'content': 'from Alice(Product Manager) to {'Bob'}: {'docs': {'20240424153821.json': {'root_path': 'docs/prd', 'filename': '20240424153821.json', 'content': '{"Language":"en_us","Programming Language":"Python","Original Requirements":"create a cli snake game","Project Name":"snake_game","Product Goals":["Develop an intuitive and addictive snake game",...], ...}}}}},
|
||||
]
|
||||
Explanation: You received a message from Alice, the Product Manager, that she has completed the PRD, use Plan.finish_current_task to mark her task as finished and moves the plan to the next task. Based on plan status, next task is for Bob (Architect), publish a message asking him to start. The message content should contain important path info.
|
||||
```json
|
||||
[
|
||||
{
|
||||
"command_name": "Plan.finish_current_task",
|
||||
"args": {}
|
||||
},
|
||||
{
|
||||
"command_name": "TeamLeader.publish_message",
|
||||
"args": {
|
||||
"content": "Please design the software architecture for the snake game based on the PRD created by Alice. The PRD is at 'docs/prd/20240424153821.json'. Include the choice of programming language, libraries, and data flow, etc.",
|
||||
"send_to": "Bob"
|
||||
}
|
||||
},
|
||||
{
|
||||
"command_name": "RoleZero.reply_to_human",
|
||||
"args": {
|
||||
"content": "Alice has completed the PRD. I have marked her task as finished and sent the PRD to Bob. Bob will work on the software architecture.",
|
||||
}
|
||||
},
|
||||
{
|
||||
"command_name": "end"
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
## example 4
|
||||
User Question: how does the project go?
|
||||
Explanation: The user is asking for a general update on the project status. Give a straight answer about the current task the team is working on and provide a summary of the completed tasks.
|
||||
```json
|
||||
[
|
||||
{
|
||||
"command_name": "RoleZero.reply_to_human",
|
||||
"args": {
|
||||
"content": "The team is currently working on ... We have completed ...",
|
||||
}
|
||||
},
|
||||
{
|
||||
"command_name": "end"
|
||||
}
|
||||
]
|
||||
```
|
||||
"""
|
||||
|
||||
|
||||
class SimpleExpRetriever(ExpRetriever):
|
||||
"""A simple experience retriever that returns manually crafted examples."""
|
||||
|
||||
EXAMPLE: str = """
|
||||
## example 1
|
||||
User Requirement: Create a cli snake game using Python.
|
||||
Explanation: The requirement is about software development. Assign each tasks to a different team member based on their expertise. When publishing message to Product Manager, we copy original user requirement directly to ensure no information loss.
|
||||
```json
|
||||
[
|
||||
{
|
||||
"command_name": "Plan.append_task",
|
||||
"args": {
|
||||
"task_id": "1",
|
||||
"dependent_task_ids": [],
|
||||
"instruction": "Create a product requirement document (PRD) outlining the features, user interface, and user experience of the CLI python snake game.",
|
||||
"assignee": "Alice"
|
||||
}
|
||||
},
|
||||
{
|
||||
"command_name": "Plan.append_task",
|
||||
"args": {
|
||||
"task_id": "2",
|
||||
"dependent_task_ids": ["1"],
|
||||
"instruction": "Design the software architecture for the CLI snake game, including the choice of programming language, libraries, and data flow.",
|
||||
"assignee": "Bob"
|
||||
}
|
||||
},
|
||||
{
|
||||
"command_name": "Plan.append_task",
|
||||
"args": {
|
||||
"task_id": "3",
|
||||
"dependent_task_ids": ["2"],
|
||||
"instruction": "Break down the architecture into manageable tasks, identify task dependencies, and prepare a detailed task list for implementation.",
|
||||
"assignee": "Eve"
|
||||
}
|
||||
},
|
||||
{
|
||||
"command_name": "Plan.append_task",
|
||||
"args": {
|
||||
"task_id": "4",
|
||||
"dependent_task_ids": ["3"],
|
||||
"instruction": "Implement the core game logic for the CLI snake game, including snake movement, food generation, and score tracking.",
|
||||
"assignee": "Alex"
|
||||
}
|
||||
},
|
||||
{
|
||||
"command_name": "Plan.append_task",
|
||||
"args": {
|
||||
"task_id": "5",
|
||||
"dependent_task_ids": ["4"],
|
||||
"instruction": "Write comprehensive tests for the game logic and user interface to ensure functionality and reliability.",
|
||||
"assignee": "Edward"
|
||||
}
|
||||
},
|
||||
{
|
||||
"command_name": "TeamLeader.publish_message",
|
||||
"args": {
|
||||
"content": "Create a cli snake game using Python",
|
||||
"send_to": "Alice"
|
||||
}
|
||||
},
|
||||
{
|
||||
"command_name": "RoleZero.reply_to_human",
|
||||
"args": {
|
||||
"content": "I have assigned the tasks to the team members. Alice will create the PRD, Bob will design the software architecture, Eve will break down the architecture into tasks, Alex will implement the core game logic, and Edward will write comprehensive tests. The team will work on the project accordingly",
|
||||
}
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
## example 2
|
||||
User Requirement: Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy.
|
||||
Explanation: DON'T decompose requirement if it is a DATA-RELATED task, assign a single task directly to Data Analyst David. He will manage the decomposition and implementation.
|
||||
```json
|
||||
[
|
||||
{
|
||||
"command_name": "Plan.append_task",
|
||||
"args": {
|
||||
"task_id": "1",
|
||||
"dependent_task_ids": [],
|
||||
"instruction": "Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy.",
|
||||
"assignee": "David"
|
||||
}
|
||||
},
|
||||
{
|
||||
"command_name": "TeamLeader.publish_message",
|
||||
"args": {
|
||||
"content": "Run data analysis on sklearn Wine recognition dataset, include a plot, and train a model to predict wine class (20% as validation), and show validation accuracy.",
|
||||
"send_to": "David"
|
||||
}
|
||||
},
|
||||
{
|
||||
"command_name": "RoleZero.reply_to_human",
|
||||
"args": {
|
||||
"content": "I have assigned the task to David. He will break down the task further by himself and starts solving it.",
|
||||
}
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
## example 3
|
||||
Conversation History:
|
||||
[
|
||||
...,
|
||||
{'role': 'assistant', 'content': 'from Alice(Product Manager) to {'Bob'}: {'docs': {'20240424153821.json': {'root_path': 'docs/prd', 'filename': '20240424153821.json', 'content': '{"Language":"en_us","Programming Language":"Python","Original Requirements":"create a cli snake game","Project Name":"snake_game","Product Goals":["Develop an intuitive and addictive snake game",...], ...}}}}},
|
||||
]
|
||||
Explanation: You received a message from Alice, the Product Manager, that she has completed the PRD, use Plan.finish_current_task to mark her task as finished and moves the plan to the next task. Based on plan status, next task is for Bob (Architect), publish a message asking him to start. The message content should contain important path info.
|
||||
```json
|
||||
[
|
||||
{
|
||||
"command_name": "Plan.finish_current_task",
|
||||
"args": {}
|
||||
},
|
||||
{
|
||||
"command_name": "TeamLeader.publish_message",
|
||||
"args": {
|
||||
"content": "Please design the software architecture for the snake game based on the PRD created by Alice. The PRD is at 'docs/prd/20240424153821.json'. Include the choice of programming language, libraries, and data flow, etc.",
|
||||
"send_to": "Bob"
|
||||
}
|
||||
},
|
||||
{
|
||||
"command_name": "RoleZero.reply_to_human",
|
||||
"args": {
|
||||
"content": "Alice has completed the PRD. I have marked her task as finished and sent the PRD to Bob. Bob will work on the software architecture.",
|
||||
}
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
## example 4
|
||||
User Question: how does the project go?
|
||||
Explanation: The user is asking for a general update on the project status. Give a straight answer about the current task the team is working on and provide a summary of the completed tasks.
|
||||
```json
|
||||
[
|
||||
{
|
||||
"command_name": "RoleZero.reply_to_human",
|
||||
"args": {
|
||||
"content": "The team is currently working on ... We have completed ...",
|
||||
}
|
||||
}
|
||||
]
|
||||
```
|
||||
"""
|
||||
|
||||
def retrieve(self, context: str = "") -> str:
|
||||
return self.EXAMPLE
|
||||
return TL_EXAMPLE
|
||||
|
||||
|
||||
class KeywordExpRetriever(ExpRetriever):
|
||||
|
|
@ -213,6 +226,7 @@ Explanation: Launching a service requires Terminal tool with daemon mode, write
|
|||
"assignee": "David"
|
||||
}
|
||||
},
|
||||
]
|
||||
"""
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,4 @@
|
|||
import os
|
||||
import re
|
||||
import shutil
|
||||
import subprocess
|
||||
|
||||
|
|
@ -27,15 +26,16 @@ class Editor:
|
|||
|
||||
def write(self, path: str, content: str):
|
||||
"""Write the whole content to a file. When used, make sure content arg contains the full content of the file."""
|
||||
if len(re.findall(r"\\n", content)) >= 5:
|
||||
# A very raw rule to correct the content: Many \\n suggests all new line characters are mistaken as \\n whereas the correct one should be \n
|
||||
if "\n" not in content and "\\n" in content:
|
||||
# A very raw rule to correct the content: If 'content' lacks actual newlines ('\n') but includes '\\n', consider
|
||||
# replacing them with '\n' to potentially correct mistaken representations of newline characters.
|
||||
content = content.replace("\\n", "\n")
|
||||
directory = os.path.dirname(path)
|
||||
if directory and not os.path.exists(directory):
|
||||
os.makedirs(directory)
|
||||
with open(path, "w", encoding="utf-8") as f:
|
||||
f.write(content)
|
||||
self.resource.report(path, "path")
|
||||
# self.resource.report(path, "path")
|
||||
|
||||
def read(self, path: str) -> FileBlock:
|
||||
"""Read the whole content of a file."""
|
||||
|
|
|
|||
|
|
@ -275,7 +275,7 @@ class CodeParser:
|
|||
def parse_code(cls, text: str, lang: str = "", block: Optional[str] = None) -> str:
|
||||
if block:
|
||||
text = cls.parse_block(block, text)
|
||||
pattern = rf"```{lang}.*?\s+(.*?)```"
|
||||
pattern = rf"```{lang}.*?\s+(.*?)\n```"
|
||||
match = re.search(pattern, text, re.DOTALL)
|
||||
if match:
|
||||
code = match.group(1)
|
||||
|
|
|
|||
|
|
@ -51,7 +51,6 @@ class ResourceReporter(BaseModel):
|
|||
|
||||
block: BlockType = Field(description="The type of block that is reporting the resource")
|
||||
uuid: UUID = Field(default_factory=uuid4, description="The unique identifier for the resource")
|
||||
is_chunk: bool = Field(False, description="Indicates whether the report is a chunk of a stream")
|
||||
enable_llm_stream: bool = Field(False, description="Indicates whether to connect to an LLM stream for reporting")
|
||||
callback_url: str = Field(METAGPT_REPORTER_DEFAULT_URL, description="The URL to which the report should be sent")
|
||||
_llm_task: Optional[asyncio.Task] = PrivateAttr(None)
|
||||
|
|
@ -153,17 +152,14 @@ class ResourceReporter(BaseModel):
|
|||
|
||||
def __enter__(self):
|
||||
"""Enter the synchronous streaming callback context."""
|
||||
self.is_chunk = True
|
||||
return self
|
||||
|
||||
def __exit__(self, *args, **kwargs):
|
||||
"""Exit the synchronous streaming callback context."""
|
||||
self.report(None, END_MARKER_NAME)
|
||||
self.is_chunk = False
|
||||
|
||||
async def __aenter__(self):
|
||||
"""Enter the asynchronous streaming callback context."""
|
||||
self.is_chunk = True
|
||||
if self.enable_llm_stream:
|
||||
queue = create_llm_stream_queue()
|
||||
self._llm_task = asyncio.create_task(self._llm_stream_report(queue))
|
||||
|
|
@ -171,15 +167,18 @@ class ResourceReporter(BaseModel):
|
|||
|
||||
async def __aexit__(self, *args, **kwargs):
|
||||
"""Exit the asynchronous streaming callback context."""
|
||||
self.is_chunk = False
|
||||
if self.enable_llm_stream:
|
||||
self._llm_task.cancel()
|
||||
await get_llm_stream_queue().put(None)
|
||||
await self._llm_task
|
||||
self._llm_task = None
|
||||
await self.async_report(None, END_MARKER_NAME)
|
||||
|
||||
async def _llm_stream_report(self, queue: asyncio.Queue):
|
||||
while self.is_chunk:
|
||||
await self.async_report(await queue.get(), "content")
|
||||
while True:
|
||||
data = await queue.get()
|
||||
if data is None:
|
||||
return
|
||||
await self.async_report(data, "content")
|
||||
|
||||
async def wait_llm_stream_report(self):
|
||||
"""Wait for the LLM stream report to complete."""
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue