Merge main branch

This commit is contained in:
mannaandpoem 2024-01-03 19:48:46 +08:00
commit 24e617b362
325 changed files with 11290 additions and 3760 deletions

139
metagpt/roles/assistant.py Normal file
View file

@ -0,0 +1,139 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/8/7
@Author : mashenquan
@File : assistant.py
@Desc : I am attempting to incorporate certain symbol concepts from UML into MetaGPT, enabling it to have the
ability to freely construct flows through symbol concatenation. Simultaneously, I am also striving to
make these symbols configurable and standardized, making the process of building flows more convenient.
For more about `fork` node in activity diagrams, see: `https://www.uml-diagrams.org/activity-diagrams.html`
This file defines a `fork` style meta role capable of generating arbitrary roles at runtime based on a
configuration file.
@Modified By: mashenquan, 2023/8/22. A definition has been provided for the return value of _think: returning false
indicates that further reasoning cannot continue.
"""
from enum import Enum
from pathlib import Path
from typing import Optional
from pydantic import Field
from metagpt.actions.skill_action import ArgumentsParingAction, SkillAction
from metagpt.actions.talk_action import TalkAction
from metagpt.config import CONFIG
from metagpt.learn.skill_loader import SkillsDeclaration
from metagpt.logs import logger
from metagpt.memory.brain_memory import BrainMemory
from metagpt.roles import Role
from metagpt.schema import Message
class MessageType(Enum):
Talk = "TALK"
Skill = "SKILL"
class Assistant(Role):
"""Assistant for solving common issues."""
name: str = "Lily"
profile: str = "An assistant"
goal: str = "Help to solve problem"
constraints: str = "Talk in {language}"
desc: str = ""
memory: BrainMemory = Field(default_factory=BrainMemory)
skills: Optional[SkillsDeclaration] = None
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.constraints = self.constraints.format(language=kwargs.get("language") or CONFIG.language or "Chinese")
async def think(self) -> bool:
"""Everything will be done part by part."""
last_talk = await self.refine_memory()
if not last_talk:
return False
if not self.skills:
skill_path = Path(CONFIG.SKILL_PATH) if CONFIG.SKILL_PATH else None
self.skills = await SkillsDeclaration.load(skill_yaml_file_name=skill_path)
prompt = ""
skills = self.skills.get_skill_list()
for desc, name in skills.items():
prompt += f"If the text explicitly want you to {desc}, return `[SKILL]: {name}` brief and clear. For instance: [SKILL]: {name}\n"
prompt += 'Otherwise, return `[TALK]: {talk}` brief and clear. For instance: if {talk} is "xxxx" return [TALK]: xxxx\n\n'
prompt += f"Now what specific action is explicitly mentioned in the text: {last_talk}\n"
rsp = await self.llm.aask(prompt, [])
logger.info(f"THINK: {prompt}\n, THINK RESULT: {rsp}\n")
return await self._plan(rsp, last_talk=last_talk)
async def act(self) -> Message:
result = await self.rc.todo.run()
if not result:
return None
if isinstance(result, str):
msg = Message(content=result, role="assistant", cause_by=self.rc.todo)
elif isinstance(result, Message):
msg = result
else:
msg = Message(content=result.content, instruct_content=result.instruct_content, cause_by=type(self.rc.todo))
self.memory.add_answer(msg)
return msg
async def talk(self, text):
self.memory.add_talk(Message(content=text))
async def _plan(self, rsp: str, **kwargs) -> bool:
skill, text = BrainMemory.extract_info(input_string=rsp)
handlers = {
MessageType.Talk.value: self.talk_handler,
MessageType.Skill.value: self.skill_handler,
}
handler = handlers.get(skill, self.talk_handler)
return await handler(text, **kwargs)
async def talk_handler(self, text, **kwargs) -> bool:
history = self.memory.history_text
text = kwargs.get("last_talk") or text
self.rc.todo = TalkAction(
context=text, knowledge=self.memory.get_knowledge(), history_summary=history, llm=self.llm, **kwargs
)
return True
async def skill_handler(self, text, **kwargs) -> bool:
last_talk = kwargs.get("last_talk")
skill = self.skills.get_skill(text)
if not skill:
logger.info(f"skill not found: {text}")
return await self.talk_handler(text=last_talk, **kwargs)
action = ArgumentsParingAction(skill=skill, llm=self.llm, ask=last_talk, **kwargs)
await action.run(**kwargs)
if action.args is None:
return await self.talk_handler(text=last_talk, **kwargs)
self.rc.todo = SkillAction(skill=skill, args=action.args, llm=self.llm, name=skill.name, desc=skill.description)
return True
async def refine_memory(self) -> str:
last_talk = self.memory.pop_last_talk()
if last_talk is None: # No user feedback, unsure if past conversation is finished.
return None
if not self.memory.is_history_available:
return last_talk
history_summary = await self.memory.summarize(max_words=800, keep_language=True, llm=self.llm)
if last_talk and await self.memory.is_related(text1=last_talk, text2=history_summary, llm=self.llm):
# Merge relevant content.
merged = await self.memory.rewrite(sentence=last_talk, context=history_summary, llm=self.llm)
return f"{merged} {last_talk}"
return last_talk
def get_memory(self) -> str:
return self.memory.model_dump_json()
def load_memory(self, m):
try:
self.memory = BrainMemory(**m)
except Exception as e:
logger.exception(f"load error:{e}, data:{jsn}")

View file

@ -7,12 +7,11 @@
"""
from typing import Optional
from pydantic import Field
from metagpt.document_store.base_store import BaseStore
from metagpt.roles import Sales
# from metagpt.actions import SearchAndSummarize
# from metagpt.tools import SearchEngineType
DESC = """
## Principles (all things must not bypass the principles)
@ -29,4 +28,4 @@ class CustomerService(Sales):
name: str = "Xiaomei"
profile: str = "Human customer service"
desc: str = DESC
store: Optional[str] = None
store: Optional[BaseStore] = Field(default=None, exclude=True)

View file

@ -48,7 +48,7 @@ from metagpt.schema import (
Documents,
Message,
)
from metagpt.utils.common import any_to_str, any_to_str_set
from metagpt.utils.common import any_to_name, any_to_str, any_to_str_set
IS_PASS_PROMPT = """
{context}
@ -83,13 +83,17 @@ class Engineer(Role):
n_borg: int = 1
use_code_review: bool = False
code_todos: list = []
summarize_todos = []
summarize_todos: list = []
next_todo_action: str = ""
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self._init_actions([WriteCode])
self._watch([WriteTasks, SummarizeCode, WriteCode, WriteCodeReview, FixBug])
self.code_todos = []
self.summarize_todos = []
self.next_todo_action = any_to_name(WriteCode)
@staticmethod
def _parse_tasks(task_msg: Document) -> list[str]:
@ -110,7 +114,7 @@ class Engineer(Role):
coding_context = await todo.run(guideline=guideline)
# Code review
if review:
action = WriteCodeReview(context=coding_context, llm=self._llm)
action = WriteCodeReview(context=coding_context, llm=self.llm)
self._init_action_system_message(action)
coding_context = await action.run()
await src_file_repo.save(
@ -119,9 +123,12 @@ class Engineer(Role):
content=coding_context.code_doc.content,
)
msg = Message(
content=coding_context.json(), instruct_content=coding_context, role=self.profile, cause_by=WriteCode
content=coding_context.model_dump_json(),
instruct_content=coding_context,
role=self.profile,
cause_by=WriteCode,
)
self._rc.memory.add(msg)
self.rc.memory.add(msg)
changed_files.add(coding_context.code_doc.filename)
if not changed_files:
@ -130,11 +137,13 @@ class Engineer(Role):
async def _act(self) -> Message | None:
"""Determines the mode of action based on whether code review is used."""
if self._rc.todo is None:
if self.rc.todo is None:
return None
if isinstance(self._rc.todo, WriteCode):
if isinstance(self.rc.todo, WriteCode):
self.next_todo_action = any_to_name(SummarizeCode)
return await self._act_write_code()
if isinstance(self._rc.todo, SummarizeCode):
if isinstance(self.rc.todo, SummarizeCode):
self.next_todo_action = any_to_name(WriteCode)
return await self._act_summarize()
return None
@ -173,7 +182,7 @@ class Engineer(Role):
tasks.append(todo.context.dict())
await code_summaries_file_repo.save(
filename=Path(todo.context.design_filename).name,
content=todo.context.json(),
content=todo.context.model_dump_json(),
dependencies=dependencies,
)
else:
@ -196,7 +205,7 @@ class Engineer(Role):
)
async def _is_pass(self, summary) -> (str, str):
rsp = await self._llm.aask(msg=IS_PASS_PROMPT.format(context=summary), stream=False)
rsp = await self.llm.aask(msg=IS_PASS_PROMPT.format(context=summary), stream=False)
logger.info(rsp)
if "YES" in rsp:
return True, rsp
@ -207,17 +216,17 @@ class Engineer(Role):
CONFIG.src_workspace = CONFIG.git_repo.workdir / CONFIG.git_repo.workdir.name
write_code_filters = any_to_str_set([WriteTasks, SummarizeCode, FixBug])
summarize_code_filters = any_to_str_set([WriteCode, WriteCodeReview])
if not self._rc.news:
if not self.rc.news:
return None
msg = self._rc.news[0]
msg = self.rc.news[0]
if msg.cause_by in write_code_filters:
logger.debug(f"TODO WriteCode:{msg.json()}")
logger.debug(f"TODO WriteCode:{msg.model_dump_json()}")
await self._new_code_actions(bug_fix=msg.cause_by == any_to_str(FixBug))
return self._rc.todo
return self.rc.todo
if msg.cause_by in summarize_code_filters and msg.sent_from == any_to_str(self):
logger.debug(f"TODO SummarizeCode:{msg.json()}")
logger.debug(f"TODO SummarizeCode:{msg.model_dump_json()}")
await self._new_summarize_actions()
return self._rc.todo
return self.rc.todo
return None
@staticmethod
@ -235,7 +244,9 @@ class Engineer(Role):
task_doc = await task_file_repo.get(i.name)
elif str(i.parent) == SYSTEM_DESIGN_FILE_REPO:
design_doc = await design_file_repo.get(i.name)
# FIXME: design doc没有加载进来是None
if not task_doc or not design_doc:
logger.error(f'Detected source code "{filename}" from an unknown origin.')
raise ValueError(f'Detected source code "{filename}" from an unknown origin.')
context = CodingContext(filename=filename, design_doc=design_doc, task_doc=task_doc, code_doc=old_code_doc)
return context
@ -244,7 +255,9 @@ class Engineer(Role):
context = await Engineer._new_coding_context(
filename, src_file_repo, task_file_repo, design_file_repo, dependency
)
coding_doc = Document(root_path=str(src_file_repo.root_path), filename=filename, content=context.json())
coding_doc = Document(
root_path=str(src_file_repo.root_path), filename=filename, content=context.model_dump_json()
)
return coding_doc
async def _new_code_actions(self, bug_fix=False):
@ -269,15 +282,15 @@ class Engineer(Role):
filename=task_filename, design_doc=design_doc, task_doc=task_doc, code_doc=old_code_doc
)
coding_doc = Document(
root_path=str(src_file_repo.root_path), filename=task_filename, content=context.json()
root_path=str(src_file_repo.root_path), filename=task_filename, content=context.model_dump_json()
)
if task_filename in changed_files.docs:
logger.warning(
f"Log to expose potential conflicts: {coding_doc.json()} & "
f"{changed_files.docs[task_filename].json()}"
f"Log to expose potential conflicts: {coding_doc.model_dump_json()} & "
f"{changed_files.docs[task_filename].model_dump_json()}"
)
changed_files.docs[task_filename] = coding_doc
self.code_todos = [WriteCode(context=i, llm=self._llm) for i in changed_files.docs.values()]
self.code_todos = [WriteCode(context=i, llm=self.llm) for i in changed_files.docs.values()]
# Code directly modified by the user.
dependency = await CONFIG.git_repo.get_dependency()
for filename in changed_src_files:
@ -291,10 +304,10 @@ class Engineer(Role):
dependency=dependency,
)
changed_files.docs[filename] = coding_doc
self.code_todos.append(WriteCode(context=coding_doc, llm=self._llm))
self.code_todos.append(WriteCode(context=coding_doc, llm=self.llm))
if self.code_todos:
self._rc.todo = self.code_todos[0]
self.rc.todo = self.code_todos[0]
async def _new_summarize_actions(self):
src_file_repo = CONFIG.git_repo.new_file_repository(CONFIG.src_workspace)
@ -307,9 +320,14 @@ class Engineer(Role):
summarizations[ctx].append(filename)
for ctx, filenames in summarizations.items():
ctx.codes_filenames = filenames
self.summarize_todos.append(SummarizeCode(context=ctx, llm=self._llm))
self.summarize_todos.append(SummarizeCode(context=ctx, llm=self.llm))
if self.summarize_todos:
self._rc.todo = self.summarize_todos[0]
self.rc.todo = self.summarize_todos[0]
@property
def todo(self) -> str:
"""AgentStore uses this attribute to display to the user what actions the current role should take."""
return self.next_todo_action
async def _write_code_guideline(self):
logger.info("Writing code guideline..")

