Merge pull request #1695 from jason-jszhang/feat_roles_examples

update role atomization capacity example
This commit is contained in:
better629 2025-02-26 15:37:35 +08:00 committed by GitHub
commit 95f05c6861
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
18 changed files with 268 additions and 389 deletions

View file

@ -1,6 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from metagpt.roles.di.data_interpreter import DataInterpreter
from metagpt.tools.libs.web_scraping import view_page_element_to_scrape
async def main():
@ -10,7 +11,7 @@ async def main():
prompt = f"""This is a collection of arxiv urls: '{urls}' .
Record each article, remove duplicates by title (they may have multiple tags), filter out papers related to
large language model / agent / llm, print top 100 and visualize the word count of the titles"""
di = DataInterpreter(react_mode="react", tools=["scrape_web_playwright"])
di = DataInterpreter(react_mode="react", tools=[view_page_element_to_scrape.__name__])
await di.run(prompt)

View file

@ -0,0 +1,65 @@
import fire
from metagpt.logs import logger
from metagpt.roles.di.team_leader import TeamLeader
async def main():
tl = TeamLeader()
logger.info("\n=== Adding Initial Tasks ===")
tl.planner.plan.append_task(
task_id="T1", dependent_task_ids=[], instruction="Create Product Requirements Document (PRD)", assignee="Alice"
)
tl.planner.plan.append_task(
task_id="T2", dependent_task_ids=["T1"], instruction="Design System Architecture", assignee="Bob"
)
# 1. Add Development Tasks
logger.info("\n=== Adding Development Tasks ===")
tl.planner.plan.append_task(
task_id="T3", dependent_task_ids=["T2"], instruction="Implement Core Function Modules", assignee="Alex"
)
tl.planner.plan.append_task(
task_id="T4", dependent_task_ids=["T2"], instruction="Implement User Interface", assignee="Alex"
)
# 2. Complete Some Tasks
logger.info("\n=== Execute and Complete Tasks ===")
logger.info(f"Current Task: {tl.planner.plan.current_task.instruction}")
tl.planner.plan.finish_current_task() # Complete T1
logger.info(f"Current Task: {tl.planner.plan.current_task.instruction}")
tl.planner.plan.finish_current_task() # Complete T2
# 3. Replace Tasks
logger.info("\n=== Replace Task ===")
tl.planner.plan.replace_task(
task_id="T3",
new_dependent_task_ids=["T2"],
new_instruction="Implement Core Function Modules (Add New Features)",
new_assignee="Senior_Developer",
)
# 4. Add Testing Tasks
logger.info("\n=== Add Testing Tasks ===")
tl.planner.plan.append_task(
task_id="T5", dependent_task_ids=["T3", "T4"], instruction="Execute Integration Tests", assignee="Edward"
)
# 5. Reset Task Demonstration
logger.info("\n=== Reset Task ===")
logger.info("Reset Task T3 (This will also reset T5 which depends on it)")
tl.planner.plan.reset_task("T3")
# Display Final Status
logger.info("\n=== Final Status ===")
logger.info(f"Completed Tasks: {len([t for t in tl.planner.plan.tasks if t.is_finished])}")
logger.info(f"Current Task: {tl.planner.plan.current_task.instruction}")
logger.info("All Tasks:")
for task in tl.planner.plan.tasks:
logger.info(f"- {task.task_id}: {task.instruction} (Completed: {task.is_finished})")
if __name__ == "__main__":
fire.Fire(main)

View file

@ -0,0 +1,22 @@
import fire
from metagpt.logs import logger
from metagpt.roles.di.team_leader import TeamLeader
async def main():
# Create an instance of TeamLeader
tl = TeamLeader()
# Update the plan with the goal to create a 2048 game
# This will auto generate tasks needed to accomplish the goal
await tl.planner.update_plan(goal="create a 2048 game.")
# Iterate through all tasks in the plan
# Log each task's ID, instruction and completion status
for task in tl.planner.plan.tasks:
logger.info(f"- {task.task_id}: {task.instruction} (Completed: {task.is_finished})")
if __name__ == "__main__":
fire.Fire(main)

