pre-commit modify

This commit is contained in:
didi 2024-10-22 12:37:08 +08:00
parent 462b7d9fd9
commit 56d0af1e9e
23 changed files with 359 additions and 342 deletions

View file

@ -3,11 +3,10 @@
# @Author : didi
# @Desc : Entrance of AFlow.
from metagpt.ext.aflow.scripts.optimizer import Optimizer
from metagpt.ext.aflow.scripts.optimizer import DatasetType, QuestionType, OptimizerType
from metagpt.ext.aflow.data.download_data import download
from metagpt.configs.models_config import ModelsConfig
from typing import Literal
from metagpt.ext.aflow.data.download_data import download
from metagpt.ext.aflow.scripts.optimizer import DatasetType, Optimizer, QuestionType
# DatasetType, QuestionType, and OptimizerType definitions
# DatasetType = Literal["HumanEval", "MBPP", "GSM8K", "MATH", "HotpotQA", "DROP"]
@ -32,30 +31,30 @@ claude_llm_config = ModelsConfig.default().get("claude-3-5-sonnet-20240620")
# Config operators.
operators = [
"Custom", # It's basic unit of a fixed node. optimizer can modify its prompt to get vairous nodes.
"Custom", # It's basic unit of a fixed node. optimizer can modify its prompt to get vairous nodes.
# "AnswerGenerate" # It's for qa
# "CustomCodeGenerate", # It's for code
"ScEnsemble", # It's for code, math and qa
"ScEnsemble", # It's for code, math and qa
# "Test", # It's for code
"Programmer", # It's for math
"Programmer", # It's for math
]
# Create an optimizer instance
optimizer = Optimizer(
dataset=dataset, # Config dataset
question_type=question_type, # Config Question Type
opt_llm_config=claude_llm_config, # Config Optimizer LLM
exec_llm_config=mini_llm_config, # Config Execution LLM
check_convergence=check_convergence, # Whether Early Stop
operators=operators, # Config Operators you want to use
optimized_path=optimized_path, # Config Optimized workflow's file path
sample=sample, # Only Top(sample) rounds will be selected.
initial_round=initial_round, # Optimize from initial round
max_rounds=max_rounds # The max iteration of AFLOW.
dataset=dataset, # Config dataset
question_type=question_type, # Config Question Type
opt_llm_config=claude_llm_config, # Config Optimizer LLM
exec_llm_config=mini_llm_config, # Config Execution LLM
check_convergence=check_convergence, # Whether Early Stop
operators=operators, # Config Operators you want to use
optimized_path=optimized_path, # Config Optimized workflow's file path
sample=sample, # Only Top(sample) rounds will be selected.
initial_round=initial_round, # Optimize from initial round
max_rounds=max_rounds, # The max iteration of AFLOW.
)
if __name__ == "__main__":
# Optimize workflow via setting the optimizer's mode to 'Graph'
optimizer.optimize("Graph")
# Test workflow via setting the optimizer's mode to 'Test'
# optimizer.optimize("Test")
# optimizer.optimize("Test")

View file

@ -39,11 +39,14 @@ class ReviseMode(Enum):
TAG = "CONTENT"
class FillMode(Enum):
CODE_FILL = "code_fill"
XML_FILL = "xml_fill"
SINGLE_FILL = "single_fill"
LANGUAGE_CONSTRAINT = "Language: Please use the same language as Human INPUT."
FORMAT_CONSTRAINT = f"Format: output wrapped inside [{TAG}][/{TAG}] like format example, nothing else."
@ -558,7 +561,7 @@ class ActionNode:
if match:
raw_value = match.group(1).strip()
field_type = field_types.get(field_name)
if field_type == str:
extracted_data[field_name] = raw_value
elif field_type == int:
@ -567,7 +570,7 @@ class ActionNode:
except ValueError:
extracted_data[field_name] = 0 # 或者其他默认值
elif field_type == bool:
extracted_data[field_name] = raw_value.lower() in ('true', 'yes', '1', 'on', 'True')
extracted_data[field_name] = raw_value.lower() in ("true", "yes", "1", "on", "True")
elif field_type == list:
try:
extracted_data[field_name] = eval(raw_value)
@ -622,7 +625,6 @@ class ActionNode:
if self.schema:
schema = self.schema
if mode == FillMode.CODE_FILL.value:
result = await self.code_fill(context, function_name, timeout)
self.instruct_content = self.create_class()(**result)

View file

@ -1,14 +1,15 @@
import asyncio
import json
import os
from typing import List, Tuple, Callable, Any
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, Callable, List, Tuple
import aiofiles
import pandas as pd
from tqdm.asyncio import tqdm_asyncio
class BaseBenchmark(ABC):
def __init__(self, name: str, file_path: str, log_path: str):
self.name = name
@ -17,7 +18,7 @@ class BaseBenchmark(ABC):
async def load_data(self, specific_indices: List[int] = None) -> List[dict]:
data = []
async with aiofiles.open(self.file_path, mode="r", encoding='utf-8') as file:
async with aiofiles.open(self.file_path, mode="r", encoding="utf-8") as file:
async for line in file:
data.append(json.loads(line))
@ -47,13 +48,13 @@ class BaseBenchmark(ABC):
"question": problem,
"right_answer": expected_output,
"model_output": prediction,
"extracted_output": extracted_output
"extracted_output": extracted_output,
}
log_file = os.path.join(self.log_path, 'log.json')
log_file = os.path.join(self.log_path, "log.json")
if os.path.exists(log_file):
with open(log_file, 'r', encoding='utf-8') as f:
with open(log_file, "r", encoding="utf-8") as f:
try:
data = json.load(f)
except json.JSONDecodeError:
@ -63,7 +64,7 @@ class BaseBenchmark(ABC):
data.append(log_data)
with open(log_file, 'w', encoding='utf-8') as f:
with open(log_file, "w", encoding="utf-8") as f:
json.dump(data, f, indent=4, ensure_ascii=False)
@abstractmethod
@ -97,5 +98,3 @@ class BaseBenchmark(ABC):
print(f"Average score on {self.name} dataset: {average_score:.5f}")
print(f"Total Cost: {total_cost:.5f}")
return average_score, average_cost, total_cost

View file