View file

@ -69,8 +69,8 @@ class InvoiceOCRAssistant(Role):
Returns:
A message containing the result of the action.
"""
msg = self._rc.memory.get(k=1)[0]
todo = self._rc.todo
msg = self.rc.memory.get(k=1)[0]
todo = self.rc.todo
if isinstance(todo, InvoiceOCR):
self.origin_query = msg.content
invoice_path: InvoicePath = msg.instruct_content
@ -87,11 +87,11 @@ class InvoiceOCRAssistant(Role):
else:
self._init_actions([GenerateTable])
self._rc.todo = None
self.rc.todo = None
content = INVOICE_OCR_SUCCESS
resp = OCRResults(ocr_result=json.dumps(resp))
msg = Message(content=content, instruct_content=resp)
self._rc.memory.add(msg)
self.rc.memory.add(msg)
return await super().react()
elif isinstance(todo, GenerateTable):
ocr_results: OCRResults = msg.instruct_content
@ -108,5 +108,5 @@ class InvoiceOCRAssistant(Role):
resp = ReplyData(content=resp)
msg = Message(content=content, instruct_content=resp)
self._rc.memory.add(msg)
self.rc.memory.add(msg)
return msg

View file

@ -7,11 +7,11 @@
@Modified By: mashenquan, 2023/11/27. Add `PrepareDocuments` action according to Section 2.2.3.5.1 of RFC 135.
"""
from metagpt.actions import UserRequirement, WritePRD
from metagpt.actions.prepare_documents import PrepareDocuments
from metagpt.config import CONFIG
from metagpt.roles.role import Role
from metagpt.utils.common import any_to_name
class ProductManager(Role):
@ -29,20 +29,29 @@ class ProductManager(Role):
profile: str = "Product Manager"
goal: str = "efficiently create a successful product that meets market demands and user expectations"
constraints: str = "utilize the same language as the user requirements for seamless communication"
todo_action: str = ""
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self._init_actions([PrepareDocuments, WritePRD])
self._watch([UserRequirement, PrepareDocuments])
self.todo_action = any_to_name(PrepareDocuments)
async def _think(self) -> None:
async def _think(self) -> bool:
"""Decide what to do"""
if CONFIG.git_repo:
if CONFIG.git_repo and not CONFIG.git_reinit:
self._set_state(1)
else:
self._set_state(0)
return self._rc.todo
CONFIG.git_reinit = False
self.todo_action = any_to_name(WritePRD)
return bool(self.rc.todo)
async def _observe(self, ignore_memory=False) -> int:
return await super()._observe(ignore_memory=True)
@property
def todo(self) -> str:
"""AgentStore uses this attribute to display to the user what actions the current role should take."""
return self.todo_action