View file

@ -0,0 +1,40 @@
import fire
from metagpt.logs import logger
from metagpt.roles.di.data_analyst import DataAnalyst
async def main():
# Create an instance of DataAnalyst role
analyst = DataAnalyst()
# Set the main goal for the planner - constructing a 2D array
analyst.planner.plan.goal = "construct a two-dimensional array"
# Add a specific task to the planner with detailed parameters:
# - task_id: Unique identifier for the task
# - dependent_task_ids: List of tasks that need to be completed before this one (empty in this case)
# - instruction: Description of what needs to be done
# - assignee: Who will execute the task (David)
# - task_type: Category of the task (DATA_ANALYSIS)
analyst.planner.plan.append_task(
task_id="1",
dependent_task_ids=[],
instruction="construct a two-dimensional array",
assignee="David",
task_type="DATA_ANALYSIS",
)
# Execute the code generation and execution for creating a 2D array
# The write_and_exec_code method will:
# 1. Generate the necessary code for creating a 2D array
# 2. Execute the generated code
# 3. Return the result
result = await analyst.write_and_exec_code("construct a two-dimensional array")
# Log the result of the code execution
logger.info(result)
if __name__ == "__main__":
fire.Fire(main)

View file

@ -0,0 +1,37 @@
import fire
from metagpt.environment.mgx.mgx_env import MGXEnv
from metagpt.logs import logger
from metagpt.roles.di.team_leader import TeamLeader
from metagpt.schema import Message
async def main():
# Initialize the MetaGPT environment
env = MGXEnv()
# Add a TeamLeader role to the environment
env.add_roles([TeamLeader()])
# Get input from human user about what they want to do
human_rsp = await env.ask_human("What do you want to do")
# Log the human response for tracking
logger.info(human_rsp)
# Create and publish a message with the human response in the environment
env.publish_message(Message(content=human_rsp, role="user"))
# Get the TeamLeader role instance named 'Mike'
tl = env.get_role("Mike")
# Execute the TeamLeader's tasks
await tl.run()
# Log information about each task in the TeamLeader's plan
for task in tl.planner.plan.tasks:
logger.info(f"- {task.task_id}: {task.instruction} (Completed: {task.is_finished})")
# Send an empty response back to the human and log it
resp = await env.reply_to_human("")
logger.info(resp)
if __name__ == "__main__":
fire.Fire(main)

View file