@ -1,19 +1,13 @@
import os
import re
import json
import asyncio
import string
from collections import Counter
from datetime import datetime
from typing import Any, Callable, Dict, List, Tuple
from typing import Callable, List, Tuple
import aiofiles
import pandas as pd
from tqdm.asyncio import tqdm_asyncio
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed
from metagpt.ext.aflow.benchmark.benchmark import BaseBenchmark
class DROPBenchmark(BaseBenchmark):
def __init__(self, name: str, file_path: str, log_path: str):
super().__init__(name, file_path, log_path)
@ -53,12 +47,7 @@ class DROPBenchmark(BaseBenchmark):
f1 = (2 * precision * recall) / (precision + recall)
return f1, prediction
@retry(
stop=stop_after_attempt(5),
wait=wait_fixed(1),
retry=retry_if_exception_type(Exception),
reraise=True
)
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), retry=retry_if_exception_type(Exception), reraise=True)
async def _generate_output(self, graph, input_text):
return await graph(input_text)

View file

@ -3,22 +3,13 @@
# @Author : all
# @Desc : test on gsm8k
import re
import json
import asyncio
import aiofiles
import pandas as pd
from typing import Optional, List, Tuple, Callable, Any
from pandas import Series
from tqdm.asyncio import tqdm_asyncio
import os
import time
from datetime import datetime
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type
from typing import Callable, List, Optional, Tuple
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed
from metagpt.ext.aflow.benchmark.benchmark import BaseBenchmark
class GSM8KBenchmark(BaseBenchmark):
def __init__(self, name: str, file_path: str, log_path: str):
super().__init__(name, file_path, log_path)
@ -38,13 +29,8 @@ class GSM8KBenchmark(BaseBenchmark):
if prediction is None:
return 0.0, prediction
return 1.0 if abs(expected_output - prediction) <= 1e-6 else 0.0, prediction
@retry(
stop=stop_after_attempt(5),
wait=wait_fixed(1),
retry=retry_if_exception_type(Exception),
reraise=True
)
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), retry=retry_if_exception_type(Exception), reraise=True)
async def _generate_output(self, graph, input_text):
return await graph(input_text)

View file

@ -1,16 +1,13 @@
import json
import asyncio
import aiofiles
import pandas as pd
from typing import List, Tuple, Callable, Any
import string
import re
import os
import string
from collections import Counter
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type
from typing import Callable, List, Tuple
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed
from metagpt.ext.aflow.benchmark.benchmark import BaseBenchmark
class HotpotQABenchmark(BaseBenchmark):
def __init__(self, name: str, file_path: str, log_path: str):
super().__init__(name, file_path, log_path)
@ -43,12 +40,7 @@ class HotpotQABenchmark(BaseBenchmark):
f1 = (2 * precision * recall) / (precision + recall)
return f1, prediction
@retry(
stop=stop_after_attempt(5),
wait=wait_fixed(1),
retry=retry_if_exception_type(Exception),
reraise=True
)
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), retry=retry_if_exception_type(Exception), reraise=True)
async def _generate_output(self, graph, input_text):
return await graph(input_text)
@ -63,7 +55,9 @@ class HotpotQABenchmark(BaseBenchmark):
output, cost = await self._generate_output(graph, inputs)
score, extracted_output = self.calculate_score(expected_output, output)
if score < 0.3: # We set the threshold for collecting incorrect questions to 0.3, as F1 Score cannot be simply judged using 0-1
if (
score < 0.3
): # We set the threshold for collecting incorrect questions to 0.3, as F1 Score cannot be simply judged using 0-1
self.log_mismatch(input_text, expected_output, output, extracted_output)
return input_text, context_str, output, expected_output, score, cost

View file

@ -1,17 +1,13 @@
import os
import time
import json
import asyncio
import threading
from datetime import datetime
from typing import List, Tuple, Callable, Dict, Any, Optional
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type
import time
from typing import Any, Callable, Dict, List, Optional, Tuple
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed
import pandas as pd
from metagpt.ext.aflow.benchmark.benchmark import BaseBenchmark
from metagpt.actions.code_sanitize import sanitize
from metagpt.ext.aflow.benchmark.benchmark import BaseBenchmark
class HumanEvalBenchmark(BaseBenchmark):
def __init__(self, name: str, file_path: str, log_path: str):
@ -52,72 +48,86 @@ class HumanEvalBenchmark(BaseBenchmark):
solution = sanitize(code=solution, entrypoint=entry_point)
try:
global_dict = {
'math': __import__('math'),
'hashlib': __import__('hashlib'),
're': __import__('re'),
'List': List,
'Dict': Dict,
'Tuple': Tuple,
'Optional': Optional,
'Any': Any
"math": __import__("math"),
"hashlib": __import__("hashlib"),
"re": __import__("re"),
"List": List,
"Dict": Dict,
"Tuple": Tuple,
"Optional": Optional,
"Any": Any,
}
# Add handling for special cases
if entry_point == "decode_cyclic":
solution = "\n\ndef encode_cyclic(s: str):\n \"\"\"\n returns encoded string by cycling groups of three characters.\n \"\"\"\n # split string to groups. Each of length 3.\n groups = [s[(3 * i):min((3 * i + 3), len(s))] for i in range((len(s) + 2) // 3)]\n # cycle elements in each group. Unless group has fewer elements than 3.\n groups = [(group[1:] + group[0]) if len(group) == 3 else group for group in groups]\n return \"\".join(groups)" + "\n\n" + solution
solution = (
'\n\ndef encode_cyclic(s: str):\n """\n returns encoded string by cycling groups of three characters.\n """\n # split string to groups. Each of length 3.\n groups = [s[(3 * i):min((3 * i + 3), len(s))] for i in range((len(s) + 2) // 3)]\n # cycle elements in each group. Unless group has fewer elements than 3.\n groups = [(group[1:] + group[0]) if len(group) == 3 else group for group in groups]\n return "".join(groups)'
+ "\n\n"
+ solution
)
elif entry_point == "decode_shift":
solution = "\n\ndef encode_shift(s: str):\n \"\"\"\n returns encoded string by shifting every character by 5 in the alphabet.\n \"\"\"\n return \"\".join([chr(((ord(ch) + 5 - ord(\"a\")) % 26) + ord(\"a\")) for ch in s])\n\n\n" + solution
solution = (
'\n\ndef encode_shift(s: str):\n """\n returns encoded string by shifting every character by 5 in the alphabet.\n """\n return "".join([chr(((ord(ch) + 5 - ord("a")) % 26) + ord("a")) for ch in s])\n\n\n'
+ solution
)
elif entry_point == "find_zero":
solution = "\n\ndef poly(xs: list, x: float):\n return sum(coeff * (x ** i) for i, coeff in enumerate(xs))\n\n" + solution
solution = (
"\n\ndef poly(xs: list, x: float):\n return sum(coeff * (x ** i) for i, coeff in enumerate(xs))\n\n"
+ solution
)
exec(solution, global_dict)
if entry_point not in global_dict:
raise ValueError(f"Function {entry_point} is not defined in the solution.")
exec(test, global_dict)
check = global_dict["check"]
result = self.run_with_timeout(check, (global_dict[entry_point],), 15)
if result is None:
result = (self.PASS, "The solution passed all test cases.")
except self.TimeoutError:
result = (self.FAIL, "Execution timed out. Please check if your solution contains infinite loops or overly time-consuming operations.")
result = (
self.FAIL,
"Execution timed out. Please check if your solution contains infinite loops or overly time-consuming operations.",
)
except Exception as e:
error_message = f"Error: {str(e)}.\n Solution: {solution}.\n Test: {test}"
result = (self.FAIL, error_message)
with open('error.log', 'a', encoding='utf-8') as log_file:
with open("error.log", "a", encoding="utf-8") as log_file:
log_file.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - {error_message}\n")
return result
@retry(
stop=stop_after_attempt(5),
wait=wait_fixed(1),
retry=retry_if_exception_type(Exception),
reraise=True
)
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), retry=retry_if_exception_type(Exception), reraise=True)
async def _generate_output(self, graph, prompt, entry_point):
# Generate output with a timeout of 60 seconds
return await asyncio.wait_for(graph(prompt, entry_point), timeout=60)
async def evaluate_problem(self, data: dict, graph: Callable) -> Tuple[str, str, str, float, float]:
input_text = data["prompt"]
expected_output = "\nCorrect Solution:\ndef " + data["entry_point"] + "(params you should put here):" + "\n\n" + data["canonical_solution"]
expected_output = (
"\nCorrect Solution:\ndef "
+ data["entry_point"]
+ "(params you should put here):"
+ "\n\n"
+ data["canonical_solution"]
)
try:
# Generate prediction using the graph function
prediction, cost = await self._generate_output(graph, input_text, data["entry_point"])
# Check the solution
ret = self.check_solution(prediction, data["test"], data["entry_point"])
test_case_details = ret[1]
expected_output = test_case_details + expected_output
# Calculate score based on the check result
score = 1.0 if ret[0] == self.PASS else 0.0

