feat: merge geekan:dev

This commit is contained in:
莘权 马 2024-02-19 11:02:10 +08:00
commit 739452edbb
92 changed files with 5372 additions and 197 deletions

View file

@ -362,6 +362,31 @@ def parse_recipient(text):
return ""
def create_func_call_config(func_schema: dict) -> dict:
    """Build an OpenAI-style function-call config that forces a call to *func_schema*.

    Args:
        func_schema (dict): JSON schema of the function; must contain a "name" key.

    Returns:
        dict: Config with "tools" (the single declared function) and "tool_choice"
            (forcing the model to call that function by name).
    """
    forced_choice = {"type": "function", "function": {"name": func_schema["name"]}}
    return {
        "tools": [{"type": "function", "function": func_schema}],
        "tool_choice": forced_choice,
    }
def remove_comments(code_str: str) -> str:
    """Strip `#` comments from *code_str*, leaving string literals intact.

    Quoted strings are matched first so a `#` inside them is preserved.
    Trailing whitespace and blank lines are removed from the result.
    """
    pattern = r"(\".*?\"|\'.*?\')|(\#.*?$)"

    def _keep_or_drop(match):
        # group(2) matched a comment -> drop it; group(1) matched a string literal -> keep it.
        return "" if match.group(2) is not None else match.group(1)

    stripped = re.sub(pattern, _keep_or_drop, code_str, flags=re.MULTILINE)
    kept_lines = [line.rstrip() for line in stripped.splitlines() if line.strip()]
    return os.linesep.join(kept_lines)
def get_class_name(cls) -> str:
    """Return the fully qualified name of *cls*: "<module>.<class name>"."""
    return ".".join((cls.__module__, cls.__name__))
@ -587,13 +612,13 @@ def read_json_file(json_file: str, encoding="utf-8") -> list[Any]:
return data
def write_json_file(json_file: str, data: list, encoding: str = None, indent: int = 4):
    """Serialize *data* to *json_file* as JSON, creating parent folders as needed.

    Args:
        json_file (str): Destination file path.
        data (list): JSON-serializable payload; pydantic objects are handled via
            ``to_jsonable_python`` passed as ``default``.
        encoding (str, optional): Text encoding for the output file. Defaults to None
            (platform default).
        indent (int, optional): JSON indentation width. Defaults to 4.
    """
    # mkdir(exist_ok=True) already tolerates an existing folder — the prior
    # exists() pre-check was redundant and race-prone.
    Path(json_file).parent.mkdir(parents=True, exist_ok=True)
    with open(json_file, "w", encoding=encoding) as fout:
        json.dump(data, fout, ensure_ascii=False, indent=indent, default=to_jsonable_python)
def read_csv_to_list(curr_file: str, header=False, strip_trail=True):

View file

@ -0,0 +1,87 @@
import re
from typing import Tuple
from pydantic import BaseModel
def remove_spaces(text):
    """Collapse each run of whitespace in *text* to a single space and trim the ends."""
    collapsed = re.sub(r"\s+", " ", text)
    return collapsed.strip()
class DocstringParser(BaseModel):
    """Interface for docstring parsers.

    The parse_* methods are format-specific hooks: the base versions carry only a
    docstring (and therefore return None) and are meant to be overridden by
    subclasses such as GoogleDocstringParser.
    """

    # Raw docstring text this parser operates on.
    docstring: str

    def parse_desc(self) -> str:
        """Parse and return the description from the docstring."""

    def parse_params(self) -> list[Tuple[str, str, str]]:
        """Parse and return the parameters from the docstring.

        Returns:
            list[Tuple[str, str, str]]: A list of input parameter info. Each info is a
                triple of (param name, param type, param description).
        """

    def parse_returns(self) -> list[Tuple[str, str]]:
        """Parse and return the output information from the docstring.

        Returns:
            list[Tuple[str, str]]: A list of output info. Each info is a tuple of
                (return type, return description).
        """

    @staticmethod
    def check_and_parse_optional(param_type: str) -> Tuple[bool, str]:
        """Check if a parameter is optional and return a processed param_type rid of the optionality info if so."""

    @staticmethod
    def check_and_parse_default_value(param_desc: str) -> Tuple[bool, str]:
        """Check if a parameter has a default value and return the default value if so."""

    @staticmethod
    def check_and_parse_enum(param_desc: str) -> Tuple[bool, list[str]]:
        """Check if a parameter description includes an enum and return enum values if so.

        NOTE(review): annotation corrected from Tuple[bool, str] — the Google
        implementation in this file returns a list of enum values.
        """
class reSTDocstringParser(DocstringParser):
    """A parser for reStructuredText (reST) docstrings.

    NOTE(review): no parse_* overrides are defined yet, so this class inherits the
    base no-op behavior.
    """