@ -22,13 +22,13 @@ Note: All required dependencies and environments have been fully installed and c
"""
di = DataInterpreter(
tools=[
"write_prd",
"write_design",
"write_project_plan",
"write_codes",
"run_qa_test",
"fix_bug",
"git_archive",
"WritePRD",
"WriteDesign",
"WritePlan",
"WriteCode",
"RunCode",
"DebugError",
# "git_archive",
]
)

View file

@ -1,3 +0,0 @@
# -*- coding: utf-8 -*-
# @Author : stellahong (stellahong@fuzhi.ai)
# @Desc :

View file

@ -1,23 +0,0 @@
# -*- coding: utf-8 -*-
# @Author : stellahong (stellahong@fuzhi.ai)
# @Desc :
import asyncio
from metagpt.roles.di.mgx import MGX
requirement = (
# "design a game using Gym (an open source Python library), including a graphical interface and interactive gameplay"
# "帮我把pip的源设置成https://pypi.tuna.tsinghua.edu.cn/simple"
# "This is a website url does not require login: https://demosc.chinaz.net/Files/DownLoad//moban/202404/moban7767 please write a similar web page,developed in vue language, The package.json dependency must be generated"
"I would like to imitate the website available at https://demosc.chinaz.net/Files/DownLoad//moban/202404/moban7767. Could you please browse through it?"
# "Create a 2048 Game"
)
async def main(requirement: str = ""):
mgx = MGX(use_intent=True, tools=["<all>"])
await mgx.run(requirement)
if __name__ == "__main__":
asyncio.run(main(requirement))

View file

@ -1,72 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import asyncio
import shutil
from pathlib import Path
import typer
from metagpt.actions.rebuild_class_view import RebuildClassView
from metagpt.actions.rebuild_sequence_view import RebuildSequenceView
from metagpt.context import Context
from metagpt.llm import LLM
from metagpt.logs import logger
from metagpt.utils.git_repository import GitRepository
from metagpt.utils.project_repo import ProjectRepo
app = typer.Typer(add_completion=False, pretty_exceptions_show_locals=False)
@app.command("", help="Python project reverse engineering.")
def startup(
project_root: str = typer.Argument(
default="",
help="Specify the root directory of the existing project for reverse engineering.",
),
output_dir: str = typer.Option(default="", help="Specify the output directory path for reverse engineering."),
):
package_root = Path(project_root)
if not package_root.exists():
raise FileNotFoundError(f"{project_root} not exists")
if not _is_python_package_root(package_root):
raise FileNotFoundError(f'There are no "*.py" files under "{project_root}".')
init_file = package_root / "__init__.py" # used by pyreverse
init_file_exists = init_file.exists()
if not init_file_exists:
init_file.touch()
if not output_dir:
output_dir = package_root / "../reverse_engineering_output"
logger.info(f"output dir:{output_dir}")
try:
asyncio.run(reverse_engineering(package_root, Path(output_dir)))
finally:
if not init_file_exists:
init_file.unlink(missing_ok=True)
tmp_dir = package_root / "__dot__"
if tmp_dir.exists():
shutil.rmtree(tmp_dir, ignore_errors=True)
def _is_python_package_root(package_root: Path) -> bool:
for file_path in package_root.iterdir():
if file_path.is_file():
if file_path.suffix == ".py":
return True
return False
async def reverse_engineering(package_root: Path, output_dir: Path):
ctx = Context()
ctx.git_repo = GitRepository(output_dir)
ctx.repo = ProjectRepo(ctx.git_repo)
action = RebuildClassView(name="ReverseEngineering", i_context=str(package_root), llm=LLM(), context=ctx)
await action.run()
action = RebuildSequenceView(name="ReverseEngineering", llm=LLM(), context=ctx)
await action.run()
if __name__ == "__main__":
app()

View file

@ -1,82 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/9/13 12:36
@Author : femto Zheng
@File : sk_agent.py
"""
import asyncio
from semantic_kernel.core_skills import FileIOSkill, MathSkill, TextSkill, TimeSkill
from semantic_kernel.planning import SequentialPlanner
# from semantic_kernel.planning import SequentialPlanner
from semantic_kernel.planning.action_planner.action_planner import ActionPlanner
from metagpt.actions import UserRequirement
from metagpt.const import SKILL_DIRECTORY
from metagpt.roles.sk_agent import SkAgent
from metagpt.schema import Message
from metagpt.tools.search_engine import SkSearchEngine
async def main():
# await basic_planner_example()
# await action_planner_example()
# await sequential_planner_example()
await basic_planner_web_search_example()
async def basic_planner_example():
task = """
Tomorrow is Valentine's day. I need to come up with a few date ideas. She speaks French so write it in French.
Convert the text to uppercase"""
role = SkAgent()
# let's give the agent some skills
role.import_semantic_skill_from_directory(SKILL_DIRECTORY, "SummarizeSkill")
role.import_semantic_skill_from_directory(SKILL_DIRECTORY, "WriterSkill")
role.import_skill(TextSkill(), "TextSkill")
# using BasicPlanner
await role.run(Message(content=task, cause_by=UserRequirement))
async def sequential_planner_example():
task = """
Tomorrow is Valentine's day. I need to come up with a few date ideas. She speaks French so write it in French.
Convert the text to uppercase"""
role = SkAgent(planner_cls=SequentialPlanner)
# let's give the agent some skills
role.import_semantic_skill_from_directory(SKILL_DIRECTORY, "SummarizeSkill")
role.import_semantic_skill_from_directory(SKILL_DIRECTORY, "WriterSkill")
role.import_skill(TextSkill(), "TextSkill")
# using BasicPlanner
await role.run(Message(content=task, cause_by=UserRequirement))
async def basic_planner_web_search_example():
task = """
Question: Who made the 1989 comic book, the film version of which Jon Raymond Polito appeared in?"""
role = SkAgent()
role.import_skill(SkSearchEngine(), "WebSearchSkill")
# role.import_semantic_skill_from_directory(skills_directory, "QASkill")
await role.run(Message(content=task, cause_by=UserRequirement))
async def action_planner_example():
role = SkAgent(planner_cls=ActionPlanner)
# let's give the agent 4 skills
role.import_skill(MathSkill(), "math")
role.import_skill(FileIOSkill(), "fileIO")
role.import_skill(TimeSkill(), "time")
role.import_skill(TextSkill(), "text")
task = "What is the sum of 110 and 990?"
await role.run(Message(content=task, cause_by=UserRequirement)) # it will choose mathskill.Add
if __name__ == "__main__":
asyncio.run(main())