View file

@ -1,16 +1,16 @@
import re
from math import isclose
from typing import Any, Callable, List, Tuple
import regex
from sympy import N, simplify
from sympy.parsing.latex import parse_latex
from sympy.parsing.sympy_parser import parse_expr
from math import isclose
import multiprocessing
from typing import Any, Callable, Tuple, List
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed
from metagpt.ext.aflow.benchmark.benchmark import BaseBenchmark
class MATHBenchmark(BaseBenchmark):
def __init__(self, name: str, file_path: str, log_path: str):
super().__init__(name, file_path, log_path)
@ -21,7 +21,7 @@ class MATHBenchmark(BaseBenchmark):
if boxed_matches:
return boxed_matches[-1].strip()
sentence_end_pattern = r'(?<!\d)[.!?]\s+'
sentence_end_pattern = r"(?<!\d)[.!?]\s+"
sentences = re.split(sentence_end_pattern, text)
sentences = [s.strip() for s in sentences if s.strip()]
return sentences[-1] if sentences else ""
@ -97,17 +97,10 @@ class MATHBenchmark(BaseBenchmark):
pass
return False
@retry(
stop=stop_after_attempt(5),
wait=wait_fixed(1),
retry=retry_if_exception_type(Exception),
reraise=True
)
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), retry=retry_if_exception_type(Exception), reraise=True)
async def _generate_output(self, graph, input_text):
return await graph(input_text)
async def evaluate_problem(self, problem: dict, graph: Callable) -> Tuple[str, str, str, int, float]:
input_text = problem["problem"]
expected_output = problem["solution"]
@ -125,6 +118,5 @@ class MATHBenchmark(BaseBenchmark):
print(f"Maximum retries reached. Skipping this sample. Error: {e}")
return input_text, str(e), expected_output, 0.0, 0.0
def get_result_columns(self) -> List[str]:
return ["question", "prediction", "expected_output", "score", "cost"]

View file

@ -1,15 +1,13 @@
import os
import json
import time
import asyncio
import threading
from datetime import datetime
from typing import List, Tuple, Callable, Any, Optional, Dict
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type
import time
from typing import Any, Callable, Dict, List, Optional, Tuple
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed
from metagpt.actions.code_sanitize import sanitize
from metagpt.ext.aflow.benchmark.benchmark import BaseBenchmark
class MBPPBenchmark(BaseBenchmark):
def __init__(self, name: str, file_path: str, log_path: str):
super().__init__(name, file_path, log_path)
@ -49,47 +47,45 @@ class MBPPBenchmark(BaseBenchmark):
solution = sanitize(code=solution, entrypoint=entry_point)
try:
global_dict = {
'math': __import__('math'),
'hashlib': __import__('hashlib'),
're': __import__('re'),
'List': List,
'Dict': Dict,
'Tuple': Tuple,
'Optional': Optional,
'Any': Any
"math": __import__("math"),
"hashlib": __import__("hashlib"),
"re": __import__("re"),
"List": List,
"Dict": Dict,
"Tuple": Tuple,
"Optional": Optional,
"Any": Any,
}
exec(solution, global_dict)
if entry_point not in global_dict:
raise ValueError(f"Function {entry_point} is not defined in the solution.")
exec(test, global_dict)
check = global_dict["check"]
result = self.run_with_timeout(check, 15)
if result is None:
result = (self.PASS, "The solution passed all test cases.")
except self.TimeoutError:
result = (self.FAIL, "Execution timed out. Please check if your solution contains infinite loops or overly time-consuming operations.")
result = (
self.FAIL,
"Execution timed out. Please check if your solution contains infinite loops or overly time-consuming operations.",
)
except Exception as e:
error_message = f"Error: {str(e)}.\n Solution: {solution}.\n Test: {test}"
result = (self.FAIL, error_message)
with open('error.log', 'a', encoding='utf-8') as log_file:
with open("error.log", "a", encoding="utf-8") as log_file:
log_file.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - {error_message}\n")
return result
@retry(
stop=stop_after_attempt(5),
wait=wait_fixed(1),
retry=retry_if_exception_type(Exception),
reraise=True
)
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), retry=retry_if_exception_type(Exception), reraise=True)
async def _generate_output(self, graph, prompt, entry_point):
return await graph(prompt, entry_point)
@ -100,12 +96,12 @@ class MBPPBenchmark(BaseBenchmark):
try:
# Generate prediction using the graph function
prediction, cost = await self._generate_output(graph, input_text, data["entry_point"])
# Check the solution
ret = self.check_solution(prediction, data["test"], data["entry_point"])
test_case_details = ret[1]
expected_output = test_case_details + "\nCorrect Solution:" + data["code"]
# Calculate score based on the check result
score = 1.0 if ret[0] == self.PASS else 0.0