class GoogleDocstringParser(DocstringParser):
    """A parser for Google-style docstrings (fixes the "Google-stype" typo).

    Note: the redundant re-declaration of the inherited ``docstring`` field was removed.
    """

    def parse_desc(self) -> str:
        """Return the free-text description preceding the Args/Returns/Raises sections."""
        description_match = re.search(r"^(.*?)(?:Args:|Returns:|Raises:|$)", self.docstring, re.DOTALL)
        description = remove_spaces(description_match.group(1)) if description_match else ""
        return description

    def parse_params(self) -> list[Tuple[str, str, str]]:
        """Return (param name, param type, param description) triples from the Args: section."""
        args_match = re.search(r"Args:\s*(.*?)(?:Returns:|Raises:|$)", self.docstring, re.DOTALL)
        _args = args_match.group(1).strip() if args_match else ""
        # The lookahead (?=\n\s*\w+\s*\(|\Z) ends each description at the start of the
        # next "name (type):" entry or at the end of the string.
        variable_pattern = re.compile(r"(\w+)\s*\((.*?)\):\s*(.*?)(?=\n\s*\w+\s*\(|\Z)", re.DOTALL)
        params = variable_pattern.findall(_args)
        return params

    def parse_returns(self) -> list[Tuple[str, str]]:
        """Return (return type, return description) tuples from the Returns: section."""
        returns_match = re.search(r"Returns:\s*(.*?)(?:Raises:|$)", self.docstring, re.DOTALL)
        returns = returns_match.group(1).strip() if returns_match else ""
        return_pattern = re.compile(r"^(.*)\s*:\s*(.*)$")
        returns = return_pattern.findall(returns)
        return returns

    @staticmethod
    def check_and_parse_optional(param_type: str) -> Tuple[bool, str]:
        """Report whether *param_type* is optional, returning it stripped of ", optional"."""
        return "optional" in param_type, param_type.replace(", optional", "")

    @staticmethod
    def check_and_parse_default_value(param_desc: str) -> Tuple[bool, str]:
        """Extract a default value declared as "Defaults to X." in *param_desc*."""
        default_val = re.search(r"Defaults to (.+?)\.", param_desc)
        return (True, default_val.group(1)) if default_val else (False, "")

    @staticmethod
    def check_and_parse_enum(param_desc: str) -> Tuple[bool, list[str]]:
        """Extract enum values declared as "Enum: [a, b, ...]" in *param_desc*.

        Note: return annotation corrected from Tuple[bool, str] — the implementation
        returns a list of enum-value strings.
        """
        enum_val = re.search(r"Enum: \[(.+?)\]", param_desc)
        return (True, [e.strip() for e in enum_val.group(1).split(",")]) if enum_val else (False, [])

View file

@ -0,0 +1,58 @@
# -*- coding: utf-8 -*-
# @Date : 12/20/2023 11:07 AM
# @Author : stellahong (stellahong@fuzhi.ai)
# @Desc :
import json
from datetime import datetime
from pathlib import Path
import nbformat
from metagpt.const import DATA_PATH
from metagpt.roles.role import Role
from metagpt.utils.common import read_json_file
from metagpt.utils.save_code import save_code_file
def load_history(save_dir: str = ""):
    """
    Load plan and code execution history from the specified save directory.

    Args:
        save_dir (str): The directory from which to load the history.

    Returns:
        Tuple: A tuple containing the loaded plan (parsed from plan.json) and the
            notebook read from history_nb/code.ipynb.
    """
    plan_path = Path(save_dir) / "plan.json"
    nb_path = Path(save_dir) / "history_nb" / "code.ipynb"
    plan = read_json_file(plan_path)
    # Use a context manager so the notebook file handle is closed; the original
    # `nbformat.read(open(...))` leaked the handle.
    with open(nb_path, "r", encoding="utf-8") as nb_file:
        nb = nbformat.read(nb_file, as_version=nbformat.NO_CONVERT)
    return plan, nb
def save_history(role: Role, save_dir: str = ""):
    """
    Save plan and code execution history to the specified directory.

    Args:
        role (Role): The role containing the planner.plan and execute_code attributes.
        save_dir (str): The directory to save the history.
            NOTE(review): currently unused — the output path is always derived from
            DATA_PATH plus a timestamp below; confirm whether this is intentional.

    Returns:
        Path: The path to the saved history directory (DATA_PATH/output/<timestamp>).
    """
    record_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    save_path = DATA_PATH / "output" / f"{record_time}"
    # Overwrite any existing trajectory at this path.
    save_path.mkdir(parents=True, exist_ok=True)
    plan = role.planner.plan.dict()
    with open(save_path / "plan.json", "w", encoding="utf-8") as plan_file:
        json.dump(plan, plan_file, indent=4, ensure_ascii=False)
    # The notebook is written under the same timestamped folder by save_code_file.
    save_code_file(name=Path(record_time) / "history_nb", code_context=role.execute_code.nb, file_format="ipynb")
    return save_path

View file

@ -0,0 +1,40 @@
# -*- coding: utf-8 -*-
# @Date : 12/12/2023 4:14 PM
# @Author : stellahong (stellahong@fuzhi.ai)
# @Desc :
import os
import nbformat
from metagpt.const import DATA_PATH
from metagpt.utils.common import write_json_file
def save_code_file(name: str, code_context: str, file_format: str = "py") -> None:
    """
    Save code files to a specified path.

    Args:
        name (str): Name of the folder (under DATA_PATH/output) to save the file in.
        code_context (str): The code content for 'py'/'json'; for 'ipynb' this is the
            notebook node passed through to nbformat.write.
        file_format (str, optional): The file format. Supports 'py' (Python file),
            'json' (JSON file), and 'ipynb' (Jupyter Notebook file). Default is 'py'.

    Returns:
        None

    Raises:
        ValueError: If *file_format* is not one of the supported formats.
    """
    # Build the output folder and file path with pathlib throughout, instead of
    # mixing os.makedirs with f-string path concatenation.
    output_dir = DATA_PATH / "output" / f"{name}"
    output_dir.mkdir(parents=True, exist_ok=True)
    file_path = output_dir / f"code.{file_format}"

    if file_format == "py":
        file_path.write_text(code_context + "\n\n", encoding="utf-8")
    elif file_format == "json":
        # Wrap the code in a dict so the JSON file has a stable schema.
        data = {"code": code_context}
        write_json_file(file_path, data, encoding="utf-8", indent=2)
    elif file_format == "ipynb":
        nbformat.write(code_context, file_path)
    else:
        raise ValueError("Unsupported file format. Please choose 'py', 'json', or 'ipynb'.")