feat: + awrite_bin/aread_bin

This commit is contained in:
莘权 马 2024-02-01 20:19:52 +08:00
parent df8f929665
commit 6d6248f8be
3 changed files with 150 additions and 19 deletions

View file

@ -546,10 +546,10 @@ class UMLClassMethod(UMLClassMeta):
content += name + "(" + ",".join([v.get_mermaid(align=0) for v in self.args]) + ")"
if self.return_type:
content += " " + self.return_type.replace(" ", "")
if self.abstraction:
content += "*"
if self.static:
content += "$"
# if self.abstraction:
# content += "*"
# if self.static:
# content += "$"
return content

View file

@ -405,14 +405,67 @@ def any_to_name(val):
def concat_namespace(*args) -> str:
"""Concatenate fields to create a unique namespace prefix.
Args:
*args: Variable number of arguments representing the fields to be concatenated.
Returns:
str: A string containing the concatenated fields separated by colons.
Example:
>>> concat_namespace('prefix', 'field1', 'field2')
'prefix:field1:field2'
"""
return ":".join(str(value) for value in args)
def split_namespace(ns_class_name: str, maxsplit=1) -> List[str]:
"""Split a namespace-prefixed name into its namespace-prefix and name parts.
Args:
ns_class_name (str): The namespace-prefixed name to be split.
maxsplit (int, optional): The maximum number of splits to perform. Defaults to 1.
Returns:
List[str]: A list containing the namespace-prefix part and the name part.
Example:
>>> split_namespace('prefix:classname')
['prefix', 'classname']
>>> split_namespace('prefix:module:class', maxsplit=2)
['prefix', 'module', 'class']
"""
return ns_class_name.split(":", maxsplit=maxsplit)
def auto_namespace(name: str) -> str:
"""Automatically handle namespace-prefixed names.
If the input name is empty, returns a default namespace prefix and name.
If the input name is not namespace-prefixed, adds a default namespace prefix.
Otherwise, returns the input name unchanged.
Args:
name (str): The input name to be processed.
Returns:
str: The processed name.
Example:
>>> auto_namespace('classname')
'?:classname'
>>> auto_namespace('prefix:classname')
'prefix:classname'
>>> auto_namespace('')
'?:?'
>>> auto_namespace('?:custom')
'?:custom'
"""
if not name:
return "?:?"
v = split_namespace(name)
@ -422,6 +475,26 @@ def auto_namespace(name: str) -> str:
def add_affix(text, affix="brace"):
"""Add affix to encapsulate data.
Args:
text (str): The input text to be encapsulated.
affix (str, optional): The type of affix to use. Defaults to "brace".
Supported affix types: "brace" for curly braces, "url" for URL encoding within curly braces.
Returns:
str: The text encapsulated with the specified affix.
Example:
>>> add_affix("data", affix="brace")
'{data}'
>>> add_affix("example.com", affix="url")
'%7Bexample.com%7D'
>>> add_affix("text", affix="unknown")
'text'
"""
mappings = {
"brace": lambda x: "{" + x + "}",
"url": lambda x: quote("{" + x + "}"),
@ -431,6 +504,26 @@ def add_affix(text, affix="brace"):
def remove_affix(text, affix="brace"):
"""Remove affix to extract encapsulated data.
Args:
text (str): The input text with affix to be removed.
affix (str, optional): The type of affix used. Defaults to "brace".
Supported affix types: "brace" for removing curly braces, "url" for URL decoding within curly braces.
Returns:
str: The text with affix removed.
Example:
>>> remove_affix('{data}', affix="brace")
'data'
>>> remove_affix('%7Bexample.com%7D', affix="url")
'example.com'
>>> remove_affix('text', affix="unknown")
'text'
"""
mappings = {"brace": lambda x: x[1:-1], "url": lambda x: unquote(x)[1:-1]}
decoder = mappings.get(affix, lambda x: x)
return decoder(text)
@ -616,3 +709,42 @@ def remove_white_spaces(v: str) -> str:
str: The input string with white spaces removed, excluding spaces within quotes.
"""
return re.sub(r"(?<!['\"])\s|(?<=['\"])\s", "", v)
async def aread_bin(filename: str | Path) -> bytes:
"""Read binary file asynchronously.
Args:
filename (Union[str, Path]): The name or path of the file to be read.
Returns:
bytes: The content of the file as bytes.
Example:
>>> content = await aread_bin('example.txt')
b'This is the content of the file.'
>>> content = await aread_bin(Path('example.txt'))
b'This is the content of the file.'
"""
async with aiofiles.open(str(filename), mode="rb") as reader:
content = await reader.read()
return content
async def awrite_bin(filename: str | Path, data: bytes):
"""Write binary file asynchronously.
Args:
filename (Union[str, Path]): The name or path of the file to be written.
data (bytes): The binary data to be written to the file.
Example:
>>> await awrite_bin('output.bin', b'This is binary data.')
>>> await awrite_bin(Path('output.bin'), b'Another set of binary data.')
"""
pathname = Path(filename)
pathname.parent.mkdir(parents=True, exist_ok=True)
async with aiofiles.open(str(pathname), mode="wb") as writer:
await writer.write(data)

View file

@ -18,14 +18,14 @@ from metagpt.actions.write_code import WriteCode
from metagpt.const import SYSTEM_DESIGN_FILE_REPO, TASK_FILE_REPO
from metagpt.schema import (
AIMessage,
ClassAttribute,
ClassMethod,
ClassView,
CodeSummarizeContext,
Document,
Message,
MessageQueue,
SystemMessage,
UMLClassAttribute,
UMLClassMethod,
UMLClassView,
UserMessage,
)
from metagpt.utils.common import any_to_str
@ -157,27 +157,26 @@ def test_CodeSummarizeContext(file_list, want):
def test_class_view():
attr_a = ClassAttribute(name="a", value_type="int", default_value="0", visibility="+", abstraction=True)
assert attr_a.get_mermaid(align=1) == "\t+int a=0*"
attr_b = ClassAttribute(name="b", value_type="str", default_value="0", visibility="#", static=True)
assert attr_b.get_mermaid(align=0) == '#str b="0"$'
class_view = ClassView(name="A")
attr_a = UMLClassAttribute(name="a", value_type="int", default_value="0", visibility="+")
assert attr_a.get_mermaid(align=1) == "\t+int a=0"
attr_b = UMLClassAttribute(name="b", value_type="str", default_value="0", visibility="#")
assert attr_b.get_mermaid(align=0) == '#str b="0"'
class_view = UMLClassView(name="A")
class_view.attributes = [attr_a, attr_b]
method_a = ClassMethod(name="run", visibility="+", abstraction=True)
assert method_a.get_mermaid(align=1) == "\t+run()*"
method_b = ClassMethod(
method_a = UMLClassMethod(name="run", visibility="+")
assert method_a.get_mermaid(align=1) == "\t+run()"
method_b = UMLClassMethod(
name="_test",
visibility="#",
static=True,
args=[ClassAttribute(name="a", value_type="str"), ClassAttribute(name="b", value_type="int")],
args=[UMLClassAttribute(name="a", value_type="str"), UMLClassAttribute(name="b", value_type="int")],
return_type="str",
)
assert method_b.get_mermaid(align=0) == "#_test(str a,int b):str$"
assert method_b.get_mermaid(align=0) == "#_test(str a,int b) str"
class_view.methods = [method_a, method_b]
assert (
class_view.get_mermaid(align=0)
== 'class A{\n\t+int a=0*\n\t#str b="0"$\n\t+run()*\n\t#_test(str a,int b):str$\n}\n'
== 'class A{\n\t+int a=0\n\t#str b="0"\n\t+run()\n\t#_test(str a,int b) str\n}\n'
)