View file

@ -1,7 +1,9 @@
import os
import json
import os
import numpy as np
def generate_random_indices(n, n_samples, test=False):
"""
生成随机索引
@ -18,36 +20,39 @@ def generate_random_indices(n, n_samples, test=False):
else:
return indices[:n_samples]
def split_data_set(file_path, samples, test=False):
data = []
with open(file_path, 'r') as file:
with open(file_path, "r") as file:
for line in file:
data.append(json.loads(line))
random_indices = generate_random_indices(len(data), samples, test)
data = [data[i] for i in random_indices]
return data
# save data into a jsonl file
def save_data(data, file_path):
with open(file_path, 'w') as file:
with open(file_path, "w") as file:
for d in data:
file.write(json.dumps(d) + '\n')
file.write(json.dumps(d) + "\n")
def log_mismatch(problem, expected_output, prediction, predicted_number, path):
log_data = {
"question": problem,
"right_answer": expected_output,
"model_output": prediction,
"extracted_output": predicted_number
"extracted_output": predicted_number,
}
log_file = os.path.join(path, 'log.json')
log_file = os.path.join(path, "log.json")
# 检查log文件是否已经存在
if os.path.exists(log_file):
# 如果存在,加载现有的日志数据
with open(log_file, 'r', encoding='utf-8') as f:
with open(log_file, "r", encoding="utf-8") as f:
try:
data = json.load(f)
except json.JSONDecodeError:
@ -60,5 +65,5 @@ def log_mismatch(problem, expected_output, prediction, predicted_number, path):
data.append(log_data)
# 将数据写回到log.json文件
with open(log_file, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=4, ensure_ascii=False)
with open(log_file, "w", encoding="utf-8") as f:
json.dump(data, f, indent=4, ensure_ascii=False)

View file

@ -4,64 +4,66 @@
# @Desc : Download and extract dataset files
import os
import requests
import tarfile
from typing import Dict
import requests
from tqdm import tqdm
from typing import List, Dict
def download_file(url: str, filename: str) -> None:
"""Download a file from the given URL and show progress."""
response = requests.get(url, stream=True)
total_size = int(response.headers.get('content-length', 0))
total_size = int(response.headers.get("content-length", 0))
block_size = 1024
progress_bar = tqdm(total=total_size, unit='iB', unit_scale=True)
with open(filename, 'wb') as file:
progress_bar = tqdm(total=total_size, unit="iB", unit_scale=True)
with open(filename, "wb") as file:
for data in response.iter_content(block_size):
size = file.write(data)
progress_bar.update(size)
progress_bar.close()
def extract_tar_gz(filename: str, extract_path: str) -> None:
"""Extract a tar.gz file to the specified path."""
with tarfile.open(filename, 'r:gz') as tar:
with tarfile.open(filename, "r:gz") as tar:
tar.extractall(path=extract_path)
def process_dataset(url: str, filename: str, extract_path: str) -> None:
"""Download, extract, and clean up a dataset."""
print(f"Downloading {filename}...")
download_file(url, filename)
print(f"Extracting {filename}...")
extract_tar_gz(filename, extract_path)
print(f"{filename} download and extraction completed.")
os.remove(filename)
print(f"Removed {filename}")
# Define the datasets to be downloaded
# Users can modify this list to choose which datasets to download
datasets_to_download: Dict[str, Dict[str, str]] = {
"datasets": {
"url": "https://drive.google.com/uc?export=download&id=1DNoegtZiUhWtvkd2xoIuElmIi4ah7k8e",
"filename": "aflow_data.tar.gz",
"extract_path": "metagpt/ext/aflow/data"
},
"results": {
"url": "", # 请填入正确的URL
"filename": "result.tar.gz",
"extract_path": "metagpt/ext/aflow/data/results"
"extract_path": "metagpt/ext/aflow/data",
},
"results": {"url": "", "filename": "result.tar.gz", "extract_path": "metagpt/ext/aflow/data/results"}, # 请填入正确的URL
"initial_rounds": {
"url": "https://drive.google.com/uc?export=download&id=1UBoW4WBWjX2gs4I_jq3ALdXeLdwDJMdP",
"filename": "initial_rounds.tar.gz",
"extract_path": "metagpt/ext/aflow/scripts/optimized"
}
"extract_path": "metagpt/ext/aflow/scripts/optimized",
},
}
def download(datasets):
"""Main function to process all selected datasets."""
for dataset_name in datasets:
dataset = datasets_to_download[dataset_name]
process_dataset(dataset['url'], dataset['filename'], dataset['extract_path'])
process_dataset(dataset["url"], dataset["filename"], dataset["extract_path"])

View file

@ -3,20 +3,20 @@
# @Author : all
# @Desc : Evaluation for different datasets
from typing import Literal, Tuple, Optional, Dict
import asyncio
from typing import Dict, Literal, Tuple
from metagpt.ext.aflow.benchmark.benchmark import BaseBenchmark
from metagpt.ext.aflow.benchmark.gsm8k import GSM8KBenchmark
from metagpt.ext.aflow.benchmark.math import MATHBenchmark
from metagpt.ext.aflow.benchmark.humaneval import HumanEvalBenchmark
from metagpt.ext.aflow.benchmark.hotpotqa import HotpotQABenchmark
from metagpt.ext.aflow.benchmark.mbpp import MBPPBenchmark
from metagpt.ext.aflow.benchmark.drop import DROPBenchmark
from metagpt.ext.aflow.benchmark.gsm8k import GSM8KBenchmark
from metagpt.ext.aflow.benchmark.hotpotqa import HotpotQABenchmark
from metagpt.ext.aflow.benchmark.humaneval import HumanEvalBenchmark
from metagpt.ext.aflow.benchmark.math import MATHBenchmark
from metagpt.ext.aflow.benchmark.mbpp import MBPPBenchmark
# If you want to customize tasks, add task types here and provide evaluation functions, just like the ones given above
DatasetType = Literal["HumanEval", "MBPP", "GSM8K", "MATH", "HotpotQA", "DROP"]
class Evaluator:
"""
Complete the evaluation for different datasets here
@ -33,7 +33,9 @@ class Evaluator:
"DROP": DROPBenchmark,
}
async def graph_evaluate(self, dataset: DatasetType, graph, params: dict, path: str, is_test: bool = False) -> Tuple[float, float, float]:
async def graph_evaluate(
self, dataset: DatasetType, graph, params: dict, path: str, is_test: bool = False
) -> Tuple[float, float, float]:
if dataset not in self.dataset_configs:
raise ValueError(f"Unsupported dataset: {dataset}")
@ -44,7 +46,7 @@ class Evaluator:
# Use params to configure the graph and benchmark
configured_graph = await self._configure_graph(dataset, graph, params)
va_list = [1,2,3] # Use va_list from params, or use default value if not provided
va_list = [1, 2, 3] # Use va_list from params, or use default value if not provided
return await benchmark.run_evaluation(configured_graph, va_list)
async def _configure_graph(self, dataset, graph, params: dict):