View file

@ -69,7 +69,7 @@ class QaEngineer(Role):
)
logger.info(f"Writing {test_doc.filename}..")
context = TestingContext(filename=test_doc.filename, test_doc=test_doc, code_doc=code_doc)
context = await WriteTest(context=context, llm=self._llm).run()
context = await WriteTest(context=context, llm=self.llm).run()
await tests_file_repo.save(
filename=context.test_doc.filename,
content=context.test_doc.content,
@ -86,7 +86,7 @@ class QaEngineer(Role):
)
self.publish_message(
Message(
content=run_code_context.json(),
content=run_code_context.model_dump_json(),
role=self.profile,
cause_by=WriteTest,
sent_from=self,
@ -106,11 +106,11 @@ class QaEngineer(Role):
return
run_code_context.code = src_doc.content
run_code_context.test_code = test_doc.content
result = await RunCode(context=run_code_context, llm=self._llm).run()
result = await RunCode(context=run_code_context, llm=self.llm).run()
run_code_context.output_filename = run_code_context.test_filename + ".json"
await CONFIG.git_repo.new_file_repository(TEST_OUTPUTS_FILE_REPO).save(
filename=run_code_context.output_filename,
content=result.json(),
content=result.model_dump_json(),
dependencies={src_doc.root_relative_path, test_doc.root_relative_path},
)
run_code_context.code = None
@ -120,7 +120,7 @@ class QaEngineer(Role):
mappings = {"Engineer": "Alex", "QaEngineer": "Edward"}
self.publish_message(
Message(
content=run_code_context.json(),
content=run_code_context.model_dump_json(),
role=self.profile,
cause_by=RunCode,
sent_from=self,
@ -130,14 +130,14 @@ class QaEngineer(Role):
async def _debug_error(self, msg):
run_code_context = RunCodeContext.loads(msg.content)
code = await DebugError(context=run_code_context, llm=self._llm).run()
code = await DebugError(context=run_code_context, llm=self.llm).run()
await FileRepository.save_file(
filename=run_code_context.test_filename, content=code, relative_path=TEST_CODES_FILE_REPO
)
run_code_context.output = None
self.publish_message(
Message(
content=run_code_context.json(),
content=run_code_context.model_dump_json(),
role=self.profile,
cause_by=DebugError,
sent_from=self,
@ -159,7 +159,7 @@ class QaEngineer(Role):
code_filters = any_to_str_set({SummarizeCode})
test_filters = any_to_str_set({WriteTest, DebugError})
run_filters = any_to_str_set({RunCode})
for msg in self._rc.news:
for msg in self.rc.news:
# Decide what to do based on observed msg type, currently defined by human,
# might potentially be moved to _think, that is, let the agent decides for itself
if msg.cause_by in code_filters:

View file

@ -1,5 +1,6 @@
#!/usr/bin/env python
"""
@Modified By: mashenquan, 2023/8/22. A definition has been provided for the return value of _think: returning false indicates that further reasoning cannot continue.
@Modified By: mashenquan, 2023-11-1. According to Chapter 2.2.1 and 2.2.2 of RFC 116, change the data type of
the `cause_by` value in the `Message` to a string to support the new message distribution feature.
"""
@ -40,10 +41,21 @@ class Researcher(Role):
if self.language not in ("en-us", "zh-cn"):
logger.warning(f"The language `{self.language}` has not been tested, it may not work.")
async def _think(self) -> bool:
if self.rc.todo is None:
self._set_state(0)
return True
if self.rc.state + 1 < len(self.states):
self._set_state(self.rc.state + 1)
else:
self.rc.todo = None
return False
async def _act(self) -> Message:
logger.info(f"{self._setting}: to do {self._rc.todo}({self._rc.todo.name})")
todo = self._rc.todo
msg = self._rc.memory.get(k=1)[0]
logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})")
todo = self.rc.todo
msg = self.rc.memory.get(k=1)[0]
if isinstance(msg.instruct_content, Report):
instruct_content = msg.instruct_content
topic = instruct_content.topic
@ -67,14 +79,14 @@ class Researcher(Role):
else:
summaries = instruct_content.summaries
summary_text = "\n---\n".join(f"url: {url}\nsummary: {summary}" for (url, summary) in summaries)
content = await self._rc.todo.run(topic, summary_text, system_text=research_system_text)
content = await self.rc.todo.run(topic, summary_text, system_text=research_system_text)
ret = Message(
content="",
instruct_content=Report(topic=topic, content=content),
role=self.profile,
cause_by=self._rc.todo,
cause_by=self.rc.todo,
)
self._rc.memory.add(ret)
self.rc.memory.add(ret)
return ret
def research_system_text(self, topic, current_task: Action) -> str:

View file

@ -4,13 +4,14 @@
@Time : 2023/5/11 14:42
@Author : alexanderwu
@File : role.py
@Modified By: mashenquan, 2023/8/22. A definition has been provided for the return value of _think: returning false indicates that further reasoning cannot continue.
@Modified By: mashenquan, 2023-11-1. According to Chapter 2.2.1 and 2.2.2 of RFC 116:
1. Merge the `recv` functionality into the `_observe` function. Future message reading operations will be
consolidated within the `_observe` function.
2. Standardize the message filtering for string label matching. Role objects can access the message labels
they've subscribed to through the `subscribed_tags` property.
3. Move the message receive buffer from the global variable `self._rc.env.memory` to the role's private variable
`self._rc.msg_buffer` for easier message identification and asynchronous appending of messages.
3. Move the message receive buffer from the global variable `self.rc.env.memory` to the role's private variable
`self.rc.msg_buffer` for easier message identification and asynchronous appending of messages.
4. Standardize the way messages are passed: `publish_message` sends messages out, while `put_message` places
messages into the Role object's private message receive buffer. There are no other message transmit methods.
5. Standardize the parameters for the `run` function: the `test_message` parameter is used for testing purposes
@ -23,21 +24,21 @@ from __future__ import annotations
from enum import Enum
from pathlib import Path
from typing import Any, Iterable, Set, Type
from typing import Any, Iterable, Optional, Set, Type
from pydantic import BaseModel, Field
from pydantic import BaseModel, ConfigDict, Field, SerializeAsAny, model_validator
from metagpt.actions import Action, ActionOutput
from metagpt.actions.action import action_subclass_registry
from metagpt.actions.action_node import ActionNode
from metagpt.actions.add_requirement import UserRequirement
from metagpt.const import SERDESER_PATH
from metagpt.llm import LLM, HumanProvider
from metagpt.logs import logger
from metagpt.memory import Memory
from metagpt.provider.base_gpt_api import BaseGPTAPI
from metagpt.schema import Message, MessageQueue
from metagpt.provider.base_llm import BaseLLM
from metagpt.schema import Message, MessageQueue, SerializationMixin
from metagpt.utils.common import (
any_to_name,
any_to_str,
import_class,
read_json_file,
@ -90,8 +91,10 @@ class RoleReactMode(str, Enum):
class RoleContext(BaseModel):
"""Role Runtime Context"""
model_config = ConfigDict(arbitrary_types_allowed=True)
# # env exclude=True to avoid `RecursionError: maximum recursion depth exceeded in comparison`
env: "Environment" = Field(default=None, exclude=True)
env: "Environment" = Field(default=None, exclude=True) # # avoid circular import
# TODO judge if ser&deser
msg_buffer: MessageQueue = Field(
default_factory=MessageQueue, exclude=True
@ -107,9 +110,6 @@ class RoleContext(BaseModel):
) # see `Role._set_react_mode` for definitions of the following two attributes
max_react_loop: int = 1
class Config:
arbitrary_types_allowed = True
def check(self, role_id: str):
# if hasattr(CONFIG, "long_term_memory") and CONFIG.long_term_memory:
# self.long_term_memory.recover_memory(role_id, self)
@ -118,7 +118,7 @@ class RoleContext(BaseModel):
@property
def important_memory(self) -> list[Message]:
"""Get the information corresponding to the watched actions"""
"""Retrieve information corresponding to the attention action."""
return self.memory.get_by_actions(self.watch)
@property
@ -126,12 +126,11 @@ class RoleContext(BaseModel):
return self.memory.get()
role_subclass_registry = {}
class Role(BaseModel):
class Role(SerializationMixin, is_polymorphic_base=True):
"""Role/Agent"""
model_config = ConfigDict(arbitrary_types_allowed=True, exclude=["llm"])
name: str = ""
profile: str = ""
goal: str = ""
@ -139,84 +138,40 @@ class Role(BaseModel):
desc: str = ""
is_human: bool = False
_llm: BaseGPTAPI = Field(default_factory=LLM) # Each role has its own LLM, use different system message
_role_id: str = ""
_states: list[str] = []
_actions: list[Action] = []
_rc: RoleContext = Field(default_factory=RoleContext)
llm: BaseLLM = Field(default_factory=LLM, exclude=True) # Each role has its own LLM, use different system message
role_id: str = ""
states: list[str] = []
actions: list[SerializeAsAny[Action]] = Field(default=[], validate_default=True)
rc: RoleContext = Field(default_factory=RoleContext)
subscription: set[str] = set()
# builtin variables
recovered: bool = False # to tag if a recovered role
latest_observed_msg: Message = None # record the latest observed message when interrupted
builtin_class_name: str = ""
_private_attributes = {
"_llm": None,
"_role_id": _role_id,
"_states": [],
"_actions": [],
"_rc": RoleContext(),
"_subscription": set(),
}
latest_observed_msg: Optional[Message] = None # record the latest observed message when interrupted
__hash__ = object.__hash__ # support Role as hashable type in `Environment.members`
class Config:
arbitrary_types_allowed = True
exclude = ["_llm"]
@model_validator(mode="after")
def check_subscription(self):
if not self.subscription:
self.subscription = {any_to_str(self), self.name} if self.name else {any_to_str(self)}
return self
def __init__(self, **kwargs: Any):
for index in range(len(kwargs.get("_actions", []))):
current_action = kwargs["_actions"][index]
if isinstance(current_action, dict):
item_class_name = current_action.get("builtin_class_name", None)
for name, subclass in action_subclass_registry.items():
registery_class_name = subclass.__fields__["builtin_class_name"].default
if item_class_name == registery_class_name:
current_action = subclass(**current_action)
break
kwargs["_actions"][index] = current_action
def __init__(self, **data: Any):
# --- avoid PydanticUndefinedAnnotation name 'Environment' is not defined #
from metagpt.environment import Environment
super().__init__(**kwargs)
Environment
# ------
Role.model_rebuild()
super().__init__(**data)
# 关于私有变量的初始化 https://github.com/pydantic/pydantic/issues/655
self._private_attributes["_llm"] = LLM() if not self.is_human else HumanProvider()
self._private_attributes["_role_id"] = str(self._setting)
self.subscription = {any_to_str(self), self.name} if self.name else {any_to_str(self)}
for key in self._private_attributes.keys():
if key in kwargs:
object.__setattr__(self, key, kwargs[key])
if key == "_rc":
_rc = RoleContext(**kwargs["_rc"])
object.__setattr__(self, "_rc", _rc)
else:
if key == "_rc":
# # Warning, if use self._private_attributes["_rc"],
# # self._rc will be a shared object between roles, so init one or reset it inside `_reset`
object.__setattr__(self, key, RoleContext())
else:
object.__setattr__(self, key, self._private_attributes[key])
self._llm.system_prompt = self._get_prefix()
# deserialize child classes dynamically for inherited `role`
object.__setattr__(self, "builtin_class_name", self.__class__.__name__)
self.__fields__["builtin_class_name"].default = self.__class__.__name__
if "actions" in kwargs:
self._init_actions(kwargs["actions"])
self._watch(kwargs.get("watch") or [UserRequirement])
def __init_subclass__(cls, **kwargs: Any) -> None:
super().__init_subclass__(**kwargs)
role_subclass_registry[cls.__name__] = cls
self.llm.system_prompt = self._get_prefix()
self._watch(data.get("watch") or [UserRequirement])
def _reset(self):
object.__setattr__(self, "_states", [])
object.__setattr__(self, "_actions", [])
self.states = []
self.actions = []
@property
def _setting(self):
@ -229,12 +184,12 @@ class Role(BaseModel):
else stg_path
)
role_info = self.dict(exclude={"_rc": {"memory": True, "msg_buffer": True}, "_llm": True})
role_info = self.model_dump(exclude={"rc": {"memory": True, "msg_buffer": True}, "llm": True})
role_info.update({"role_class": self.__class__.__name__, "module_name": self.__module__})
role_info_path = stg_path.joinpath("role_info.json")
write_json_file(role_info_path, role_info)
self._rc.memory.serialize(stg_path) # serialize role's memory alone
self.rc.memory.serialize(stg_path) # serialize role's memory alone
@classmethod
def deserialize(cls, stg_path: Path) -> "Role":
@ -258,13 +213,13 @@ class Role(BaseModel):
action.set_prefix(self._get_prefix())
def refresh_system_message(self):
self._llm.system_prompt = self._get_prefix()
self.llm.system_prompt = self._get_prefix()
def set_recovered(self, recovered: bool = False):
self.recovered = recovered
def set_memory(self, memory: Memory):
self._rc.memory = memory
self.rc.memory = memory
def init_actions(self, actions):
self._init_actions(actions)
@ -274,7 +229,7 @@ class Role(BaseModel):
for idx, action in enumerate(actions):
if not isinstance(action, Action):
## 默认初始化
i = action(name="", llm=self._llm)
i = action(name="", llm=self.llm)
else:
if self.is_human and not isinstance(action.llm, HumanProvider):
logger.warning(
@ -283,10 +238,9 @@ class Role(BaseModel):
f"try passing in Action classes instead of initialized instances"
)
i = action
# i.set_env(self._rc.env)
self._init_action_system_message(i)
self._actions.append(i)
self._states.append(f"{idx}. {action}")
self.actions.append(i)
self.states.append(f"{idx}. {action}")
def _set_react_mode(self, react_mode: str, max_react_loop: int = 1):
"""Set strategy of the Role reacting to observed Message. Variation lies in how
@ -305,17 +259,20 @@ class Role(BaseModel):
Defaults to 1, i.e. _think -> _act (-> return result and end)
"""
assert react_mode in RoleReactMode.values(), f"react_mode must be one of {RoleReactMode.values()}"
self._rc.react_mode = react_mode
self.rc.react_mode = react_mode
if react_mode == RoleReactMode.REACT:
self._rc.max_react_loop = max_react_loop
self.rc.max_react_loop = max_react_loop
def _watch(self, actions: Iterable[Type[Action]] | Iterable[Action]):
"""Watch Actions of interest. Role will select Messages caused by these Actions from its personal message
buffer during _observe.
"""
self._rc.watch = {any_to_str(t) for t in actions}
self.rc.watch = {any_to_str(t) for t in actions}
# check RoleContext after adding watch actions
self._rc.check(self._role_id)
self.rc.check(self.role_id)
def is_watch(self, caused_by: str):
return caused_by in self.rc.watch
def subscribe(self, tags: Set[str]):
"""Used to receive Messages with certain tags from the environment. Message will be put into personal message
@ -323,23 +280,28 @@ class Role(BaseModel):
or profile.
"""
self.subscription = tags
if self._rc.env: # According to the routing feature plan in Chapter 2.2.3.2 of RFC 113
self._rc.env.set_subscription(self, self.subscription)
if self.rc.env: # According to the routing feature plan in Chapter 2.2.3.2 of RFC 113
self.rc.env.set_subscription(self, self.subscription)
def _set_state(self, state: int):
"""Update the current state."""
self._rc.state = state
logger.debug(f"actions={self._actions}, state={state}")
self._rc.todo = self._actions[self._rc.state] if state >= 0 else None
self.rc.state = state
logger.debug(f"actions={self.actions}, state={state}")
self.rc.todo = self.actions[self.rc.state] if state >= 0 else None
def set_env(self, env: "Environment"):
"""Set the environment in which the role works. The role can talk to the environment and can also receive
messages by observing."""
self._rc.env = env
self.rc.env = env
if env:
env.set_subscription(self, self.subscription)
self.refresh_system_message() # add env message to system message
@property
def action_count(self):
"""Return number of action"""
return len(self.actions)
def _get_prefix(self):
"""Get the role prefix"""
if self.desc:
@ -350,36 +312,38 @@ class Role(BaseModel):
if self.constraints:
prefix += CONSTRAINT_TEMPLATE.format(**{"constraints": self.constraints})
if self._rc.env and self._rc.env.desc:
other_role_names = ", ".join(self._rc.env.role_names())
env_desc = f"You are in {self._rc.env.desc} with roles({other_role_names})."
if self.rc.env and self.rc.env.desc:
other_role_names = ", ".join(self.rc.env.role_names())
env_desc = f"You are in {self.rc.env.desc} with roles({other_role_names})."
prefix += env_desc
return prefix
async def _think(self) -> None:
"""Think about what to do and decide on the next action"""
if len(self._actions) == 1:
async def _think(self) -> bool:
"""Consider what to do and decide on the next course of action. Return false if nothing can be done."""
if len(self.actions) == 1:
# If there is only one action, then only this one can be performed
self._set_state(0)
return
if self.recovered and self._rc.state >= 0:
self._set_state(self._rc.state) # action to run from recovered state
self.recovered = False # avoid max_react_loop out of work
return
return True
if self.recovered and self.rc.state >= 0:
self._set_state(self.rc.state) # action to run from recovered state
self.set_recovered(False) # avoid max_react_loop out of work
return True
prompt = self._get_prefix()
prompt += STATE_TEMPLATE.format(
history=self._rc.history,
states="\n".join(self._states),
n_states=len(self._states) - 1,
previous_state=self._rc.state,
history=self.rc.history,
states="\n".join(self.states),
n_states=len(self.states) - 1,
previous_state=self.rc.state,
)
next_state = await self._llm.aask(prompt)
next_state = await self.llm.aask(prompt)
next_state = extract_state_value_from_output(next_state)
logger.debug(f"{prompt=}")
if (not next_state.isdigit() and next_state != "-1") or int(next_state) not in range(-1, len(self._states)):
if (not next_state.isdigit() and next_state != "-1") or int(next_state) not in range(-1, len(self.states)):
logger.warning(f"Invalid answer of state, {next_state=}, will be set to -1")
next_state = -1
else:
@ -387,73 +351,66 @@ class Role(BaseModel):
if next_state == -1:
logger.info(f"End actions with {next_state=}")
self._set_state(next_state)
return True
async def _act(self) -> Message:
logger.info(f"{self._setting}: to do {self._rc.todo}({self._rc.todo.name})")
response = await self._rc.todo.run(self._rc.history)
logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})")
response = await self.rc.todo.run(self.rc.history)
if isinstance(response, (ActionOutput, ActionNode)):
msg = Message(
content=response.content,
instruct_content=response.instruct_content,
role=self._setting,
cause_by=self._rc.todo,
cause_by=self.rc.todo,
sent_from=self,
)
elif isinstance(response, Message):
msg = response
else:
msg = Message(content=response, role=self.profile, cause_by=self._rc.todo, sent_from=self)
self._rc.memory.add(msg)
msg = Message(content=response, role=self.profile, cause_by=self.rc.todo, sent_from=self)
self.rc.memory.add(msg)
return msg
def _find_news(self, observed: list[Message], existed: list[Message]) -> list[Message]:
news = []
# Warning, remove `id` here to make it work for recover
observed_pure = [msg.dict(exclude={"id": True}) for msg in observed]
existed_pure = [msg.dict(exclude={"id": True}) for msg in existed]
for idx, new in enumerate(observed_pure):
if (new["cause_by"] in self._rc.watch or self.name in new["send_to"]) and new not in existed_pure:
news.append(observed[idx])
return news
async def _observe(self, ignore_memory=False) -> int:
"""Prepare new messages for processing from the message buffer and other sources."""
# Read unprocessed messages from the msg buffer.
news = self._rc.msg_buffer.pop_all()
news = []
if self.recovered:
news = [self.latest_observed_msg] if self.latest_observed_msg else []
else:
self.latest_observed_msg = news[-1] if len(news) > 0 else None # record the latest observed msg
if not news:
news = self.rc.msg_buffer.pop_all()
# Store the read messages in your own memory to prevent duplicate processing.
old_messages = [] if ignore_memory else self._rc.memory.get()
self._rc.memory.add_batch(news)
old_messages = [] if ignore_memory else self.rc.memory.get()
self.rc.memory.add_batch(news)
# Filter out messages of interest.
self._rc.news = self._find_news(news, old_messages)
self.rc.news = [
n for n in news if (n.cause_by in self.rc.watch or self.name in n.send_to) and n not in old_messages
]
self.latest_observed_msg = self.rc.news[-1] if self.rc.news else None # record the latest observed msg
# Design Rules:
# If you need to further categorize Message objects, you can do so using the Message.set_meta function.
# msg_buffer is a receiving buffer, avoid adding message data and operations to msg_buffer.
news_text = [f"{i.role}: {i.content[:20]}..." for i in self._rc.news]
news_text = [f"{i.role}: {i.content[:20]}..." for i in self.rc.news]
if news_text:
logger.debug(f"{self._setting} observed: {news_text}")
return len(self._rc.news)
return len(self.rc.news)
def publish_message(self, msg):
"""If the role belongs to env, then the role's messages will be broadcast to env"""
if not msg:
return
if not self._rc.env:
if not self.rc.env:
# If env does not exist, do not publish the message
return
self._rc.env.publish_message(msg)
self.rc.env.publish_message(msg)
def put_message(self, message):
"""Place the message into the Role object's private message buffer."""
if not message:
return
self._rc.msg_buffer.push(message)
self.rc.msg_buffer.push(message)
async def _react(self) -> Message:
"""Think first, then act, until the Role _think it is time to stop and requires no more todo.
@ -462,22 +419,22 @@ class Role(BaseModel):
"""
actions_taken = 0
rsp = Message(content="No actions taken yet") # will be overwritten after Role _act
while actions_taken < self._rc.max_react_loop:
while actions_taken < self.rc.max_react_loop:
# think
await self._think()
if self._rc.todo is None:
if self.rc.todo is None:
break
# act
logger.debug(f"{self._setting}: {self._rc.state=}, will do {self._rc.todo}")
logger.debug(f"{self._setting}: {self.rc.state=}, will do {self.rc.todo}")
rsp = await self._act() # 这个rsp是否需要publish_message
actions_taken += 1
return rsp # return output from the last action
async def _act_by_order(self) -> Message:
"""switch action each time by order defined in _init_actions, i.e. _act (Action1) -> _act (Action2) -> ..."""
start_idx = self._rc.state if self._rc.state >= 0 else 0 # action to run from recovered state
rsp = Message(content="No actions taken yet") # return default message if _actions=[]
for i in range(start_idx, len(self._states)):
start_idx = self.rc.state if self.rc.state >= 0 else 0 # action to run from recovered state
rsp = Message(content="No actions taken yet") # return default message if actions=[]
for i in range(start_idx, len(self.states)):
self._set_state(i)
rsp = await self._act()
return rsp # return output from the last action
@ -489,35 +446,18 @@ class Role(BaseModel):
async def react(self) -> Message:
"""Entry to one of three strategies by which Role reacts to the observed Message"""
if self._rc.react_mode == RoleReactMode.REACT:
if self.rc.react_mode == RoleReactMode.REACT:
rsp = await self._react()
elif self._rc.react_mode == RoleReactMode.BY_ORDER:
elif self.rc.react_mode == RoleReactMode.BY_ORDER:
rsp = await self._act_by_order()
elif self._rc.react_mode == RoleReactMode.PLAN_AND_ACT:
elif self.rc.react_mode == RoleReactMode.PLAN_AND_ACT:
rsp = await self._plan_and_act()
self._set_state(state=-1) # current reaction is complete, reset state to -1 and todo back to None
return rsp
# # Replaced by run()
# def recv(self, message: Message) -> None:
# """add message to history."""
# # self._history += f"\n{message}"
# # self._context = self._history
# if message in self._rc.memory.get():
# return
# self._rc.memory.add(message)
# # Replaced by run()
# async def handle(self, message: Message) -> Message:
# """Receive information and reply with actions"""
# # logger.debug(f"{self.name=}, {self.profile=}, {message.role=}")
# self.recv(message)
#
# return await self._react()
def get_memories(self, k=0) -> list[Message]:
"""A wrapper to return the most recent k memories of this role, return all when k=0"""
return self._rc.memory.get(k=k)
return self.rc.memory.get(k=k)
@role_raise_decorator
async def run(self, with_message=None) -> Message | None:
@ -542,7 +482,7 @@ class Role(BaseModel):
rsp = await self.react()
# Reset the next action to be taken.
self._rc.todo = None
self.rc.todo = None
# Send the response message to the Environment object to have it relay the message to the subscribers.
self.publish_message(rsp)
return rsp
@ -550,4 +490,21 @@ class Role(BaseModel):
@property
def is_idle(self) -> bool:
"""If true, all actions have been executed."""
return not self._rc.news and not self._rc.todo and self._rc.msg_buffer.empty()
return not self.rc.news and not self.rc.todo and self.rc.msg_buffer.empty()
async def think(self) -> Action:
"""The exported `think` function"""
await self._think()
return self.rc.todo
async def act(self) -> ActionOutput:
"""The exported `act` function"""
msg = await self._act()
return ActionOutput(content=msg.content, instruct_content=msg.instruct_content)
@property
def todo(self) -> str:
"""AgentStore uses this attribute to display to the user what actions the current role should take."""
if self.actions:
return any_to_name(self.actions[0])
return ""

View file

@ -8,6 +8,8 @@
from typing import Optional
from pydantic import Field
from metagpt.actions import SearchAndSummarize, UserRequirement
from metagpt.document_store.base_store import BaseStore
from metagpt.roles import Role
@ -15,16 +17,17 @@ from metagpt.tools import SearchEngineType
class Sales(Role):
name: str = "Xiaomei"
profile: str = "Retail sales guide"
desc: str = "I am a sales guide in retail. My name is Xiaomei. I will answer some customer questions next, and I "
"will answer questions only based on the information in the knowledge base."
"If I feel that you can't get the answer from the reference material, then I will directly reply that"
" I don't know, and I won't tell you that this is from the knowledge base,"
"but pretend to be what I know. Note that each of my replies will be replied in the tone of a "
"professional guide"
name: str = "John Smith"
profile: str = "Retail Sales Guide"
desc: str = (
"As a Retail Sales Guide, my name is John Smith. I specialize in addressing customer inquiries with "
"expertise and precision. My responses are based solely on the information available in our knowledge"
" base. In instances where your query extends beyond this scope, I'll honestly indicate my inability "
"to provide an answer, rather than speculate or assume. Please note, each of my replies will be "
"delivered with the professionalism and courtesy expected of a seasoned sales guide."
)
store: Optional[BaseStore] = None
store: Optional[BaseStore] = Field(default=None, exclude=True)
def __init__(self, **kwargs):
super().__init__(**kwargs)

View file

@ -57,19 +57,19 @@ class Searcher(Role):
async def _act_sp(self) -> Message:
"""Performs the search action in a single process."""
logger.info(f"{self._setting}: to do {self._rc.todo}({self._rc.todo.name})")
response = await self._rc.todo.run(self._rc.memory.get(k=0))
logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})")
response = await self.rc.todo.run(self.rc.memory.get(k=0))
if isinstance(response, (ActionOutput, ActionNode)):
msg = Message(
content=response.content,
instruct_content=response.instruct_content,
role=self.profile,
cause_by=self._rc.todo,
cause_by=self.rc.todo,
)
else:
msg = Message(content=response, role=self.profile, cause_by=self._rc.todo)
self._rc.memory.add(msg)
msg = Message(content=response, role=self.profile, cause_by=self.rc.todo)
self.rc.memory.add(msg)
return msg
async def _act(self) -> Message:

View file

@ -7,19 +7,19 @@
@Modified By: mashenquan, 2023-11-1. In accordance with Chapter 2.2.1 and 2.2.2 of RFC 116, utilize the new message
distribution feature for message filtering.
"""
from typing import Any, Type
from typing import Any, Callable, Union
from pydantic import Field
from semantic_kernel import Kernel
from semantic_kernel.planning import SequentialPlanner
from semantic_kernel.planning.action_planner.action_planner import ActionPlanner
from semantic_kernel.planning.basic_planner import BasicPlanner
from semantic_kernel.planning.basic_planner import BasicPlanner, Plan
from metagpt.actions import UserRequirement
from metagpt.actions.execute_task import ExecuteTask
from metagpt.llm import LLM
from metagpt.logs import logger
from metagpt.provider.base_gpt_api import BaseGPTAPI
from metagpt.provider.base_llm import BaseLLM
from metagpt.roles import Role
from metagpt.schema import Message
from metagpt.utils.make_sk_kernel import make_sk_kernel
@ -41,17 +41,17 @@ class SkAgent(Role):
goal: str = "Execute task based on passed in task description"
constraints: str = ""
plan: Any = None
plan: Plan = Field(default=None, exclude=True)
planner_cls: Any = None
planner: Any = None
llm: BaseGPTAPI = Field(default_factory=LLM)
planner: Union[BasicPlanner, SequentialPlanner, ActionPlanner] = None
llm: BaseLLM = Field(default_factory=LLM)
kernel: Kernel = Field(default_factory=Kernel)
import_semantic_skill_from_directory: Type[Kernel.import_semantic_skill_from_directory] = None
import_skill: Type[Kernel.import_skill] = None
import_semantic_skill_from_directory: Callable = Field(default=None, exclude=True)
import_skill: Callable = Field(default=None, exclude=True)
def __init__(self, **kwargs) -> None:
def __init__(self, **data: Any) -> None:
"""Initializes the Engineer role with given attributes."""
super().__init__(**kwargs)
super().__init__(**data)
self._init_actions([ExecuteTask()])
self._watch([UserRequirement])
self.kernel = make_sk_kernel()
@ -71,10 +71,10 @@ class SkAgent(Role):
self._set_state(0)
# how funny the interface is inconsistent
if isinstance(self.planner, BasicPlanner):
self.plan = await self.planner.create_plan_async(self._rc.important_memory[-1].content, self.kernel)
self.plan = await self.planner.create_plan_async(self.rc.important_memory[-1].content, self.kernel)
logger.info(self.plan.generated_plan)
elif any(isinstance(self.planner, cls) for cls in [SequentialPlanner, ActionPlanner]):
self.plan = await self.planner.create_plan_async(self._rc.important_memory[-1].content)
self.plan = await self.planner.create_plan_async(self.rc.important_memory[-1].content)
async def _act(self) -> Message:
# how funny the interface is inconsistent
@ -85,6 +85,6 @@ class SkAgent(Role):
result = (await self.plan.invoke_async()).result
logger.info(result)
msg = Message(content=result, role=self.profile, cause_by=self._rc.todo)
self._rc.memory.add(msg)
msg = Message(content=result, role=self.profile, cause_by=self.rc.todo)
self.rc.memory.add(msg)
return msg

118
metagpt/roles/teacher.py Normal file
View file

@ -0,0 +1,118 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/7/27
@Author : mashenquan
@File : teacher.py
@Desc : Used by Agent Store
@Modified By: mashenquan, 2023/8/22. A definition has been provided for the return value of _think: returning false indicates that further reasoning cannot continue.
"""
import re
import aiofiles
from metagpt.actions import UserRequirement
from metagpt.actions.write_teaching_plan import TeachingPlanBlock, WriteTeachingPlanPart
from metagpt.config import CONFIG
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import Message
from metagpt.utils.common import any_to_str
class Teacher(Role):
"""Support configurable teacher roles,
with native and teaching languages being replaceable through configurations."""
name: str = "Lily"
profile: str = "{teaching_language} Teacher"
goal: str = "writing a {language} teaching plan part by part"
constraints: str = "writing in {language}"
desc: str = ""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.name = WriteTeachingPlanPart.format_value(self.name)
self.profile = WriteTeachingPlanPart.format_value(self.profile)
self.goal = WriteTeachingPlanPart.format_value(self.goal)
self.constraints = WriteTeachingPlanPart.format_value(self.constraints)
self.desc = WriteTeachingPlanPart.format_value(self.desc)
async def _think(self) -> bool:
"""Everything will be done part by part."""
if not self.actions:
if not self.rc.news or self.rc.news[0].cause_by != any_to_str(UserRequirement):
raise ValueError("Lesson content invalid.")
actions = []
print(TeachingPlanBlock.TOPICS)
for topic in TeachingPlanBlock.TOPICS:
act = WriteTeachingPlanPart(context=self.rc.news[0].content, topic=topic, llm=self.llm)
actions.append(act)
self._init_actions(actions)
if self.rc.todo is None:
self._set_state(0)
return True
if self.rc.state + 1 < len(self.states):
self._set_state(self.rc.state + 1)
return True
self.rc.todo = None
return False
async def _react(self) -> Message:
ret = Message(content="")
while True:
await self._think()
if self.rc.todo is None:
break
logger.debug(f"{self._setting}: {self.rc.state=}, will do {self.rc.todo}")
msg = await self._act()
if ret.content != "":
ret.content += "\n\n\n"
ret.content += msg.content
logger.info(ret.content)
await self.save(ret.content)
return ret
async def save(self, content):
"""Save teaching plan"""
filename = Teacher.new_file_name(self.course_title)
pathname = CONFIG.workspace_path / "teaching_plan"
pathname.mkdir(exist_ok=True)
pathname = pathname / filename
try:
async with aiofiles.open(str(pathname), mode="w", encoding="utf-8") as writer:
await writer.write(content)
except Exception as e:
logger.error(f"Save failed{e}")
logger.info(f"Save to:{pathname}")
@staticmethod
def new_file_name(lesson_title, ext=".md"):
"""Create a related file name based on `lesson_title` and `ext`."""
# Define the special characters that need to be replaced.
illegal_chars = r'[#@$%!*&\\/:*?"<>|\n\t \']'
# Replace the special characters with underscores.
filename = re.sub(illegal_chars, "_", lesson_title) + ext
return re.sub(r"_+", "_", filename)
@property
def course_title(self):
"""Return course title of teaching plan"""
default_title = "teaching_plan"
for act in self.actions:
if act.topic != TeachingPlanBlock.COURSE_TITLE:
continue
if act.rsp is None:
return default_title
title = act.rsp.lstrip("# \n")
if "\n" in title:
ix = title.index("\n")
title = title[0:ix]
return title
return default_title

View file

@ -34,9 +34,9 @@ class TutorialAssistant(Role):
constraints: str = "Strictly follow Markdown's syntax, with neat and standardized layout"
language: str = "Chinese"
topic = ""
main_title = ""
total_content = ""
topic: str = ""
main_title: str = ""
total_content: str = ""
def __init__(self, **kwargs):
super().__init__(**kwargs)
@ -71,9 +71,9 @@ class TutorialAssistant(Role):
Returns:
A message containing the result of the action.
"""
todo = self._rc.todo
todo = self.rc.todo
if type(todo) is WriteDirectory:
msg = self._rc.memory.get(k=1)[0]
msg = self.rc.memory.get(k=1)[0]
self.topic = msg.content
resp = await todo.run(topic=self.topic)
logger.info(resp)
@ -90,4 +90,5 @@ class TutorialAssistant(Role):
msg = await super().react()
root_path = TUTORIAL_PATH / datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
await File.write(root_path, f"{self.main_title}.md", self.total_content.encode("utf-8"))
msg.content = str(root_path / f"{self.main_title}.md")
return msg