View file

@ -7,11 +7,20 @@ import asyncio
from metagpt.logs import logger
from metagpt.roles.product_manager import ProductManager
from metagpt.environment.mgx.mgx_env import MGXEnv
from metagpt.schema import Message
from metagpt.roles.di.team_leader import TeamLeader
async def main():
msg = "Write a PRD for a snake game"
role = ProductManager()
env = MGXEnv()
env.add_roles([TeamLeader(), ProductManager()])
env.publish_message(Message(content=msg, role="user"))
tl = env.get_role("Mike")
await tl.run()
role = env.get_role("Alice")
result = await role.run(msg)
logger.info(result.content[:100])

24
examples/write_design.py Normal file
View file

@ -0,0 +1,24 @@
import asyncio
from metagpt.logs import logger
from metagpt.roles.architect import Architect
from metagpt.environment.mgx.mgx_env import MGXEnv
from metagpt.schema import Message
from metagpt.roles.di.team_leader import TeamLeader
async def main():
msg = "Write a TRD for a snake game"
env = MGXEnv()
env.add_roles([TeamLeader(), Architect()])
env.publish_message(Message(content=msg, role="user"))
tl = env.get_role("Mike")
await tl.run()
role = env.get_role("Bob")
result = await role.run(msg)
logger.info(result)
if __name__ == "__main__":
asyncio.run(main())

View file

@ -0,0 +1,42 @@
import asyncio
import time
from metagpt.environment.mgx.mgx_env import MGXEnv
from metagpt.roles.di.engineer2 import Engineer2
from metagpt.roles.di.team_leader import TeamLeader
from metagpt.schema import Message
async def main(requirement="", user_defined_recipient="", enable_human_input=False, allow_idle_time=30):
env = MGXEnv()
env.add_roles([TeamLeader(), Engineer2()])
msg = Message(content=requirement)
env.attach_images(msg) # attach image content if applicable
if user_defined_recipient:
msg.send_to = {user_defined_recipient}
env.publish_message(msg, user_defined_recipient=user_defined_recipient)
else:
env.publish_message(msg)
allow_idle_time = allow_idle_time if enable_human_input else 1
start_time = time.time()
while time.time() - start_time < allow_idle_time:
if not env.is_idle:
await env.run()
start_time = time.time() # reset start time
if __name__ == "__main__":
requirement = "Write code for a 2048 game"
user_defined_recipient = ""
asyncio.run(
main(
requirement=requirement,
user_defined_recipient=user_defined_recipient,
enable_human_input=False,
allow_idle_time=60,
)
)

View file