View file

@ -2,41 +2,42 @@
# @Date : 6/27/2024 17:36 PM
# @Author : didi
# @Desc : operator demo of aflow
import asyncio
import concurrent.futures
import random
import sys
import asyncio
import traceback
from collections import Counter
from typing import Dict, List, Tuple
import concurrent.futures
from tenacity import retry, stop_after_attempt, wait_fixed
from metagpt.ext.aflow.scripts.utils import extract_test_cases_from_jsonl
from metagpt.actions.action_node import ActionNode
from metagpt.ext.aflow.scripts.operator_an import (
AnswerGenerateOp,
CodeGenerateOp,
FormatOp,
GenerateOp,
CodeGenerateOp,
AnswerGenerateOp,
ScEnsembleOp,
ReflectionTestOp,
MdEnsembleOp,
ReflectionTestOp,
ReviewOp,
ReviseOp,
ScEnsembleOp,
)
from metagpt.ext.aflow.scripts.prompts.prompt import (
FORMAT_PROMPT,
ANSWER_GENERATION_PROMPT,
SC_ENSEMBLE_PROMPT,
FORMAT_PROMPT,
MD_ENSEMBLE_PROMPT,
PYTHON_CODE_VERIFIER_PROMPT,
REFLECTION_ON_PUBLIC_TEST_PROMPT,
MD_ENSEMBLE_PROMPT,
REVIEW_PROMPT,
REVISE_PROMPT,
SC_ENSEMBLE_PROMPT,
)
from metagpt.ext.aflow.scripts.utils import (
extract_test_cases_from_jsonl,
test_case_2_test_function,
)
from metagpt.ext.aflow.scripts.utils import test_case_2_test_function
from metagpt.actions.action_node import ActionNode
from metagpt.llm import LLM
from metagpt.logs import logger
@ -66,7 +67,8 @@ class Custom(Operator):
prompt = instruction + input
response = await self._fill_node(GenerateOp, prompt, mode="single_fill")
return response
class AnswerGenerate(Operator):
def __init__(self, llm: LLM, name: str = "AnswerGenerate"):
super().__init__(llm, name)
@ -76,6 +78,7 @@ class AnswerGenerate(Operator):
response = await self._fill_node(AnswerGenerateOp, prompt, mode="xml_fill")
return response
class CustomCodeGenerate(Operator):
def __init__(self, llm: LLM, name: str = "CustomCodeGenerate"):
super().__init__(llm, name)
@ -112,15 +115,27 @@ class ScEnsemble(Operator):
return {"response": solutions[answer_mapping[answer]]}
def run_code(code):
try:
# Create a new global namespace
global_namespace = {}
disallowed_imports = [
"os", "sys", "subprocess", "multiprocessing",
"matplotlib", "seaborn", "plotly", "bokeh", "ggplot",
"pylab", "tkinter", "PyQt5", "wx", "pyglet"
"os",
"sys",
"subprocess",
"multiprocessing",
"matplotlib",
"seaborn",
"plotly",
"bokeh",
"ggplot",
"pylab",
"tkinter",
"PyQt5",
"wx",
"pyglet",
]
# Check for prohibited imports
@ -132,8 +147,8 @@ def run_code(code):
# Use exec to execute the code
exec(code, global_namespace)
# Assume the code defines a function named 'solve'
if 'solve' in global_namespace and callable(global_namespace['solve']):
result = global_namespace['solve']()
if "solve" in global_namespace and callable(global_namespace["solve"]):
result = global_namespace["solve"]()
return "Success", str(result)
else:
return "Error", "Function 'solve' not found"
@ -141,7 +156,7 @@ def run_code(code):
exc_type, exc_value, exc_traceback = sys.exc_info()
tb_str = traceback.format_exception(exc_type, exc_value, exc_traceback)
return "Error", f"Execution error: {str(e)}\n{''.join(tb_str)}"
class Programmer(Operator):
def __init__(self, llm: LLM, name: str = "Programmer"):
@ -170,11 +185,7 @@ class Programmer(Operator):
"""
Asynchronous method to generate code.
"""
prompt = PYTHON_CODE_VERIFIER_PROMPT.format(
problem=problem,
analysis=analysis,
feedback=feedback
)
prompt = PYTHON_CODE_VERIFIER_PROMPT.format(problem=problem, analysis=analysis, feedback=feedback)
response = await self._fill_node(CodeGenerateOp, prompt, mode, function_name="solve")
return response
@ -208,9 +219,8 @@ class Test(Operator):
super().__init__(llm, name)
def exec_code(self, solution, entry_point):
test_cases = extract_test_cases_from_jsonl(entry_point)
fail_cases = []
for test_case in test_cases:
test_code = test_case_2_test_function(solution, test_case, entry_point)
@ -239,9 +249,7 @@ class Test(Operator):
else:
return "no error"
async def __call__(
self, problem, solution, entry_point, test_loop: int = 3
):
async def __call__(self, problem, solution, entry_point, test_loop: int = 3):
"""
"Test": {
"description": "Test the solution with test cases, if the solution is correct, return 'no error', if the solution is incorrect, return reflect on the soluion and the error information",
@ -271,13 +279,13 @@ class Test(Operator):
)
response = await self._fill_node(ReflectionTestOp, prompt, mode="code_fill")
solution = response["reflection_and_solution"]
result = self.exec_code(solution, entry_point)
if result == "no error":
return {"result": True, "solution": solution}
else:
return {"result": False, "solution": solution}
class Format(Operator):
def __init__(self, llm: LLM, name: str = "Format"):
@ -286,7 +294,7 @@ class Format(Operator):
async def __call__(self, problem, solution, mode: str = None):
prompt = FORMAT_PROMPT.format(problem_description=problem, solution=solution)
response = await self._fill_node(FormatOp, prompt, mode)
return response
return response
class Review(Operator):
@ -298,6 +306,7 @@ class Review(Operator):
response = await self._fill_node(ReviewOp, prompt, mode="xml_fill")
return response
class Revise(Operator):
def __init__(self, llm: LLM, name: str = "Revise"):
super().__init__(llm, name)
@ -305,7 +314,7 @@ class Revise(Operator):
async def __call__(self, problem, solution, feedback, mode: str = None):
prompt = REVISE_PROMPT.format(problem=problem, solution=solution, feedback=feedback)
response = await self._fill_node(ReviseOp, prompt, mode="xml_fill")
return response
return response
class MdEnsemble(Operator):
@ -348,4 +357,4 @@ class MdEnsemble(Operator):
most_frequent_index = Counter(all_responses).most_common(1)[0][0]
final_answer = solutions[most_frequent_index]
return {"solution": final_answer}
return {"solution": final_answer}

View file

@ -9,31 +9,46 @@ from pydantic import BaseModel, Field
class GenerateOp(BaseModel):
response: str = Field(default="", description="Your solution for this problem")
class CodeGenerateOp(BaseModel):
code: str = Field(default="", description="Your complete code solution for this problem")
class AnswerGenerateOp(BaseModel):
thought: str = Field(default="", description="The step by step thinking process")
answer: str = Field(default="", description="The final answer to the question")
class FormatOp(BaseModel):
solution: str = Field(default="", description="Your formatted answer for this problem")
class ScEnsembleOp(BaseModel):
thought: str = Field(default="", description="The thought of the most consistent solution.")
solution_letter: str = Field(default="", description="The letter of most consistent solution.")
class ReflectionTestOp(BaseModel):
reflection_and_solution: str = Field(default="", description="Corrective solution for code execution errors or test case failures")
reflection_and_solution: str = Field(
default="", description="Corrective solution for code execution errors or test case failures"
)
class MdEnsembleOp(BaseModel):
thought: str = Field(default="", description="Step-by-step analysis of the solutions to determine the best one.")
solution_letter: str = Field(default="", description="The letter of the chosen best solution (only one letter).")
class ReviewOp(BaseModel):
review_result: bool = Field(default=False, description="The Review Result (Bool). If you think this solution looks good for you, return 'true'; If not, return 'false'")
feedback: str = Field(default="",description="Your FeedBack for this problem based on the criteria. If the review result is true, you can put it 'nothing here'.")
review_result: bool = Field(
default=False,
description="The Review Result (Bool). If you think this solution looks good for you, return 'true'; If not, return 'false'",
)
feedback: str = Field(
default="",
description="Your FeedBack for this problem based on the criteria. If the review result is true, you can put it 'nothing here'.",
)
class ReviseOp(BaseModel):
solution: str = Field(default="", description="Based on the feedback, revised solution for this problem")

View file

@ -10,13 +10,13 @@ from typing import List, Literal
from pydantic import BaseModel, Field
from metagpt.actions.action_node import ActionNode
from metagpt.provider.llm_provider_registry import create_llm_instance
from metagpt.logs import logger
from metagpt.ext.aflow.scripts.optimizer_utils.graph_utils import GraphUtils
from metagpt.ext.aflow.scripts.optimizer_utils.data_utils import DataUtils
from metagpt.ext.aflow.scripts.optimizer_utils.experience_utils import ExperienceUtils
from metagpt.ext.aflow.scripts.optimizer_utils.evaluation_utils import EvaluationUtils
from metagpt.ext.aflow.scripts.optimizer_utils.convergence_utils import ConvergenceUtils
from metagpt.ext.aflow.scripts.optimizer_utils.data_utils import DataUtils
from metagpt.ext.aflow.scripts.optimizer_utils.evaluation_utils import EvaluationUtils
from metagpt.ext.aflow.scripts.optimizer_utils.experience_utils import ExperienceUtils
from metagpt.ext.aflow.scripts.optimizer_utils.graph_utils import GraphUtils
from metagpt.logs import logger
from metagpt.provider.llm_provider_registry import create_llm_instance
DatasetType = Literal["HumanEval", "MBPP", "GSM8K", "MATH", "HotpotQA", "DROP"]
QuestionType = Literal["math", "code", "qa"]
@ -31,17 +31,17 @@ class GraphOptimize(BaseModel):
class Optimizer:
def __init__(
self,
dataset: DatasetType,
question_type: QuestionType,
opt_llm_config,
exec_llm_config,
operators: List,
sample: int,
check_convergence: bool = False,
optimized_path: str = None,
initial_round: int = 1,
max_rounds: int = 20
self,
dataset: DatasetType,
question_type: QuestionType,
opt_llm_config,
exec_llm_config,
operators: List,
sample: int,
check_convergence: bool = False,
optimized_path: str = None,
initial_round: int = 1,
max_rounds: int = 20,
) -> None:
self.optimize_llm_config = opt_llm_config
self.optimize_llm = create_llm_instance(self.optimize_llm_config)
@ -68,8 +68,8 @@ class Optimizer:
def optimize(self, mode: OptimizerType = "Graph"):
if mode == "Test":
test_n = 3 # validation datasets's execution number
for i in range(test_n):
test_n = 3 # validation datasets's execution number
for i in range(test_n):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
score = loop.run_until_complete(self.test())
@ -99,15 +99,16 @@ class Optimizer:
if retry_count < max_retries:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self.round += 1
logger.info(f"Score for round {self.round}: {score}")
converged, convergence_round, final_round = self.convergence_utils.check_convergence(top_k=3)
if converged and self.check_convergence:
logger.info(f"Convergence detected, occurred in round {convergence_round}, final round is {final_round}")
logger.info(
f"Convergence detected, occurred in round {convergence_round}, final round is {final_round}"
)
# Print average scores and standard deviations for each round
self.convergence_utils.print_results()
break
@ -115,7 +116,7 @@ class Optimizer:
time.sleep(5)
async def _optimize_graph(self):
validation_n = 2 # validation datasets's execution number
validation_n = 2 # validation datasets's execution number
graph_path = f"{self.root_path}/workflows"
data = self.data_utils.load_results(graph_path)
@ -152,8 +153,9 @@ class Optimizer:
response = await self.graph_utils.get_graph_optimize_response(graph_optimize_node)
# Check if the modification meets the conditions
check = self.experience_utils.check_modification(processed_experience, response["modification"],
sample["round"])
check = self.experience_utils.check_modification(
processed_experience, response["modification"], sample["round"]
)
# If `check` is True, break the loop; otherwise, regenerate the graph
if check:
@ -175,7 +177,7 @@ class Optimizer:
return avg_score
async def test(self):
rounds = [5] # You can choose the rounds you want to test here.
rounds = [5] # You can choose the rounds you want to test here.
data = []
graph_path = f"{self.root_path}/workflows_test"
@ -187,11 +189,9 @@ class Optimizer:
directory = self.graph_utils.create_round_directory(graph_path, round)
self.graph = self.graph_utils.load_graph(round, graph_path)
score, avg_cost, total_cost = await self.evaluation_utils.evaluate_graph_test(
self, directory, is_test=True
)
score, avg_cost, total_cost = await self.evaluation_utils.evaluate_graph_test(self, directory, is_test=True)
new_data = self.data_utils.create_result_data(round, score, avg_cost, total_cost)
data.append(new_data)
self.data_utils.save_results(json_file_path, data)
self.data_utils.save_results(json_file_path, data)

View file

@ -1,13 +1,16 @@
# -*- coding: utf-8 -*-
# @Date : 9/23/2024 10:00 AM
# @Author : Issac
# @Desc :
# @Desc :
import numpy as np
import json
import os
import numpy as np
from metagpt.logs import logger
class ConvergenceUtils:
def __init__(self, root_path):
self.root_path = root_path
@ -27,11 +30,11 @@ class ConvergenceUtils:
# If file doesn't exist, create a new one with an empty list
if not os.path.exists(result_file):
with open(result_file, 'w') as file:
with open(result_file, "w") as file:
json.dump([], file)
# Read file and return data
with open(result_file, 'r') as file:
with open(result_file, "r") as file:
return json.load(file)
def process_rounds(self):
@ -41,8 +44,8 @@ class ConvergenceUtils:
self.data = self.load_data(root_path=self.root_path)
rounds = {}
for entry in self.data:
round_number = entry['round']
score = entry['score']
round_number = entry["round"]
score = entry["score"]
if round_number not in rounds:
rounds[round_number] = []
rounds[round_number].append(score)
@ -77,19 +80,23 @@ class ConvergenceUtils:
sigma_Y_previous = None # Standard error of Y value from previous round
for i in range(len(self.avg_scores)):
# Dynamically select top_k from current round and all previous rounds
top_k_indices = np.argsort(self.avg_scores[:i + 1])[::-1][:top_k] # Select top k indices by descending average score
top_k_indices = np.argsort(self.avg_scores[: i + 1])[::-1][
:top_k
] # Select top k indices by descending average score
top_k_scores = [self.avg_scores[j] for j in top_k_indices] # Get list of top k scores
top_k_stds = [self.stds[j] for j in top_k_indices] # Get list of standard deviations corresponding to top k scores
top_k_stds = [
self.stds[j] for j in top_k_indices
] # Get list of standard deviations corresponding to top k scores
# Calculate mean of top k scores for current round, i.e., Y_current
Y_current = np.mean(top_k_scores)
# Calculate standard error of Y_current (sigma_Y_current), representing score dispersion
sigma_Y_current = np.sqrt(np.sum([s ** 2 for s in top_k_stds]) / (top_k ** 2))
sigma_Y_current = np.sqrt(np.sum([s**2 for s in top_k_stds]) / (top_k**2))
# If not the first round, calculate change in Y (Delta_Y) and corresponding standard error
if previous_Y is not None:
# Calculate Y difference between current round and previous round
Delta_Y = Y_current - previous_Y
# Calculate standard error of Y difference (sigma_Delta_Y)
sigma_Delta_Y = np.sqrt(sigma_Y_current ** 2 + sigma_Y_previous ** 2)
sigma_Delta_Y = np.sqrt(sigma_Y_current**2 + sigma_Y_previous**2)
# Check if Y change is within acceptable confidence interval, i.e., convergence condition
if abs(Delta_Y) <= z * sigma_Delta_Y:
convergence_count += 1
@ -105,7 +112,6 @@ class ConvergenceUtils:
# If convergence condition not met, return not converged
return False, None, None
def print_results(self):
"""
Print average score and standard deviation for all rounds.
@ -114,8 +120,8 @@ class ConvergenceUtils:
for i, (avg_score, std) in enumerate(zip(self.avg_scores, self.stds), 1):
logger.info(f"Round {i}: Average Score = {avg_score:.4f}, Standard Deviation = {std:.4f}")
if __name__ == "__main__":
if __name__ == "__main__":
# Use this class and specify top_k
checker = ConvergenceUtils("path") # For example, set top_k=5
converged, convergence_round, final_round = checker.check_convergence()

