Merge branch 'geekan:dev' into dev

This commit is contained in:
better629 2024-01-15 15:57:00 +08:00 committed by GitHub
commit 49e7d7eaea
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
68 changed files with 1797 additions and 622 deletions

View file

@ -21,7 +21,7 @@ from metagpt.schema import (
SerializationMixin,
TestingContext,
)
from metagpt.utils.file_repository import FileRepository
from metagpt.utils.project_repo import ProjectRepo
class Action(SerializationMixin, ContextMixin, BaseModel):
@ -34,16 +34,8 @@ class Action(SerializationMixin, ContextMixin, BaseModel):
node: ActionNode = Field(default=None, exclude=True)
@property
def git_repo(self):
return self.context.git_repo
@property
def file_repo(self):
return FileRepository(self.context.git_repo)
@property
def src_workspace(self):
return self.context.src_workspace
def project_repo(self):
return ProjectRepo(self.context.git_repo)
@property
def prompt_schema(self):

View file

@ -13,7 +13,6 @@ import re
from pydantic import Field
from metagpt.actions.action import Action
from metagpt.const import TEST_CODES_FILE_REPO, TEST_OUTPUTS_FILE_REPO
from metagpt.logs import logger
from metagpt.schema import RunCodeContext, RunCodeResult
from metagpt.utils.common import CodeParser
@ -50,9 +49,7 @@ class DebugError(Action):
i_context: RunCodeContext = Field(default_factory=RunCodeContext)
async def run(self, *args, **kwargs) -> str:
output_doc = await self.file_repo.get_file(
filename=self.i_context.output_filename, relative_path=TEST_OUTPUTS_FILE_REPO
)
output_doc = await self.project_repo.test_outputs.get(filename=self.i_context.output_filename)
if not output_doc:
return ""
output_detail = RunCodeResult.loads(output_doc.content)
@ -62,14 +59,12 @@ class DebugError(Action):
return ""
logger.info(f"Debug and rewrite {self.i_context.test_filename}")
code_doc = await self.file_repo.get_file(
filename=self.i_context.code_filename, relative_path=self.context.src_workspace
code_doc = await self.project_repo.with_src_path(self.context.src_workspace).srcs.get(
filename=self.i_context.code_filename
)
if not code_doc:
return ""
test_doc = await self.file_repo.get_file(
filename=self.i_context.test_filename, relative_path=TEST_CODES_FILE_REPO
)
test_doc = await self.project_repo.tests.get(filename=self.i_context.test_filename)
if not test_doc:
return ""
prompt = PROMPT_TEMPLATE.format(code=code_doc.content, test_code=test_doc.content, logs=output_detail.stderr)

View file

@ -15,13 +15,7 @@ from typing import Optional
from metagpt.actions import Action, ActionOutput
from metagpt.actions.design_api_an import DESIGN_API_NODE
from metagpt.const import (
DATA_API_DESIGN_FILE_REPO,
PRDS_FILE_REPO,
SEQ_FLOW_FILE_REPO,
SYSTEM_DESIGN_FILE_REPO,
SYSTEM_DESIGN_PDF_FILE_REPO,
)
from metagpt.const import DATA_API_DESIGN_FILE_REPO, SEQ_FLOW_FILE_REPO
from metagpt.logs import logger
from metagpt.schema import Document, Documents, Message
from metagpt.utils.mermaid import mermaid_to_file
@ -46,27 +40,21 @@ class WriteDesign(Action):
async def run(self, with_messages: Message, schema: str = None):
# Use `git status` to identify which PRD documents have been modified in the `docs/prds` directory.
prds_file_repo = self.git_repo.new_file_repository(PRDS_FILE_REPO)
changed_prds = prds_file_repo.changed_files
changed_prds = self.project_repo.docs.prd.changed_files
# Use `git status` to identify which design documents in the `docs/system_designs` directory have undergone
# changes.
system_design_file_repo = self.git_repo.new_file_repository(SYSTEM_DESIGN_FILE_REPO)
changed_system_designs = system_design_file_repo.changed_files
changed_system_designs = self.project_repo.docs.system_design.changed_files
# For those PRDs and design documents that have undergone changes, regenerate the design content.
changed_files = Documents()
for filename in changed_prds.keys():
doc = await self._update_system_design(
filename=filename, prds_file_repo=prds_file_repo, system_design_file_repo=system_design_file_repo
)
doc = await self._update_system_design(filename=filename)
changed_files.docs[filename] = doc
for filename in changed_system_designs.keys():
if filename in changed_files.docs:
continue
doc = await self._update_system_design(
filename=filename, prds_file_repo=prds_file_repo, system_design_file_repo=system_design_file_repo
)
doc = await self._update_system_design(filename=filename)
changed_files.docs[filename] = doc
if not changed_files.docs:
logger.info("Nothing has changed.")
@ -84,24 +72,22 @@ class WriteDesign(Action):
system_design_doc.content = node.instruct_content.model_dump_json()
return system_design_doc
async def _update_system_design(self, filename, prds_file_repo, system_design_file_repo) -> Document:
prd = await prds_file_repo.get(filename)
old_system_design_doc = await system_design_file_repo.get(filename)
async def _update_system_design(self, filename) -> Document:
prd = await self.project_repo.docs.prd.get(filename)
old_system_design_doc = await self.project_repo.docs.system_design.get(filename)
if not old_system_design_doc:
system_design = await self._new_system_design(context=prd.content)
doc = Document(
root_path=SYSTEM_DESIGN_FILE_REPO,
doc = await self.project_repo.docs.system_design.save(
filename=filename,
content=system_design.instruct_content.model_dump_json(),
dependencies={prd.root_relative_path},
)
else:
doc = await self._merge(prd_doc=prd, system_design_doc=old_system_design_doc)
await system_design_file_repo.save(
filename=filename, content=doc.content, dependencies={prd.root_relative_path}
)
await self.project_repo.docs.system_design.save_doc(doc=doc, dependencies={prd.root_relative_path})
await self._save_data_api_design(doc)
await self._save_seq_flow(doc)
await self._save_pdf(doc)
await self.project_repo.resources.system_design.save_pdf(doc=doc)
return doc
async def _save_data_api_design(self, design_doc):
@ -109,7 +95,7 @@ class WriteDesign(Action):
data_api_design = m.get("Data structures and interfaces")
if not data_api_design:
return
pathname = self.git_repo.workdir / DATA_API_DESIGN_FILE_REPO / Path(design_doc.filename).with_suffix("")
pathname = self.project_repo.workdir / DATA_API_DESIGN_FILE_REPO / Path(design_doc.filename).with_suffix("")
await self._save_mermaid_file(data_api_design, pathname)
logger.info(f"Save class view to {str(pathname)}")
@ -118,13 +104,10 @@ class WriteDesign(Action):
seq_flow = m.get("Program call flow")
if not seq_flow:
return
pathname = self.git_repo.workdir / Path(SEQ_FLOW_FILE_REPO) / Path(design_doc.filename).with_suffix("")
pathname = self.project_repo.workdir / Path(SEQ_FLOW_FILE_REPO) / Path(design_doc.filename).with_suffix("")
await self._save_mermaid_file(seq_flow, pathname)
logger.info(f"Saving sequence flow to {str(pathname)}")
async def _save_pdf(self, design_doc):
await self.file_repo.save_as(doc=design_doc, with_suffix=".md", relative_path=SYSTEM_DESIGN_PDF_FILE_REPO)
async def _save_mermaid_file(self, data: str, pathname: Path):
pathname.parent.mkdir(parents=True, exist_ok=True)
await mermaid_to_file(self.config.mermaid_engine, data, pathname)

View file

@ -12,8 +12,7 @@ from pathlib import Path
from typing import Optional
from metagpt.actions import Action, ActionOutput
from metagpt.const import DOCS_FILE_REPO, REQUIREMENT_FILENAME
from metagpt.schema import Document
from metagpt.const import REQUIREMENT_FILENAME
from metagpt.utils.file_repository import FileRepository
from metagpt.utils.git_repository import GitRepository
@ -38,7 +37,6 @@ class PrepareDocuments(Action):
if path.exists() and not self.config.inc:
shutil.rmtree(path)
self.config.project_path = path
self.config.project_name = path.name
self.context.git_repo = GitRepository(local_path=path, auto_init=True)
async def run(self, with_messages, **kwargs):
@ -46,9 +44,7 @@ class PrepareDocuments(Action):
self._init_repo()
# Write the newly added requirements from the main parameter idea to `docs/requirement.txt`.
doc = Document(root_path=DOCS_FILE_REPO, filename=REQUIREMENT_FILENAME, content=with_messages[0].content)
await self.file_repo.save_file(filename=REQUIREMENT_FILENAME, content=doc.content, relative_path=DOCS_FILE_REPO)
doc = await self.project_repo.docs.save(filename=REQUIREMENT_FILENAME, content=with_messages[0].content)
# Send a Message notification to the WritePRD action, instructing it to process requirements using
# `docs/requirement.txt` and `docs/prds/`.
return ActionOutput(content=doc.content, instruct_content=doc)

View file

@ -16,12 +16,7 @@ from typing import Optional
from metagpt.actions import ActionOutput
from metagpt.actions.action import Action
from metagpt.actions.project_management_an import PM_NODE
from metagpt.const import (
PACKAGE_REQUIREMENTS_FILENAME,
SYSTEM_DESIGN_FILE_REPO,
TASK_FILE_REPO,
TASK_PDF_FILE_REPO,
)
from metagpt.const import PACKAGE_REQUIREMENTS_FILENAME
from metagpt.logs import logger
from metagpt.schema import Document, Documents
@ -39,27 +34,20 @@ class WriteTasks(Action):
i_context: Optional[str] = None
async def run(self, with_messages):
system_design_file_repo = self.git_repo.new_file_repository(SYSTEM_DESIGN_FILE_REPO)
changed_system_designs = system_design_file_repo.changed_files
tasks_file_repo = self.git_repo.new_file_repository(TASK_FILE_REPO)
changed_tasks = tasks_file_repo.changed_files
changed_system_designs = self.project_repo.docs.system_design.changed_files
changed_tasks = self.project_repo.docs.task.changed_files
change_files = Documents()
# Rewrite the system designs that have undergone changes based on the git head diff under
# `docs/system_designs/`.
for filename in changed_system_designs:
task_doc = await self._update_tasks(
filename=filename, system_design_file_repo=system_design_file_repo, tasks_file_repo=tasks_file_repo
)
task_doc = await self._update_tasks(filename=filename)
change_files.docs[filename] = task_doc
# Rewrite the task files that have undergone changes based on the git head diff under `docs/tasks/`.
for filename in changed_tasks:
if filename in change_files.docs:
continue
task_doc = await self._update_tasks(
filename=filename, system_design_file_repo=system_design_file_repo, tasks_file_repo=tasks_file_repo
)
task_doc = await self._update_tasks(filename=filename)
change_files.docs[filename] = task_doc
if not change_files.docs:
@ -68,21 +56,22 @@ class WriteTasks(Action):
# global optimization in subsequent steps.
return ActionOutput(content=change_files.model_dump_json(), instruct_content=change_files)
async def _update_tasks(self, filename, system_design_file_repo, tasks_file_repo):
system_design_doc = await system_design_file_repo.get(filename)
task_doc = await tasks_file_repo.get(filename)
async def _update_tasks(self, filename):
system_design_doc = await self.project_repo.docs.system_design.get(filename)
task_doc = await self.project_repo.docs.task.get(filename)
if task_doc:
task_doc = await self._merge(system_design_doc=system_design_doc, task_doc=task_doc)
await self.project_repo.docs.task.save_doc(
doc=task_doc, dependencies={system_design_doc.root_relative_path}
)
else:
rsp = await self._run_new_tasks(context=system_design_doc.content)
task_doc = Document(
root_path=TASK_FILE_REPO, filename=filename, content=rsp.instruct_content.model_dump_json()
task_doc = await self.project_repo.docs.task.save(
filename=filename,
content=rsp.instruct_content.model_dump_json(),
dependencies={system_design_doc.root_relative_path},
)
await tasks_file_repo.save(
filename=filename, content=task_doc.content, dependencies={system_design_doc.root_relative_path}
)
await self._update_requirements(task_doc)
await self._save_pdf(task_doc=task_doc)
return task_doc
async def _run_new_tasks(self, context):
@ -98,8 +87,7 @@ class WriteTasks(Action):
async def _update_requirements(self, doc):
m = json.loads(doc.content)
packages = set(m.get("Required Python third-party packages", set()))
file_repo = self.git_repo.new_file_repository()
requirement_doc = await file_repo.get(filename=PACKAGE_REQUIREMENTS_FILENAME)
requirement_doc = await self.project_repo.get(filename=PACKAGE_REQUIREMENTS_FILENAME)
if not requirement_doc:
requirement_doc = Document(filename=PACKAGE_REQUIREMENTS_FILENAME, root_path=".", content="")
lines = requirement_doc.content.splitlines()
@ -107,7 +95,4 @@ class WriteTasks(Action):
if pkg == "":
continue
packages.add(pkg)
await file_repo.save(PACKAGE_REQUIREMENTS_FILENAME, content="\n".join(packages))
async def _save_pdf(self, task_doc):
await self.file_repo.save_as(doc=task_doc, with_suffix=".md", relative_path=TASK_PDF_FILE_REPO)
await self.project_repo.save(filename=PACKAGE_REQUIREMENTS_FILENAME, content="\n".join(packages))

View file

@ -20,7 +20,6 @@ from metagpt.const import (
GENERALIZATION,
GRAPH_REPO_FILE_REPO,
)
from metagpt.context import CONTEXT
from metagpt.logs import logger
from metagpt.repo_parser import RepoParser
from metagpt.schema import ClassAttribute, ClassMethod, ClassView
@ -31,7 +30,7 @@ from metagpt.utils.graph_repository import GraphKeyword, GraphRepository
class RebuildClassView(Action):
async def run(self, with_messages=None, format=config.prompt_schema):
graph_repo_pathname = CONTEXT.git_repo.workdir / GRAPH_REPO_FILE_REPO / CONTEXT.git_repo.workdir.name
graph_repo_pathname = self.context.git_repo.workdir / GRAPH_REPO_FILE_REPO / self.context.git_repo.workdir.name
graph_db = await DiGraphRepository.load_from(str(graph_repo_pathname.with_suffix(".json")))
repo_parser = RepoParser(base_directory=Path(self.i_context))
# use pylint
@ -49,9 +48,9 @@ class RebuildClassView(Action):
await graph_db.save()
async def _create_mermaid_class_views(self, graph_db):
path = Path(CONTEXT.git_repo.workdir) / DATA_API_DESIGN_FILE_REPO
path = Path(self.context.git_repo.workdir) / DATA_API_DESIGN_FILE_REPO
path.mkdir(parents=True, exist_ok=True)
pathname = path / CONTEXT.git_repo.workdir.name
pathname = path / self.context.git_repo.workdir.name
async with aiofiles.open(str(pathname.with_suffix(".mmd")), mode="w", encoding="utf-8") as writer:
content = "classDiagram\n"
logger.debug(content)

View file

@ -14,7 +14,6 @@ from typing import List
from metagpt.actions import Action
from metagpt.config2 import config
from metagpt.const import GRAPH_REPO_FILE_REPO
from metagpt.context import CONTEXT
from metagpt.logs import logger
from metagpt.utils.common import aread, list_files
from metagpt.utils.di_graph_repository import DiGraphRepository
@ -23,7 +22,7 @@ from metagpt.utils.graph_repository import GraphKeyword
class RebuildSequenceView(Action):
async def run(self, with_messages=None, format=config.prompt_schema):
graph_repo_pathname = CONTEXT.git_repo.workdir / GRAPH_REPO_FILE_REPO / CONTEXT.git_repo.workdir.name
graph_repo_pathname = self.context.git_repo.workdir / GRAPH_REPO_FILE_REPO / self.context.git_repo.workdir.name
graph_db = await DiGraphRepository.load_from(str(graph_repo_pathname.with_suffix(".json")))
entries = await RebuildSequenceView._search_main_entry(graph_db)
for entry in entries:
@ -43,6 +42,8 @@ class RebuildSequenceView(Action):
async def _rebuild_sequence_view(self, entry, graph_db):
filename = entry.subject.split(":", 1)[0]
src_filename = RebuildSequenceView._get_full_filename(root=self.i_context, pathname=filename)
if not src_filename:
return
content = await aread(filename=src_filename, encoding="utf-8")
content = f"```python\n{content}\n```\n\n---\nTranslate the code above into Mermaid Sequence Diagram."
data = await self.llm.aask(

View file

@ -29,9 +29,7 @@ class ArgumentsParingAction(Action):
@property
def prompt(self):
prompt = "You are a function parser. You can convert spoken words into function parameters.\n"
prompt += "\n---\n"
prompt += f"{self.skill.name} function parameters description:\n"
prompt = f"{self.skill.name} function parameters description:\n"
for k, v in self.skill.arguments.items():
prompt += f"parameter `{k}`: {v}\n"
prompt += "\n---\n"
@ -49,7 +47,10 @@ class ArgumentsParingAction(Action):
async def run(self, with_message=None, **kwargs) -> Message:
prompt = self.prompt
rsp = await self.llm.aask(msg=prompt, system_msgs=[])
rsp = await self.llm.aask(
msg=prompt,
system_msgs=["You are a function parser.", "You can convert spoken words into function parameters."],
)
logger.debug(f"SKILL:{prompt}\n, RESULT:{rsp}")
self.args = ArgumentsParingAction.parse_arguments(skill_name=self.skill.name, txt=rsp)
self.rsp = Message(content=rsp, role="assistant", instruct_content=self.args, cause_by=self)

View file

@ -11,7 +11,6 @@ from pydantic import Field
from tenacity import retry, stop_after_attempt, wait_random_exponential
from metagpt.actions.action import Action
from metagpt.const import SYSTEM_DESIGN_FILE_REPO, TASK_FILE_REPO
from metagpt.logs import logger
from metagpt.schema import CodeSummarizeContext
@ -99,11 +98,10 @@ class SummarizeCode(Action):
async def run(self):
design_pathname = Path(self.i_context.design_filename)
repo = self.file_repo
design_doc = await repo.get_file(filename=design_pathname.name, relative_path=SYSTEM_DESIGN_FILE_REPO)
design_doc = await self.project_repo.docs.system_design.get(filename=design_pathname.name)
task_pathname = Path(self.i_context.task_filename)
task_doc = await repo.get_file(filename=task_pathname.name, relative_path=TASK_FILE_REPO)
src_file_repo = self.git_repo.new_file_repository(relative_path=self.context.src_workspace)
task_doc = await self.project_repo.docs.task.get(filename=task_pathname.name)
src_file_repo = self.project_repo.with_src_path(self.context.src_workspace).srcs
code_blocks = []
for filename in self.i_context.codes_filenames:
code_doc = await src_file_repo.get(filename)

View file

@ -21,13 +21,7 @@ from pydantic import Field
from tenacity import retry, stop_after_attempt, wait_random_exponential
from metagpt.actions.action import Action
from metagpt.const import (
BUGFIX_FILENAME,
CODE_SUMMARIES_FILE_REPO,
DOCS_FILE_REPO,
TASK_FILE_REPO,
TEST_OUTPUTS_FILE_REPO,
)
from metagpt.const import BUGFIX_FILENAME
from metagpt.logs import logger
from metagpt.schema import CodingContext, Document, RunCodeResult
from metagpt.utils.common import CodeParser
@ -94,16 +88,12 @@ class WriteCode(Action):
return code
async def run(self, *args, **kwargs) -> CodingContext:
bug_feedback = await self.file_repo.get_file(filename=BUGFIX_FILENAME, relative_path=DOCS_FILE_REPO)
bug_feedback = await self.project_repo.docs.get(filename=BUGFIX_FILENAME)
coding_context = CodingContext.loads(self.i_context.content)
test_doc = await self.file_repo.get_file(
filename="test_" + coding_context.filename + ".json", relative_path=TEST_OUTPUTS_FILE_REPO
)
test_doc = await self.project_repo.test_outputs.get(filename="test_" + coding_context.filename + ".json")
summary_doc = None
if coding_context.design_doc and coding_context.design_doc.filename:
summary_doc = await self.file_repo.get_file(
filename=coding_context.design_doc.filename, relative_path=CODE_SUMMARIES_FILE_REPO
)
summary_doc = await self.project_repo.docs.code_summary.get(filename=coding_context.design_doc.filename)
logs = ""
if test_doc:
test_detail = RunCodeResult.loads(test_doc.content)
@ -115,8 +105,7 @@ class WriteCode(Action):
code_context = await self.get_codes(
coding_context.task_doc,
exclude=self.i_context.filename,
git_repo=self.git_repo,
src_workspace=self.context.src_workspace,
project_repo=self.project_repo.with_src_path(self.context.src_workspace),
)
prompt = PROMPT_TEMPLATE.format(
@ -138,16 +127,15 @@ class WriteCode(Action):
return coding_context
@staticmethod
async def get_codes(task_doc, exclude, git_repo, src_workspace) -> str:
async def get_codes(task_doc, exclude, project_repo) -> str:
if not task_doc:
return ""
if not task_doc.content:
file_repo = git_repo.new_file_repository()
task_doc.content = file_repo.get_file(filename=task_doc.filename, relative_path=TASK_FILE_REPO)
task_doc = project_repo.docs.task.get(filename=task_doc.filename)
m = json.loads(task_doc.content)
code_filenames = m.get("Task list", [])
codes = []
src_file_repo = git_repo.new_file_repository(relative_path=src_workspace)
src_file_repo = project_repo.srcs
for filename in code_filenames:
if filename == exclude:
continue

View file

@ -5,7 +5,7 @@
@File : write_review.py
"""
import asyncio
from typing import List
from typing import List, Literal
from metagpt.actions import Action
from metagpt.actions.action_node import ActionNode
@ -21,16 +21,15 @@ REVIEW = ActionNode(
],
)
LGTM = ActionNode(
key="LGTM",
expected_type=str,
instruction="LGTM/LBTM. If the code is fully implemented, "
"give a LGTM (Looks Good To Me), otherwise provide a LBTM (Looks Bad To Me).",
REVIEW_RESULT = ActionNode(
key="ReviewResult",
expected_type=Literal["LGTM", "LBTM"],
instruction="LGTM/LBTM. If the code is fully implemented, " "give a LGTM, otherwise provide a LBTM.",
example="LBTM",
)
ACTIONS = ActionNode(
key="Actions",
NEXT_STEPS = ActionNode(
key="NextSteps",
expected_type=str,
instruction="Based on the code review outcome, suggest actionable steps. This can include code changes, "
"refactoring suggestions, or any follow-up tasks.",
@ -69,7 +68,7 @@ WRITE_DRAFT = ActionNode(
)
WRITE_MOVE_FUNCTION = ActionNode(
WRITE_FUNCTION = ActionNode(
key="WriteFunction",
expected_type=str,
instruction="write code for the function not implemented.",
@ -555,8 +554,8 @@ LBTM
"""
WRITE_CODE_NODE = ActionNode.from_children("WRITE_REVIEW_NODE", [REVIEW, LGTM, ACTIONS])
WRITE_MOVE_NODE = ActionNode.from_children("WRITE_MOVE_NODE", [WRITE_DRAFT, WRITE_MOVE_FUNCTION])
WRITE_CODE_NODE = ActionNode.from_children("WRITE_REVIEW_NODE", [REVIEW, REVIEW_RESULT, NEXT_STEPS])
WRITE_MOVE_NODE = ActionNode.from_children("WRITE_MOVE_NODE", [WRITE_DRAFT, WRITE_FUNCTION])
CR_FOR_MOVE_FUNCTION_BY_3 = """
@ -579,8 +578,7 @@ class WriteCodeAN(Action):
async def run(self, context):
self.llm.system_prompt = "You are an outstanding engineer and can implement any code"
return await WRITE_MOVE_FUNCTION.fill(context=context, llm=self.llm, schema="json")
# return await WRITE_CODE_NODE.fill(context=context, llm=self.llm, schema="markdown")
return await WRITE_MOVE_NODE.fill(context=context, llm=self.llm, schema="json")
async def main():

View file

@ -143,8 +143,7 @@ class WriteCodeReview(Action):
code_context = await WriteCode.get_codes(
self.i_context.task_doc,
exclude=self.i_context.filename,
git_repo=self.context.git_repo,
src_workspace=self.src_workspace,
project_repo=self.project_repo.with_src_path(self.context.src_workspace),
)
context = "\n".join(
[

View file

@ -29,9 +29,6 @@ from metagpt.actions.write_prd_an import (
from metagpt.const import (
BUGFIX_FILENAME,
COMPETITIVE_ANALYSIS_FILE_REPO,
DOCS_FILE_REPO,
PRD_PDF_FILE_REPO,
PRDS_FILE_REPO,
REQUIREMENT_FILENAME,
)
from metagpt.logs import logger
@ -67,11 +64,10 @@ class WritePRD(Action):
async def run(self, with_messages, *args, **kwargs) -> ActionOutput | Message:
# Determine which requirement documents need to be rewritten: Use LLM to assess whether new requirements are
# related to the PRD. If they are related, rewrite the PRD.
docs_file_repo = self.git_repo.new_file_repository(relative_path=DOCS_FILE_REPO)
requirement_doc = await docs_file_repo.get(filename=REQUIREMENT_FILENAME)
requirement_doc = await self.project_repo.docs.get(filename=REQUIREMENT_FILENAME)
if requirement_doc and await self._is_bugfix(requirement_doc.content):
await docs_file_repo.save(filename=BUGFIX_FILENAME, content=requirement_doc.content)
await docs_file_repo.save(filename=REQUIREMENT_FILENAME, content="")
await self.project_repo.docs.save(filename=BUGFIX_FILENAME, content=requirement_doc.content)
await self.project_repo.docs.save(filename=REQUIREMENT_FILENAME, content="")
bug_fix = BugFixContext(filename=BUGFIX_FILENAME)
return Message(
content=bug_fix.model_dump_json(),
@ -82,24 +78,19 @@ class WritePRD(Action):
send_to="Alex", # the name of Engineer
)
else:
await docs_file_repo.delete(filename=BUGFIX_FILENAME)
await self.project_repo.docs.delete(filename=BUGFIX_FILENAME)
prds_file_repo = self.git_repo.new_file_repository(PRDS_FILE_REPO)
prd_docs = await prds_file_repo.get_all()
prd_docs = await self.project_repo.docs.prd.get_all()
change_files = Documents()
for prd_doc in prd_docs:
prd_doc = await self._update_prd(
requirement_doc=requirement_doc, prd_doc=prd_doc, prds_file_repo=prds_file_repo, *args, **kwargs
)
prd_doc = await self._update_prd(requirement_doc=requirement_doc, prd_doc=prd_doc, *args, **kwargs)
if not prd_doc:
continue
change_files.docs[prd_doc.filename] = prd_doc
logger.info(f"rewrite prd: {prd_doc.filename}")
# If there is no existing PRD, generate one using 'docs/requirement.txt'.
if not change_files.docs:
prd_doc = await self._update_prd(
requirement_doc=requirement_doc, prd_doc=None, prds_file_repo=prds_file_repo, *args, **kwargs
)
prd_doc = await self._update_prd(requirement_doc=requirement_doc, *args, **kwargs)
if prd_doc:
change_files.docs[prd_doc.filename] = prd_doc
logger.debug(f"new prd: {prd_doc.filename}")
@ -109,13 +100,6 @@ class WritePRD(Action):
return ActionOutput(content=change_files.model_dump_json(), instruct_content=change_files)
async def _run_new_requirement(self, requirements) -> ActionOutput:
# sas = SearchAndSummarize()
# # rsp = await sas.run(context=requirements, system_text=SEARCH_AND_SUMMARIZE_SYSTEM_EN_US)
# rsp = ""
# info = f"### Search Results\n{sas.result}\n\n### Search Summary\n{rsp}"
# if sas.result:
# logger.info(sas.result)
# logger.info(rsp)
project_name = self.project_name
context = CONTEXT_TEMPLATE.format(requirements=requirements, project_name=project_name)
exclude = [PROJECT_NAME.key] if project_name else []
@ -137,23 +121,21 @@ class WritePRD(Action):
await self._rename_workspace(node)
return prd_doc
async def _update_prd(self, requirement_doc, prd_doc, prds_file_repo, *args, **kwargs) -> Document | None:
async def _update_prd(self, requirement_doc, prd_doc=None, *args, **kwargs) -> Document | None:
if not prd_doc:
prd = await self._run_new_requirement(
requirements=[requirement_doc.content if requirement_doc else ""], *args, **kwargs
)
new_prd_doc = Document(
root_path=PRDS_FILE_REPO,
filename=FileRepository.new_filename() + ".json",
content=prd.instruct_content.model_dump_json(),
new_prd_doc = await self.project_repo.docs.prd.save(
filename=FileRepository.new_filename() + ".json", content=prd.instruct_content.model_dump_json()
)
elif await self._is_relative(requirement_doc, prd_doc):
new_prd_doc = await self._merge(requirement_doc, prd_doc)
self.project_repo.docs.prd.save_doc(doc=new_prd_doc)
else:
return None
await prds_file_repo.save(filename=new_prd_doc.filename, content=new_prd_doc.content)
await self._save_competitive_analysis(new_prd_doc)
await self._save_pdf(new_prd_doc)
await self.project_repo.resources.prd.save_pdf(doc=new_prd_doc)
return new_prd_doc
async def _save_competitive_analysis(self, prd_doc):
@ -161,14 +143,13 @@ class WritePRD(Action):
quadrant_chart = m.get("Competitive Quadrant Chart")
if not quadrant_chart:
return
pathname = self.git_repo.workdir / Path(COMPETITIVE_ANALYSIS_FILE_REPO) / Path(prd_doc.filename).with_suffix("")
pathname = (
self.project_repo.workdir / Path(COMPETITIVE_ANALYSIS_FILE_REPO) / Path(prd_doc.filename).with_suffix("")
)
if not pathname.parent.exists():
pathname.parent.mkdir(parents=True, exist_ok=True)
await mermaid_to_file(self.config.mermaid_engine, quadrant_chart, pathname)
async def _save_pdf(self, prd_doc):
await self.file_repo.save_as(doc=prd_doc, with_suffix=".md", relative_path=PRD_PDF_FILE_REPO)
async def _rename_workspace(self, prd):
if not self.project_name:
if isinstance(prd, (ActionOutput, ActionNode)):
@ -177,11 +158,14 @@ class WritePRD(Action):
ws_name = CodeParser.parse_str(block="Project Name", text=prd)
if ws_name:
self.project_name = ws_name
self.git_repo.rename_root(self.project_name)
self.project_repo.git_repo.rename_root(self.project_name)
async def _is_bugfix(self, context) -> bool:
src_workspace_path = self.git_repo.workdir / self.git_repo.workdir.name
code_files = self.git_repo.get_files(relative_path=src_workspace_path)
git_workdir = self.project_repo.git_repo.workdir
src_workdir = git_workdir / git_workdir.name
if not src_workdir.exists():
return False
code_files = self.project_repo.with_src_path(path=git_workdir / git_workdir.name).srcs.all_files
if not code_files:
return False
node = await WP_ISSUE_TYPE_NODE.fill(context, self.llm)

View file

@ -8,7 +8,7 @@
from typing import Optional
from metagpt.actions import Action
from metagpt.context import CONTEXT
from metagpt.context import Context
from metagpt.logs import logger
@ -24,7 +24,7 @@ class WriteTeachingPlanPart(Action):
statement_patterns = TeachingPlanBlock.TOPIC_STATEMENTS.get(self.topic, [])
statements = []
for p in statement_patterns:
s = self.format_value(p)
s = self.format_value(p, context=self.context)
statements.append(s)
formatter = (
TeachingPlanBlock.PROMPT_TITLE_TEMPLATE
@ -68,21 +68,23 @@ class WriteTeachingPlanPart(Action):
return self.topic
@staticmethod
def format_value(value):
def format_value(value, context: Context):
"""Fill parameters inside `value` with `options`."""
if not isinstance(value, str):
return value
if "{" not in value:
return value
# FIXME: 从Context中获取参数而非从options
merged_opts = CONTEXT.options or {}
options = context.config.model_dump()
for k, v in context.kwargs:
options[k] = v # None value is allowed to override and disable the value from config.
opts = {k: v for k, v in options.items() if v is not None}
try:
return value.format(**merged_opts)
return value.format(**opts)
except KeyError as e:
logger.warning(f"Parameter is missing:{e}")
for k, v in merged_opts.items():
for k, v in opts.items():
value = value.replace("{" + f"{k}" + "}", str(v))
return value

View file

@ -84,7 +84,7 @@ class Config(CLIParams, YamlModel):
@classmethod
def from_home(cls, path):
"""Load config from ~/.metagpt/config.yaml"""
return Config.model_validate_yaml(CONFIG_ROOT / path)
return Config.from_yaml_file(CONFIG_ROOT / path)
@classmethod
def default(cls):

View file

@ -9,7 +9,6 @@
@Modified By: mashenquan, 2023-11-27. Defines file repository paths according to Section 2.2.3.4 of RFC 135.
@Modified By: mashenquan, 2023/12/5. Add directories for code summarization..
"""
import contextvars
import os
from pathlib import Path
@ -17,8 +16,6 @@ from loguru import logger
import metagpt
OPTIONS = contextvars.ContextVar("OPTIONS", default={})
def get_metagpt_package_root():
"""Get the root directory of the installed package."""
@ -71,12 +68,10 @@ SOURCE_ROOT = METAGPT_ROOT / "metagpt"
PROMPT_PATH = SOURCE_ROOT / "prompts"
SKILL_DIRECTORY = SOURCE_ROOT / "skills"
# REAL CONSTS
MEM_TTL = 24 * 30 * 3600
MESSAGE_ROUTE_FROM = "sent_from"
MESSAGE_ROUTE_TO = "send_to"
MESSAGE_ROUTE_CAUSE_BY = "cause_by"
@ -89,23 +84,23 @@ BUGFIX_FILENAME = "bugfix.txt"
PACKAGE_REQUIREMENTS_FILENAME = "requirements.txt"
DOCS_FILE_REPO = "docs"
PRDS_FILE_REPO = "docs/prds"
PRDS_FILE_REPO = "docs/prd"
SYSTEM_DESIGN_FILE_REPO = "docs/system_design"
TASK_FILE_REPO = "docs/tasks"
TASK_FILE_REPO = "docs/task"
COMPETITIVE_ANALYSIS_FILE_REPO = "resources/competitive_analysis"
DATA_API_DESIGN_FILE_REPO = "resources/data_api_design"
SEQ_FLOW_FILE_REPO = "resources/seq_flow"
SYSTEM_DESIGN_PDF_FILE_REPO = "resources/system_design"
PRD_PDF_FILE_REPO = "resources/prd"
TASK_PDF_FILE_REPO = "resources/api_spec_and_tasks"
TASK_PDF_FILE_REPO = "resources/api_spec_and_task"
TEST_CODES_FILE_REPO = "tests"
TEST_OUTPUTS_FILE_REPO = "test_outputs"
CODE_SUMMARIES_FILE_REPO = "docs/code_summaries"
CODE_SUMMARIES_PDF_FILE_REPO = "resources/code_summaries"
CODE_SUMMARIES_FILE_REPO = "docs/code_summary"
CODE_SUMMARIES_PDF_FILE_REPO = "resources/code_summary"
RESOURCES_FILE_REPO = "resources"
SD_OUTPUT_FILE_REPO = "resources/SD_Output"
SD_OUTPUT_FILE_REPO = "resources/sd_output"
GRAPH_REPO_FILE_REPO = "docs/graph_repo"
CLASS_VIEW_FILE_REPO = "docs/class_views"
CLASS_VIEW_FILE_REPO = "docs/class_view"
YAPI_URL = "http://yapi.deepwisdomai.com/"

View file

@ -7,13 +7,12 @@
"""
import os
from pathlib import Path
from typing import Optional
from typing import Any, Optional
from pydantic import BaseModel, ConfigDict
from metagpt.config2 import Config
from metagpt.configs.llm_config import LLMConfig
from metagpt.const import OPTIONS
from metagpt.provider.base_llm import BaseLLM
from metagpt.provider.llm_provider_registry import create_llm_instance
from metagpt.utils.cost_manager import CostManager
@ -41,6 +40,16 @@ class AttrDict(BaseModel):
else:
raise AttributeError(f"No such attribute: {key}")
def set(self, key, val: Any):
self.__dict__[key] = val
def get(self, key, default: Any = None):
return self.__dict__.get(key, default)
def remove(self, key):
if key in self.__dict__:
self.__delattr__(key)
class Context(BaseModel):
"""Env context for MetaGPT"""
@ -55,15 +64,6 @@ class Context(BaseModel):
_llm: Optional[BaseLLM] = None
@property
def file_repo(self):
return self.git_repo.new_file_repository()
@property
def options(self):
"""Return all key-values"""
return OPTIONS.get()
def new_environ(self):
"""Return a new os.environ object"""
env = os.environ.copy()

View file

@ -6,16 +6,19 @@
@File : text_to_embedding.py
@Desc : Text-to-Embedding skill, which provides text-to-embedding functionality.
"""
import metagpt.config2
from metagpt.config2 import Config
from metagpt.tools.openai_text_to_embedding import oas3_openai_text_to_embedding
async def text_to_embedding(text, model="text-embedding-ada-002", openai_api_key="", **kwargs):
async def text_to_embedding(text, model="text-embedding-ada-002", config: Config = metagpt.config2.config):
"""Text to embedding
:param text: The text used for embedding.
:param model: One of ['text-embedding-ada-002'], ID of the model to use. For more details, checkout: `https://api.openai.com/v1/models`.
:param openai_api_key: OpenAI API key, For more details, checkout: `https://platform.openai.com/account/api-keys`
:param config: OpenAI config with API key, For more details, checkout: `https://platform.openai.com/account/api-keys`
:return: A json object of :class:`ResultEmbedding` class if successful, otherwise `{}`.
"""
return await oas3_openai_text_to_embedding(text, model=model, openai_api_key=openai_api_key)
openai_api_key = config.get_openai_llm().api_key
proxy = config.get_openai_llm().proxy
return await oas3_openai_text_to_embedding(text, model=model, openai_api_key=openai_api_key, proxy=proxy)

View file

@ -8,6 +8,7 @@
"""
import base64
import metagpt.config2
from metagpt.config2 import Config
from metagpt.const import BASE64_FORMAT
from metagpt.llm import LLM
@ -16,27 +17,26 @@ from metagpt.tools.openai_text_to_image import oas3_openai_text_to_image
from metagpt.utils.s3 import S3
async def text_to_image(text, size_type: str = "512x512", model_url="", config: Config = None):
async def text_to_image(text, size_type: str = "512x512", config: Config = metagpt.config2.config):
"""Text to image
:param text: The text used for image conversion.
:param openai_api_key: OpenAI API key, For more details, checkout: `https://platform.openai.com/account/api-keys`
:param size_type: If using OPENAI, the available size options are ['256x256', '512x512', '1024x1024'], while for MetaGPT, the options are ['512x512', '512x768'].
:param model_url: MetaGPT model url
:param config: Config
:return: The image data is returned in Base64 encoding.
"""
image_declaration = "data:image/png;base64,"
model_url = config.METAGPT_TEXT_TO_IMAGE_MODEL_URL
if model_url:
binary_data = await oas3_metagpt_text_to_image(text, size_type, model_url)
elif oai_llm := config.get_openai_llm():
binary_data = await oas3_openai_text_to_image(text, size_type, LLM(oai_llm))
elif config.get_openai_llm():
llm = LLM(llm_config=config.get_openai_llm())
binary_data = await oas3_openai_text_to_image(text, size_type, llm=llm)
else:
raise ValueError("Missing necessary parameters.")
base64_data = base64.b64encode(binary_data).decode("utf-8")
assert config.s3, "S3 config is required."
s3 = S3(config.s3)
url = await s3.cache(data=base64_data, file_ext=".png", format=BASE64_FORMAT)
if url:

View file

@ -6,8 +6,8 @@
@File : text_to_speech.py
@Desc : Text-to-Speech skill, which provides text-to-speech functionality
"""
from metagpt.config2 import config
import metagpt.config2
from metagpt.config2 import Config
from metagpt.const import BASE64_FORMAT
from metagpt.tools.azure_tts import oas3_azsure_tts
from metagpt.tools.iflytek_tts import oas3_iflytek_tts
@ -20,12 +20,7 @@ async def text_to_speech(
voice="zh-CN-XiaomoNeural",
style="affectionate",
role="Girl",
subscription_key="",
region="",
iflytek_app_id="",
iflytek_api_key="",
iflytek_api_secret="",
**kwargs,
config: Config = metagpt.config2.config,
):
"""Text to speech
For more details, check out:`https://learn.microsoft.com/en-us/azure/ai-services/speech-service/language-support?tabs=tts`
@ -44,6 +39,8 @@ async def text_to_speech(
"""
subscription_key = config.AZURE_TTS_SUBSCRIPTION_KEY
region = config.AZURE_TTS_REGION
if subscription_key and region:
audio_declaration = "data:audio/wav;base64,"
base64_data = await oas3_azsure_tts(text, lang, voice, style, role, subscription_key, region)
@ -52,6 +49,10 @@ async def text_to_speech(
if url:
return f"[{text}]({url})"
return audio_declaration + base64_data if base64_data else base64_data
iflytek_app_id = config.IFLYTEK_APP_ID
iflytek_api_key = config.IFLYTEK_API_KEY
iflytek_api_secret = config.IFLYTEK_API_SECRET
if iflytek_app_id and iflytek_api_key and iflytek_api_secret:
audio_declaration = "data:audio/mp3;base64,"
base64_data = await oas3_iflytek_tts(

View file

@ -223,7 +223,7 @@ class OpenAILLM(BaseLLM):
def get_costs(self) -> Costs:
if not self.cost_manager:
return Costs()
return Costs(0, 0, 0, 0)
return self.cost_manager.get_costs()
def _get_max_tokens(self, messages: list[dict]):

View file

@ -65,7 +65,7 @@ class Assistant(Role):
prompt += f"If the text explicitly want you to {desc}, return `[SKILL]: {name}` brief and clear. For instance: [SKILL]: {name}\n"
prompt += 'Otherwise, return `[TALK]: {talk}` brief and clear. For instance: if {talk} is "xxxx" return [TALK]: xxxx\n\n'
prompt += f"Now what specific action is explicitly mentioned in the text: {last_talk}\n"
rsp = await self.llm.aask(prompt, [])
rsp = await self.llm.aask(prompt, ["You are an action classifier"])
logger.info(f"THINK: {prompt}\n, THINK RESULT: {rsp}\n")
return await self._plan(rsp, last_talk=last_talk)
@ -98,9 +98,7 @@ class Assistant(Role):
history = self.memory.history_text
text = kwargs.get("last_talk") or text
self.set_todo(
TalkAction(
context=text, knowledge=self.memory.get_knowledge(), history_summary=history, llm=self.llm, **kwargs
)
TalkAction(i_context=text, knowledge=self.memory.get_knowledge(), history_summary=history, llm=self.llm)
)
return True
@ -110,7 +108,7 @@ class Assistant(Role):
if not skill:
logger.info(f"skill not found: {text}")
return await self.talk_handler(text=last_talk, **kwargs)
action = ArgumentsParingAction(skill=skill, llm=self.llm, ask=last_talk, **kwargs)
action = ArgumentsParingAction(skill=skill, llm=self.llm, ask=last_talk)
await action.run(**kwargs)
if action.args is None:
return await self.talk_handler(text=last_talk, **kwargs)

View file

@ -27,12 +27,7 @@ from typing import Set
from metagpt.actions import Action, WriteCode, WriteCodeReview, WriteTasks
from metagpt.actions.fix_bug import FixBug
from metagpt.actions.summarize_code import SummarizeCode
from metagpt.const import (
CODE_SUMMARIES_FILE_REPO,
CODE_SUMMARIES_PDF_FILE_REPO,
SYSTEM_DESIGN_FILE_REPO,
TASK_FILE_REPO,
)
from metagpt.const import SYSTEM_DESIGN_FILE_REPO, TASK_FILE_REPO
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import (
@ -97,7 +92,6 @@ class Engineer(Role):
async def _act_sp_with_cr(self, review=False) -> Set[str]:
changed_files = set()
src_file_repo = self.git_repo.new_file_repository(self.src_workspace)
for todo in self.code_todos:
"""
# Select essential information from the historical data to reduce the length of the prompt (summarized from human experience):
@ -112,8 +106,8 @@ class Engineer(Role):
action = WriteCodeReview(i_context=coding_context, context=self.context, llm=self.llm)
self._init_action(action)
coding_context = await action.run()
await src_file_repo.save(
coding_context.filename,
await self.project_repo.srcs.save(
filename=coding_context.filename,
dependencies={coding_context.design_doc.root_relative_path, coding_context.task_doc.root_relative_path},
content=coding_context.code_doc.content,
)
@ -153,31 +147,29 @@ class Engineer(Role):
)
async def _act_summarize(self):
code_summaries_file_repo = self.git_repo.new_file_repository(CODE_SUMMARIES_FILE_REPO)
code_summaries_pdf_file_repo = self.git_repo.new_file_repository(CODE_SUMMARIES_PDF_FILE_REPO)
tasks = []
src_relative_path = self.src_workspace.relative_to(self.git_repo.workdir)
for todo in self.summarize_todos:
summary = await todo.run()
summary_filename = Path(todo.i_context.design_filename).with_suffix(".md").name
dependencies = {todo.i_context.design_filename, todo.i_context.task_filename}
for filename in todo.i_context.codes_filenames:
rpath = src_relative_path / filename
rpath = self.project_repo.src_relative_path / filename
dependencies.add(str(rpath))
await code_summaries_pdf_file_repo.save(
await self.project_repo.resources.code_summary.save(
filename=summary_filename, content=summary, dependencies=dependencies
)
is_pass, reason = await self._is_pass(summary)
if not is_pass:
todo.i_context.reason = reason
tasks.append(todo.i_context.dict())
await code_summaries_file_repo.save(
await self.project_repo.docs.code_summary.save(
filename=Path(todo.i_context.design_filename).name,
content=todo.i_context.model_dump_json(),
dependencies=dependencies,
)
else:
await code_summaries_file_repo.delete(filename=Path(todo.i_context.design_filename).name)
await self.project_repo.docs.code_summary.delete(filename=Path(todo.i_context.design_filename).name)
logger.info(f"--max-auto-summarize-code={self.config.max_auto_summarize_code}")
if not tasks or self.config.max_auto_summarize_code == 0:
@ -220,60 +212,54 @@ class Engineer(Role):
return self.rc.todo
return None
@staticmethod
async def _new_coding_context(
filename, src_file_repo, task_file_repo, design_file_repo, dependency
) -> CodingContext:
old_code_doc = await src_file_repo.get(filename)
async def _new_coding_context(self, filename, dependency) -> CodingContext:
old_code_doc = await self.project_repo.srcs.get(filename)
if not old_code_doc:
old_code_doc = Document(root_path=str(src_file_repo.root_path), filename=filename, content="")
old_code_doc = Document(root_path=str(self.project_repo.src_relative_path), filename=filename, content="")
dependencies = {Path(i) for i in await dependency.get(old_code_doc.root_relative_path)}
task_doc = None
design_doc = None
for i in dependencies:
if str(i.parent) == TASK_FILE_REPO:
task_doc = await task_file_repo.get(i.name)
task_doc = await self.project_repo.docs.task.get(i.name)
elif str(i.parent) == SYSTEM_DESIGN_FILE_REPO:
design_doc = await design_file_repo.get(i.name)
design_doc = await self.project_repo.docs.system_design.get(i.name)
if not task_doc or not design_doc:
logger.error(f'Detected source code "{filename}" from an unknown origin.')
raise ValueError(f'Detected source code "{filename}" from an unknown origin.')
context = CodingContext(filename=filename, design_doc=design_doc, task_doc=task_doc, code_doc=old_code_doc)
return context
@staticmethod
async def _new_coding_doc(filename, src_file_repo, task_file_repo, design_file_repo, dependency):
context = await Engineer._new_coding_context(
filename, src_file_repo, task_file_repo, design_file_repo, dependency
)
async def _new_coding_doc(self, filename, dependency):
context = await self._new_coding_context(filename, dependency)
coding_doc = Document(
root_path=str(src_file_repo.root_path), filename=filename, content=context.model_dump_json()
root_path=str(self.project_repo.src_relative_path), filename=filename, content=context.model_dump_json()
)
return coding_doc
async def _new_code_actions(self, bug_fix=False):
# Prepare file repos
src_file_repo = self.git_repo.new_file_repository(self.src_workspace)
changed_src_files = src_file_repo.all_files if bug_fix else src_file_repo.changed_files
task_file_repo = self.git_repo.new_file_repository(TASK_FILE_REPO)
changed_task_files = task_file_repo.changed_files
design_file_repo = self.git_repo.new_file_repository(SYSTEM_DESIGN_FILE_REPO)
changed_src_files = self.project_repo.srcs.all_files if bug_fix else self.project_repo.srcs.changed_files
changed_task_files = self.project_repo.docs.task.changed_files
changed_files = Documents()
# Recode caused by upstream changes.
for filename in changed_task_files:
design_doc = await design_file_repo.get(filename)
task_doc = await task_file_repo.get(filename)
design_doc = await self.project_repo.docs.system_design.get(filename)
task_doc = await self.project_repo.docs.task.get(filename)
task_list = self._parse_tasks(task_doc)
for task_filename in task_list:
old_code_doc = await src_file_repo.get(task_filename)
old_code_doc = await self.project_repo.srcs.get(task_filename)
if not old_code_doc:
old_code_doc = Document(root_path=str(src_file_repo.root_path), filename=task_filename, content="")
old_code_doc = Document(
root_path=str(self.project_repo.src_relative_path), filename=task_filename, content=""
)
context = CodingContext(
filename=task_filename, design_doc=design_doc, task_doc=task_doc, code_doc=old_code_doc
)
coding_doc = Document(
root_path=str(src_file_repo.root_path), filename=task_filename, content=context.model_dump_json()
root_path=str(self.project_repo.src_relative_path),
filename=task_filename,
content=context.model_dump_json(),
)
if task_filename in changed_files.docs:
logger.warning(
@ -289,13 +275,7 @@ class Engineer(Role):
for filename in changed_src_files:
if filename in changed_files.docs:
continue
coding_doc = await self._new_coding_doc(
filename=filename,
src_file_repo=src_file_repo,
task_file_repo=task_file_repo,
design_file_repo=design_file_repo,
dependency=dependency,
)
coding_doc = await self._new_coding_doc(filename=filename, dependency=dependency)
changed_files.docs[filename] = coding_doc
self.code_todos.append(WriteCode(i_context=coding_doc, context=self.context, llm=self.llm))
@ -303,13 +283,12 @@ class Engineer(Role):
self.set_todo(self.code_todos[0])
async def _new_summarize_actions(self):
src_file_repo = self.git_repo.new_file_repository(self.src_workspace)
src_files = src_file_repo.all_files
src_files = self.project_repo.srcs.all_files
# Generate a SummarizeCode action for each pair of (system_design_doc, task_doc).
summarizations = defaultdict(list)
for filename in src_files:
dependencies = await src_file_repo.get_dependency(filename=filename)
ctx = CodeSummarizeContext.loads(filenames=dependencies)
dependencies = await self.project_repo.srcs.get_dependency(filename=filename)
ctx = CodeSummarizeContext.loads(filenames=list(dependencies))
summarizations[ctx].append(filename)
for ctx, filenames in summarizations.items():
ctx.codes_filenames = filenames

View file

@ -17,11 +17,7 @@
from metagpt.actions import DebugError, RunCode, WriteTest
from metagpt.actions.summarize_code import SummarizeCode
from metagpt.const import (
MESSAGE_ROUTE_TO_NONE,
TEST_CODES_FILE_REPO,
TEST_OUTPUTS_FILE_REPO,
)
from metagpt.const import MESSAGE_ROUTE_TO_NONE
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import Document, Message, RunCodeContext, TestingContext
@ -48,29 +44,26 @@ class QaEngineer(Role):
self.test_round = 0
async def _write_test(self, message: Message) -> None:
src_file_repo = self.context.git_repo.new_file_repository(self.context.src_workspace)
src_file_repo = self.project_repo.with_src_path(self.context.src_workspace).srcs
changed_files = set(src_file_repo.changed_files.keys())
# Unit tests only.
if self.config.reqa_file and self.config.reqa_file not in changed_files:
changed_files.add(self.config.reqa_file)
tests_file_repo = self.context.git_repo.new_file_repository(TEST_CODES_FILE_REPO)
for filename in changed_files:
# write tests
if not filename or "test" in filename:
continue
code_doc = await src_file_repo.get(filename)
test_doc = await tests_file_repo.get("test_" + code_doc.filename)
test_doc = await self.project_repo.tests.get("test_" + code_doc.filename)
if not test_doc:
test_doc = Document(
root_path=str(tests_file_repo.root_path), filename="test_" + code_doc.filename, content=""
root_path=str(self.project_repo.tests.root_path), filename="test_" + code_doc.filename, content=""
)
logger.info(f"Writing {test_doc.filename}..")
context = TestingContext(filename=test_doc.filename, test_doc=test_doc, code_doc=code_doc)
context = await WriteTest(i_context=context, context=self.context, llm=self.llm).run()
await tests_file_repo.save(
filename=context.test_doc.filename,
content=context.test_doc.content,
dependencies={context.code_doc.root_relative_path},
await self.project_repo.tests.save_doc(
doc=context.test_doc, dependencies={context.code_doc.root_relative_path}
)
# prepare context for run tests in next round
@ -78,7 +71,7 @@ class QaEngineer(Role):
command=["python", context.test_doc.root_relative_path],
code_filename=context.code_doc.filename,
test_filename=context.test_doc.filename,
working_directory=str(self.context.git_repo.workdir),
working_directory=str(self.project_repo.workdir),
additional_python_paths=[str(self.context.src_workspace)],
)
self.publish_message(
@ -91,25 +84,23 @@ class QaEngineer(Role):
)
)
logger.info(f"Done {str(tests_file_repo.workdir)} generating.")
logger.info(f"Done {str(self.project_repo.tests.workdir)} generating.")
async def _run_code(self, msg):
run_code_context = RunCodeContext.loads(msg.content)
src_doc = await self.context.git_repo.new_file_repository(self.context.src_workspace).get(
src_doc = await self.project_repo.with_src_path(self.context.src_workspace).srcs.get(
run_code_context.code_filename
)
if not src_doc:
return
test_doc = await self.context.git_repo.new_file_repository(TEST_CODES_FILE_REPO).get(
run_code_context.test_filename
)
test_doc = await self.project_repo.tests.get(run_code_context.test_filename)
if not test_doc:
return
run_code_context.code = src_doc.content
run_code_context.test_code = test_doc.content
result = await RunCode(i_context=run_code_context, context=self.context, llm=self.llm).run()
run_code_context.output_filename = run_code_context.test_filename + ".json"
await self.context.git_repo.new_file_repository(TEST_OUTPUTS_FILE_REPO).save(
await self.project_repo.test_outputs.save(
filename=run_code_context.output_filename,
content=result.model_dump_json(),
dependencies={src_doc.root_relative_path, test_doc.root_relative_path},
@ -132,9 +123,7 @@ class QaEngineer(Role):
async def _debug_error(self, msg):
run_code_context = RunCodeContext.loads(msg.content)
code = await DebugError(i_context=run_code_context, context=self.context, llm=self.llm).run()
await self.context.file_repo.save_file(
filename=run_code_context.test_filename, content=code, relative_path=TEST_CODES_FILE_REPO
)
await self.project_repo.tests.save(filename=run_code_context.test_filename, content=code)
run_code_context.output = None
self.publish_message(
Message(

View file

@ -36,6 +36,7 @@ from metagpt.memory import Memory
from metagpt.provider import HumanProvider
from metagpt.schema import Message, MessageQueue, SerializationMixin
from metagpt.utils.common import any_to_name, any_to_str, role_raise_decorator
from metagpt.utils.project_repo import ProjectRepo
from metagpt.utils.repair_llm_raw_output import extract_state_value_from_output
PREFIX_TEMPLATE = """You are a {profile}, named {name}, your goal is {goal}. """
@ -199,6 +200,11 @@ class Role(SerializationMixin, ContextMixin, BaseModel):
def src_workspace(self, value):
self.context.src_workspace = value
@property
def project_repo(self) -> ProjectRepo:
project_repo = ProjectRepo(self.context.git_repo)
return project_repo.with_src_path(self.context.src_workspace) if self.context.src_workspace else project_repo
@property
def prompt_schema(self):
"""Prompt schema: json/markdown"""
@ -449,7 +455,7 @@ class Role(SerializationMixin, ContextMixin, BaseModel):
break
# act
logger.debug(f"{self._setting}: {self.rc.state=}, will do {self.rc.todo}")
rsp = await self._act() # 这个rsp是否需要publish_message
rsp = await self._act()
actions_taken += 1
return rsp # return output from the last action

View file

@ -31,11 +31,11 @@ class Teacher(Role):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.name = WriteTeachingPlanPart.format_value(self.name)
self.profile = WriteTeachingPlanPart.format_value(self.profile)
self.goal = WriteTeachingPlanPart.format_value(self.goal)
self.constraints = WriteTeachingPlanPart.format_value(self.constraints)
self.desc = WriteTeachingPlanPart.format_value(self.desc)
self.name = WriteTeachingPlanPart.format_value(self.name, self.context)
self.profile = WriteTeachingPlanPart.format_value(self.profile, self.context)
self.goal = WriteTeachingPlanPart.format_value(self.goal, self.context)
self.constraints = WriteTeachingPlanPart.format_value(self.constraints, self.context)
self.desc = WriteTeachingPlanPart.format_value(self.desc, self.context)
async def _think(self) -> bool:
"""Everything will be done part by part."""

View file

@ -13,7 +13,6 @@ import aiohttp
import requests
from pydantic import BaseModel, Field
from metagpt.config2 import config
from metagpt.logs import logger
@ -43,12 +42,12 @@ class ResultEmbedding(BaseModel):
class OpenAIText2Embedding:
def __init__(self, openai_api_key):
def __init__(self, api_key: str, proxy: str):
"""
:param openai_api_key: OpenAI API key, For more details, checkout: `https://platform.openai.com/account/api-keys`
"""
self.openai_llm = config.get_openai_llm()
self.openai_api_key = openai_api_key or self.openai_llm.api_key
self.api_key = api_key
self.proxy = proxy
async def text_2_embedding(self, text, model="text-embedding-ada-002"):
"""Text to embedding
@ -58,8 +57,8 @@ class OpenAIText2Embedding:
:return: A json object of :class:`ResultEmbedding` class if successful, otherwise `{}`.
"""
proxies = {"proxy": self.openai_llm.proxy} if self.openai_llm.proxy else {}
headers = {"Content-Type": "application/json", "Authorization": f"Bearer {self.openai_api_key}"}
proxies = {"proxy": self.proxy} if self.proxy else {}
headers = {"Content-Type": "application/json", "Authorization": f"Bearer {self.api_key}"}
data = {"input": text, "model": model}
url = "https://api.openai.com/v1/embeddings"
try:
@ -73,16 +72,14 @@ class OpenAIText2Embedding:
# Export
async def oas3_openai_text_to_embedding(text, model="text-embedding-ada-002", openai_api_key=""):
async def oas3_openai_text_to_embedding(text, openai_api_key: str, model="text-embedding-ada-002", proxy: str = ""):
"""Text to embedding
:param text: The text used for embedding.
:param model: One of ['text-embedding-ada-002'], ID of the model to use. For more details, checkout: `https://api.openai.com/v1/models`.
:param openai_api_key: OpenAI API key, For more details, checkout: `https://platform.openai.com/account/api-keys`
:param config: OpenAI config with API key, For more details, checkout: `https://platform.openai.com/account/api-keys`
:return: A json object of :class:`ResultEmbedding` class if successful, otherwise `{}`.
"""
if not text:
return ""
if not openai_api_key:
openai_api_key = config.get_openai_llm().api_key
return await OpenAIText2Embedding(openai_api_key).text_2_embedding(text, model=model)
return await OpenAIText2Embedding(api_key=openai_api_key, proxy=proxy).text_2_embedding(text, model=model)

View file

@ -44,19 +44,20 @@ class SearchEngine:
self,
engine: Optional[SearchEngineType] = SearchEngineType.SERPER_GOOGLE,
run_func: Callable[[str, int, bool], Coroutine[None, None, Union[str, list[str]]]] = None,
**kwargs,
):
if engine == SearchEngineType.SERPAPI_GOOGLE:
module = "metagpt.tools.search_engine_serpapi"
run_func = importlib.import_module(module).SerpAPIWrapper().run
run_func = importlib.import_module(module).SerpAPIWrapper(**kwargs).run
elif engine == SearchEngineType.SERPER_GOOGLE:
module = "metagpt.tools.search_engine_serper"
run_func = importlib.import_module(module).SerperWrapper().run
run_func = importlib.import_module(module).SerperWrapper(**kwargs).run
elif engine == SearchEngineType.DIRECT_GOOGLE:
module = "metagpt.tools.search_engine_googleapi"
run_func = importlib.import_module(module).GoogleAPIWrapper().run
run_func = importlib.import_module(module).GoogleAPIWrapper(**kwargs).run
elif engine == SearchEngineType.DUCK_DUCK_GO:
module = "metagpt.tools.search_engine_ddg"
run_func = importlib.import_module(module).DDGAPIWrapper().run
run_func = importlib.import_module(module).DDGAPIWrapper(**kwargs).run
elif engine == SearchEngineType.CUSTOM_ENGINE:
pass # run_func = run_func
else:

View file

@ -13,7 +13,7 @@ from metagpt.utils.parse_html import WebPage
class WebBrowserEngine:
def __init__(
self,
engine: WebBrowserEngineType | None = WebBrowserEngineType.PLAYWRIGHT,
engine: WebBrowserEngineType = WebBrowserEngineType.PLAYWRIGHT,
run_func: Callable[..., Coroutine[Any, Any, WebPage | list[WebPage]]] | None = None,
):
if engine is None:

View file

@ -33,7 +33,7 @@ class SeleniumWrapper:
def __init__(
self,
browser_type: Literal["chrome", "firefox", "edge", "ie"] | None = None,
browser_type: Literal["chrome", "firefox", "edge", "ie"] = "chrome",
launch_kwargs: dict | None = None,
*,
loop: asyncio.AbstractEventLoop | None = None,

View file

@ -45,7 +45,7 @@ class FileRepository:
# Initializing
self.workdir.mkdir(parents=True, exist_ok=True)
async def save(self, filename: Path | str, content, dependencies: List[str] = None):
async def save(self, filename: Path | str, content, dependencies: List[str] = None) -> Document:
"""Save content to a file and update its dependencies.
:param filename: The filename or path within the repository.
@ -63,6 +63,8 @@ class FileRepository:
await dependency_file.update(pathname, set(dependencies))
logger.info(f"update dependency: {str(pathname)}:{dependencies}")
return Document(root_path=str(self._relative_path), filename=filename, content=content)
async def get_dependency(self, filename: Path | str) -> Set[str]:
"""Get the dependencies of a file.
@ -181,10 +183,20 @@ class FileRepository:
"""
current_time = datetime.now().strftime("%Y%m%d%H%M%S")
return current_time
# guid_suffix = str(uuid.uuid4())[:8]
# return f"{current_time}x{guid_suffix}"
async def save_doc(self, doc: Document, with_suffix: str = None, dependencies: List[str] = None):
async def save_doc(self, doc: Document, dependencies: List[str] = None):
"""Save content to a file and update its dependencies.
:param doc: The Document instance to be saved.
:type doc: Document
:param dependencies: A list of dependencies for the saved file.
:type dependencies: List[str], optional
"""
await self.save(filename=doc.filename, content=doc.content, dependencies=dependencies)
logger.debug(f"File Saved: {str(doc.filename)}")
async def save_pdf(self, doc: Document, with_suffix: str = ".md", dependencies: List[str] = None):
"""Save a Document instance as a PDF file.
This method converts the content of the Document instance to Markdown,
@ -202,68 +214,6 @@ class FileRepository:
await self.save(filename=str(filename), content=json_to_markdown(m), dependencies=dependencies)
logger.debug(f"File Saved: {str(filename)}")
async def get_file(self, filename: Path | str, relative_path: Path | str = ".") -> Document | None:
"""Retrieve a specific file from the file repository.
:param filename: The name or path of the file to retrieve.
:type filename: Path or str
:param relative_path: The relative path within the file repository.
:type relative_path: Path or str, optional
:return: The document representing the file, or None if not found.
:rtype: Document or None
"""
file_repo = self._git_repo.new_file_repository(relative_path=relative_path)
return await file_repo.get(filename=filename)
async def get_all_files(self, relative_path: Path | str = ".") -> List[Document]:
"""Retrieve all files from the file repository.
:param relative_path: The relative path within the file repository.
:type relative_path: Path or str, optional
:return: A list of documents representing all files in the repository.
:rtype: List[Document]
"""
file_repo = self._git_repo.new_file_repository(relative_path=relative_path)
return await file_repo.get_all()
async def save_file(
self, filename: Path | str, content, dependencies: List[str] = None, relative_path: Path | str = "."
):
"""Save a file to the file repository.
:param filename: The name or path of the file to save.
:type filename: Path or str
:param content: The content of the file.
:param dependencies: A list of dependencies for the file.
:type dependencies: List[str], optional
:param relative_path: The relative path within the file repository.
:type relative_path: Path or str, optional
"""
file_repo = self._git_repo.new_file_repository(relative_path=relative_path)
return await file_repo.save(filename=filename, content=content, dependencies=dependencies)
async def save_as(
self, doc: Document, with_suffix: str = None, dependencies: List[str] = None, relative_path: Path | str = "."
):
"""Save a Document instance with optional modifications.
This static method creates a new FileRepository, saves the Document instance
with optional modifications (such as a suffix), and logs the saved file.
:param doc: The Document instance to be saved.
:type doc: Document
:param with_suffix: An optional suffix to append to the saved file's name.
:type with_suffix: str, optional
:param dependencies: A list of dependencies for the saved file.
:type dependencies: List[str], optional
:param relative_path: The relative path within the file repository.
:type relative_path: Path or str, optional
:return: A boolean indicating whether the save operation was successful.
:rtype: bool
"""
file_repo = self._git_repo.new_file_repository(relative_path=relative_path)
return await file_repo.save_doc(doc=doc, with_suffix=with_suffix, dependencies=dependencies)
async def delete(self, filename: Path | str):
"""Delete a file from the file repository.
@ -280,7 +230,3 @@ class FileRepository:
dependency_file = await self._git_repo.get_dependency()
await dependency_file.update(filename=pathname, dependencies=None)
logger.info(f"remove dependency key: {str(pathname)}")
async def delete_file(self, filename: Path | str, relative_path: Path | str = "."):
file_repo = self._git_repo.new_file_repository(relative_path=relative_path)
await file_repo.delete(filename=filename)

View file

@ -107,7 +107,10 @@ class GitRepository:
def delete_repository(self):
"""Delete the entire repository directory."""
if self.is_valid:
shutil.rmtree(self._repository.working_dir)
try:
shutil.rmtree(self._repository.working_dir)
except Exception as e:
logger.exception(f"Failed delete git repo:{self.workdir}, error:{e}")
@property
def changed_files(self) -> Dict[str, str]:
@ -199,10 +202,17 @@ class GitRepository:
if new_path.exists():
logger.info(f"Delete directory {str(new_path)}")
shutil.rmtree(new_path)
if new_path.exists(): # Recheck for windows os
logger.warning(f"Failed to delete directory {str(new_path)}")
return
try:
shutil.move(src=str(self.workdir), dst=str(new_path))
except Exception as e:
logger.warning(f"Move {str(self.workdir)} to {str(new_path)} error: {e}")
finally:
if not new_path.exists(): # Recheck for windows os
logger.warning(f"Failed to move {str(self.workdir)} to {str(new_path)}")
return
logger.info(f"Rename directory {str(self.workdir)} to {str(new_path)}")
self._repository = Repo(new_path)
self._gitignore_rules = parse_gitignore(full_path=str(new_path / ".gitignore"))

View file

@ -0,0 +1,119 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2024/1/8
@Author : mashenquan
@File : project_repo.py
@Desc : Wrapper for GitRepository and FileRepository of project.
Implementation of Chapter 4.6 of https://deepwisdom.feishu.cn/wiki/CUK4wImd7id9WlkQBNscIe9cnqh
"""
from __future__ import annotations
from pathlib import Path
from metagpt.const import (
CLASS_VIEW_FILE_REPO,
CODE_SUMMARIES_FILE_REPO,
CODE_SUMMARIES_PDF_FILE_REPO,
COMPETITIVE_ANALYSIS_FILE_REPO,
DATA_API_DESIGN_FILE_REPO,
DOCS_FILE_REPO,
GRAPH_REPO_FILE_REPO,
PRD_PDF_FILE_REPO,
PRDS_FILE_REPO,
RESOURCES_FILE_REPO,
SD_OUTPUT_FILE_REPO,
SEQ_FLOW_FILE_REPO,
SYSTEM_DESIGN_FILE_REPO,
SYSTEM_DESIGN_PDF_FILE_REPO,
TASK_FILE_REPO,
TASK_PDF_FILE_REPO,
TEST_CODES_FILE_REPO,
TEST_OUTPUTS_FILE_REPO,
)
from metagpt.utils.file_repository import FileRepository
from metagpt.utils.git_repository import GitRepository
class DocFileRepositories(FileRepository):
prd: FileRepository
system_design: FileRepository
task: FileRepository
code_summary: FileRepository
graph_repo: FileRepository
class_view: FileRepository
def __init__(self, git_repo):
super().__init__(git_repo=git_repo, relative_path=DOCS_FILE_REPO)
self.prd = git_repo.new_file_repository(relative_path=PRDS_FILE_REPO)
self.system_design = git_repo.new_file_repository(relative_path=SYSTEM_DESIGN_FILE_REPO)
self.task = git_repo.new_file_repository(relative_path=TASK_FILE_REPO)
self.code_summary = git_repo.new_file_repository(relative_path=CODE_SUMMARIES_FILE_REPO)
self.graph_repo = git_repo.new_file_repository(relative_path=GRAPH_REPO_FILE_REPO)
self.class_view = git_repo.new_file_repository(relative_path=CLASS_VIEW_FILE_REPO)
class ResourceFileRepositories(FileRepository):
competitive_analysis: FileRepository
data_api_design: FileRepository
seq_flow: FileRepository
system_design: FileRepository
prd: FileRepository
api_spec_and_task: FileRepository
code_summary: FileRepository
sd_output: FileRepository
def __init__(self, git_repo):
super().__init__(git_repo=git_repo, relative_path=RESOURCES_FILE_REPO)
self.competitive_analysis = git_repo.new_file_repository(relative_path=COMPETITIVE_ANALYSIS_FILE_REPO)
self.data_api_design = git_repo.new_file_repository(relative_path=DATA_API_DESIGN_FILE_REPO)
self.seq_flow = git_repo.new_file_repository(relative_path=SEQ_FLOW_FILE_REPO)
self.system_design = git_repo.new_file_repository(relative_path=SYSTEM_DESIGN_PDF_FILE_REPO)
self.prd = git_repo.new_file_repository(relative_path=PRD_PDF_FILE_REPO)
self.api_spec_and_task = git_repo.new_file_repository(relative_path=TASK_PDF_FILE_REPO)
self.code_summary = git_repo.new_file_repository(relative_path=CODE_SUMMARIES_PDF_FILE_REPO)
self.sd_output = git_repo.new_file_repository(relative_path=SD_OUTPUT_FILE_REPO)
class ProjectRepo(FileRepository):
def __init__(self, root: str | Path | GitRepository):
if isinstance(root, str) or isinstance(root, Path):
git_repo_ = GitRepository(local_path=Path(root))
elif isinstance(root, GitRepository):
git_repo_ = root
else:
raise ValueError("Invalid root")
super().__init__(git_repo=git_repo_, relative_path=Path("."))
self._git_repo = git_repo_
self.docs = DocFileRepositories(self._git_repo)
self.resources = ResourceFileRepositories(self._git_repo)
self.tests = self._git_repo.new_file_repository(relative_path=TEST_CODES_FILE_REPO)
self.test_outputs = self._git_repo.new_file_repository(relative_path=TEST_OUTPUTS_FILE_REPO)
self._srcs_path = None
@property
def git_repo(self) -> GitRepository:
return self._git_repo
@property
def workdir(self) -> Path:
return Path(self.git_repo.workdir)
@property
def srcs(self) -> FileRepository:
if not self._srcs_path:
raise ValueError("Call with_srcs first.")
return self._git_repo.new_file_repository(self._srcs_path)
def with_src_path(self, path: str | Path) -> ProjectRepo:
try:
self._srcs_path = Path(path).relative_to(self.workdir)
except ValueError:
self._srcs_path = Path(path)
return self
@property
def src_relative_path(self) -> Path | None:
return self._srcs_path

View file

@ -13,28 +13,36 @@ from pydantic import BaseModel, model_validator
class YamlModel(BaseModel):
"""Base class for yaml model"""
extra_fields: Optional[Dict[str, str]] = None
@classmethod
def read_yaml(cls, file_path: Path) -> Dict:
def read_yaml(cls, file_path: Path, encoding: str = "utf-8") -> Dict:
"""Read yaml file and return a dict"""
if not file_path.exists():
return {}
with open(file_path, "r") as file:
with open(file_path, "r", encoding=encoding) as file:
return yaml.safe_load(file)
@classmethod
def model_validate_yaml(cls, file_path: Path) -> "YamlModel":
def from_yaml_file(cls, file_path: Path) -> "YamlModel":
"""Read yaml file and return a YamlModel instance"""
return cls(**cls.read_yaml(file_path))
def model_dump_yaml(self, file_path: Path) -> None:
with open(file_path, "w") as file:
def to_yaml_file(self, file_path: Path, encoding: str = "utf-8") -> None:
"""Dump YamlModel instance to yaml file"""
with open(file_path, "w", encoding=encoding) as file:
yaml.dump(self.model_dump(), file)
class YamlModelWithoutDefault(YamlModel):
"""YamlModel without default values"""
@model_validator(mode="before")
@classmethod
def check_not_default_config(cls, values):
"""Check if there is any default config in config.yaml"""
if any(["YOUR" in v for v in values]):
raise ValueError("Please set your config in config.yaml")
return values