@ -1,194 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2024/6/13
@Author : mashenquan
@File : write_project_framework.py
@Desc : The implementation of RFC243. https://deepwisdom.feishu.cn/wiki/QobGwPkImijoyukBUKHcrYetnBb
"""
import asyncio
import json
import uuid
from pathlib import Path
from typing import Dict, List
import typer
from metagpt.actions.requirement_analysis.framework import (
EvaluateFramework,
WriteFramework,
save_framework,
)
from metagpt.actions.requirement_analysis.trd import (
CompressExternalInterfaces,
DetectInteraction,
EvaluateTRD,
WriteTRD,
)
from metagpt.config2 import Config
from metagpt.const import DEFAULT_WORKSPACE_ROOT
from metagpt.context import Context
from metagpt.logs import logger
from metagpt.utils.common import aread
app = typer.Typer(add_completion=False)
async def _write_trd(
context: Context, actors: Dict[str, str], user_requirements: List[str], available_external_interfaces: str
) -> (str, str):
detect_interaction = DetectInteraction(context=context)
write_trd = WriteTRD(context=context)
evaluate_trd = EvaluateTRD(context=context)
use_case_actors = "".join([f"- {v}: {k}\n" for k, v in actors.items()])
legacy_user_requirements = []
legacy_user_requirements_interaction_events = []
legacy_user_requirements_trd = ""
for ix, r in enumerate(user_requirements):
is_pass = False
evaluation_conclusion = ""
interaction_events = ""
trd = ""
while not is_pass and (context.cost_manager.total_cost < context.cost_manager.max_budget):
interaction_events = await detect_interaction.run(
user_requirements=r,
use_case_actors=use_case_actors,
legacy_interaction_events=interaction_events,
evaluation_conclusion=evaluation_conclusion,
)
if ix == 0:
trd = await write_trd.run(
user_requirements=r,
use_case_actors=use_case_actors,
available_external_interfaces=available_external_interfaces,
evaluation_conclusion=evaluation_conclusion,
interaction_events=interaction_events,
previous_version_trd=trd,
)
else:
trd = await write_trd.run(
user_requirements=r,
use_case_actors=use_case_actors,
available_external_interfaces=available_external_interfaces,
evaluation_conclusion=evaluation_conclusion,
interaction_events=interaction_events,
previous_version_trd=trd,
legacy_user_requirements="\n".join(legacy_user_requirements),
legacy_user_requirements_trd=legacy_user_requirements_trd,
legacy_user_requirements_interaction_events="\n".join(legacy_user_requirements_interaction_events),
)
evaluation = await evaluate_trd.run(
user_requirements=r,
use_case_actors=use_case_actors,
trd=trd,
interaction_events=interaction_events,
legacy_user_requirements_interaction_events="\n".join(legacy_user_requirements_interaction_events),
)
is_pass = evaluation.is_pass
evaluation_conclusion = evaluation.conclusion
legacy_user_requirements.append(r)
legacy_user_requirements_interaction_events.append(interaction_events)
legacy_user_requirements_trd = trd
return use_case_actors, legacy_user_requirements_trd
async def _write_framework(context: Context, use_case_actors: str, trd: str, acknowledge: str, constraint: str) -> str:
write_framework = WriteFramework(context=context)
evaluate_framework = EvaluateFramework(context=context)
is_pass = False
framework = ""
evaluation_conclusion = ""
while not is_pass and (context.cost_manager.total_cost < context.cost_manager.max_budget):
try:
framework = await write_framework.run(
use_case_actors=use_case_actors,
trd=trd,
acknowledge=acknowledge,
legacy_output=framework,
evaluation_conclusion=evaluation_conclusion,
additional_technical_requirements=constraint,
)
except Exception as e:
logger.info(f"{e}")
break
evaluation = await evaluate_framework.run(
use_case_actors=use_case_actors,
trd=trd,
acknowledge=acknowledge,
legacy_output=framework,
additional_technical_requirements=constraint,
)
is_pass = evaluation.is_pass
evaluation_conclusion = evaluation.conclusion
return framework
async def develop(
context: Context,
user_requirement_filename: str,
actors_filename: str,
acknowledge_filename: str,
constraint_filename: str,
output_dir: str,
):
output_dir = Path(output_dir) if output_dir else DEFAULT_WORKSPACE_ROOT / uuid.uuid4().hex
v = await aread(filename=user_requirement_filename)
user_requirements = json.loads(v)
v = await aread(filename=actors_filename)
actors = json.loads(v)
acknowledge = await aread(filename=acknowledge_filename)
technical_constraint = await aread(filename=constraint_filename)
# Compress acknowledge
compress_acknowledge = CompressExternalInterfaces(context=context)
available_external_interfaces = await compress_acknowledge.run(acknowledge=acknowledge)
# Write TRD
use_case_actors, trd = await _write_trd(
context=context,
actors=actors,
user_requirements=user_requirements,
available_external_interfaces=available_external_interfaces,
)
# Write framework
framework = await _write_framework(
context=context,
use_case_actors=use_case_actors,
trd=trd,
acknowledge=acknowledge,
constraint=technical_constraint,
)
# Save
file_list = await save_framework(dir_data=framework, trd=trd, output_dir=output_dir)
logger.info(f"Output:\n{file_list}")
@app.command()
def startup(
user_requirement_filename: str = typer.Argument(..., help="The filename of the user requirements."),
actors_filename: str = typer.Argument(..., help="The filename of UML use case actors description."),
acknowledge_filename: str = typer.Argument(..., help="External interfaces declarations."),
llm_config: str = typer.Option(default="", help="Low-cost LLM config"),
constraint_filename: str = typer.Option(default="", help="What technical dependency constraints are."),
output_dir: str = typer.Option(default="", help="Output directory."),
investment: float = typer.Option(default=15.0, help="Dollar amount to invest in the AI company."),
):
if llm_config and Path(llm_config).exists():
config = Config.from_yaml_file(Path(llm_config))
else:
logger.info("GPT 4 turbo is recommended")
config = Config.default()
ctx = Context(config=config)
ctx.cost_manager.max_budget = investment
asyncio.run(
develop(ctx, user_requirement_filename, actors_filename, acknowledge_filename, constraint_filename, output_dir)
)
if __name__ == "__main__":
app()

View file

@ -65,10 +65,14 @@ def update_plan_from_rsp(rsp: str, current_plan: Plan):
# handle a single task
if current_plan.has_task_id(tasks[0].task_id):
# replace an existing task
current_plan.replace_task(tasks[0])
current_plan.replace_task(
tasks[0].task_id, tasks[0].dependent_task_ids, tasks[0].instruction, tasks[0].assignee
)
else:
# append one task
current_plan.append_task(tasks[0])
current_plan.append_task(
tasks[0].task_id, tasks[0].dependent_task_ids, tasks[0].instruction, tasks[0].assignee
)
else:
# add tasks in general

View file

@ -165,3 +165,6 @@ EXPERIENCE_MASK = "<experience>"
# TeamLeader's name
TEAMLEADER_NAME = "Mike"
DEFAULT_MIN_TOKEN_COUNT = 10000
DEFAULT_MAX_TOKEN_COUNT = 100000000

View file

@ -13,8 +13,7 @@ from typing import List, Optional, Union
import tiktoken
from pydantic import BaseModel, ConfigDict
from metagpt.const import DEFAULT_WORKSPACE_ROOT
from metagpt.tools.libs.index_repo import DEFAULT_MIN_TOKEN_COUNT, IndexRepo
from metagpt.const import DEFAULT_MIN_TOKEN_COUNT, DEFAULT_WORKSPACE_ROOT
from metagpt.tools.libs.linter import Linter
from metagpt.tools.tool_registry import register_tool
from metagpt.utils.common import awrite
@ -1125,7 +1124,12 @@ class Editor(BaseModel):
>>> texts: List[str] = await Editor.similarity_search(query=query, path=file_or_path)
>>> print(texts)
"""
return await IndexRepo.cross_repo_search(query=query, file_or_path=path)
try:
from metagpt.tools.libs.index_repo import IndexRepo
return await IndexRepo.cross_repo_search(query=query, file_or_path=path)
except ImportError:
raise ImportError("To use the similarity search, you need to install the RAG module.")
@staticmethod
def is_large_file(content: str, mix_token_count: int = 0) -> bool:

View file

@ -75,4 +75,6 @@ pygithub~=2.3
htmlmin
fsspec
grep-ast~=0.3.3 # linter
tree-sitter~=0.21.3 # linter
tree-sitter~=0.21.3 # linter
unidiff==0.7.5 # used at metagpt/tools/libs/cr.py
httpx==0.27.2