View file

@ -1,9 +1,11 @@
import datetime
import json
import os
import random
import datetime
import numpy as np
import pandas as pd
from metagpt.logs import logger
@ -15,7 +17,7 @@ class DataUtils:
def load_results(self, path: str) -> list:
result_path = os.path.join(path, "results.json")
if os.path.exists(result_path):
with open(result_path, 'r') as json_file:
with open(result_path, "r") as json_file:
try:
return json.load(json_file)
except json.JSONDecodeError:
@ -94,7 +96,7 @@ class DataUtils:
if not os.path.exists(log_dir):
return "" # 如果文件不存在,返回空字符串
logger.info(log_dir)
with open(log_dir, 'r', encoding='utf-8') as f:
with open(log_dir, "r", encoding="utf-8") as f:
data = json.load(f)
if isinstance(data, dict):
@ -119,16 +121,10 @@ class DataUtils:
def create_result_data(self, round: int, score: float, avg_cost: float, total_cost: float) -> dict:
now = datetime.datetime.now()
return {
"round": round,
"score": score,
"avg_cost": avg_cost,
"total_cost": total_cost,
"time": now
}
return {"round": round, "score": score, "avg_cost": avg_cost, "total_cost": total_cost, "time": now}
def save_results(self, json_file_path: str, data: list):
with open(json_file_path, 'w') as json_file:
with open(json_file_path, "w") as json_file:
json.dump(data, json_file, default=str, indent=4)
def _load_scores(self, path=None, mode="Graph"):
@ -140,17 +136,14 @@ class DataUtils:
result_file = os.path.join(rounds_dir, "results.json")
self.top_scores = []
with open(result_file, 'r', encoding='utf-8') as file:
with open(result_file, "r", encoding="utf-8") as file:
data = json.load(file)
df = pd.DataFrame(data)
scores_per_round = df.groupby('round')['score'].mean().to_dict()
scores_per_round = df.groupby("round")["score"].mean().to_dict()
for round_number, average_score in scores_per_round.items():
self.top_scores.append({
"round": round_number,
"score": average_score
})
self.top_scores.append({"round": round_number, "score": average_score})
self.top_scores.sort(key=lambda x: x["score"], reverse=True)

View file

@ -12,9 +12,11 @@ class EvaluationUtils:
for i in range(validation_n):
score, avg_cost, total_cost = await evaluator.graph_evaluate(
optimizer.dataset, optimizer.graph,
optimizer.dataset,
optimizer.graph,
{"dataset": optimizer.dataset, "llm_config": optimizer.execute_llm_config},
directory, is_test=False
directory,
is_test=False,
)
new_data = optimizer.data_utils.create_result_data(optimizer.round, score, avg_cost, total_cost)
@ -31,9 +33,11 @@ class EvaluationUtils:
for i in range(validation_n):
score, avg_cost, total_cost = await evaluator.graph_evaluate(
optimizer.dataset, optimizer.graph,
optimizer.dataset,
optimizer.graph,
{"dataset": optimizer.dataset, "llm_config": optimizer.execute_llm_config},
directory, is_test=False
directory,
is_test=False,
)
cur_round = optimizer.round + 1 if initial is False else optimizer.round
@ -51,7 +55,9 @@ class EvaluationUtils:
async def evaluate_graph_test(self, optimizer, directory, is_test=True):
evaluator = Evaluator(eval_path=directory)
return await evaluator.graph_evaluate(
optimizer.dataset, optimizer.graph,
optimizer.dataset,
optimizer.graph,
{"dataset": optimizer.dataset, "llm_config": optimizer.execute_llm_config},
directory, is_test=is_test
directory,
is_test=is_test,
)

View file

@ -1,8 +1,10 @@
import json
import os
from collections import defaultdict
from metagpt.logs import logger
class ExperienceUtils:
def __init__(self, root_path: str):
self.root_path = root_path
@ -32,12 +34,12 @@ class ExperienceUtils:
if data["succeed"]:
experience_data[father_node]["success"][round_number] = {
"modification": data["modification"],
"score": data["after"]
"score": data["after"],
}
else:
experience_data[father_node]["failure"][round_number] = {
"modification": data["modification"],
"score": data["after"]
"score": data["after"],
}
except Exception as e:
logger.info(f"Error processing {round_dir}: {str(e)}")
@ -69,10 +71,10 @@ class ExperienceUtils:
experience_data = processed_experience.get(sample_round)
if experience_data:
for key, value in experience_data["failure"].items():
if value['modification'] == modification:
if value["modification"] == modification:
return False
for key, value in experience_data["success"].items():
if value['modification'] == modification:
if value["modification"] == modification:
return False
return True
else:

View file

@ -1,17 +1,17 @@
import json
import os
import re
import json
from typing import List
import traceback
import time
from metagpt.logs import logger
import traceback
from typing import List
from metagpt.ext.aflow.scripts.prompts.optimize_prompt import (
WORKFLOW_CUSTOM_USE,
WORKFLOW_INPUT,
WORKFLOW_OPTIMIZE_PROMPT,
WORKFLOW_TEMPLATE
WORKFLOW_TEMPLATE,
)
from metagpt.logs import logger
class GraphUtils:
@ -72,11 +72,24 @@ class GraphUtils:
interface = matched_data["interface"]
return f"{id}. {operator_name}: {desc}, with interface {interface})."
def create_graph_optimize_prompt(self, experience: str, score: float, graph: str, prompt: str,
operator_description: str, type: str, log_data: str) -> str:
def create_graph_optimize_prompt(
self,
experience: str,
score: float,
graph: str,
prompt: str,
operator_description: str,
type: str,
log_data: str,
) -> str:
graph_input = WORKFLOW_INPUT.format(
experience=experience, score=score, graph=graph, prompt=prompt, operator_description=operator_description,
type=type, log=log_data
experience=experience,
score=score,
graph=graph,
prompt=prompt,
operator_description=operator_description,
type=type,
log=log_data,
)
graph_system = WORKFLOW_OPTIMIZE_PROMPT.format(type=type)
return graph_input + WORKFLOW_CUSTOM_USE + graph_system

View file

@ -86,4 +86,4 @@ solution: {solution}
feedback: {feedback}
Ensure the output code is self-contained, and without any additional text or test cases.
"""
"""

View file

@ -8,6 +8,7 @@ import json
import re
from typing import Any, List, Tuple
def extract_task_id(task_id: str) -> int:
"""Extract the numeric part of the task_id."""
match = re.search(r"/(\d+)", task_id)
@ -52,23 +53,21 @@ def parse_python_literal(s):
return s
def extract_test_cases_from_jsonl(
entry_point: str, dataset: str = "HumanEval"
):
def extract_test_cases_from_jsonl(entry_point: str, dataset: str = "HumanEval"):
if dataset == "HumanEval":
file_path = "metagpt/ext/aflow/data/humaneval_public_test.jsonl"
# Retain the original hardcoded test cases
# Retain the original hardcoded test cases
hardcoded_cases = {
"find_zero": "",
"decode_cyclic": "",
"decode_shift": "",
"by_length":"",
"add":"",
"triangle_area":"",
"correct_bracketing":"",
"solve":"",
"sum_squares":"",
"starts_one_ends":""
"find_zero": "",
"decode_cyclic": "",
"decode_shift": "",
"by_length": "",
"add": "",
"triangle_area": "",
"correct_bracketing": "",
"solve": "",
"sum_squares": "",
"starts_one_ends": "",
}
elif dataset == "MBPP":
file_path = "metagpt/ext/aflow/data/mbpp_public_test.jsonl"
@ -80,7 +79,7 @@ def extract_test_cases_from_jsonl(
"swap_List": "",
"square_Sum": "",
"sort_sublists": "",
"unique_sublists": ""
"unique_sublists": "",
}
# Check if there are hardcoded test cases
if entry_point in hardcoded_cases:
@ -93,7 +92,7 @@ def extract_test_cases_from_jsonl(
if data.get("entry_point") == entry_point:
return data.get("test")
return None
return None
def extract_test_cases(docstring: str) -> List[Tuple[str, List[Any], Any]]:

View file

@ -5,12 +5,12 @@
from typing import Literal
from metagpt.ext.aflow.scripts.operator import Generate
from metagpt.provider.llm_provider_registry import create_llm_instance
from metagpt.utils.cost_manager import CostManager
DatasetType = Literal["HumanEval", "MBPP", "GSM8K", "MATH", "HotpotQA", "DROP"]
class Workflow:
def __init__(
self,
@ -28,5 +28,3 @@ class Workflow:
Implementation of the workflow
"""
raise NotImplementedError("This method should be implemented by the subclass")