Update AFlow

This commit is contained in:
didi 2024-10-17 15:47:09 +08:00
parent cea3473002
commit 6aedc4a068
70 changed files with 1516 additions and 178 deletions

View file

@ -158,7 +158,9 @@ async def evaluate_problem(annotation: dict, graph: Callable, log_path) -> Tuple
uni_score = max_score
if uni_score == 0:
print("uni_score", uni_score)
if uni_score < 0.3:
log_mismatch(inputs, expected_output, output, output, log_path)
else:
ensure_log_file_exists(log_path)

View file

@ -2,25 +2,23 @@
# @Date :
# @Author : all
# @Desc : test on gsm8k
import re
import json
import asyncio
import aiofiles
import pandas as pd
from typing import Optional, List, Tuple, Callable, Any
from pandas import Series
from tqdm.asyncio import tqdm_asyncio
import os
import time
from datetime import datetime
from examples.aflow.benchmark.utils import generate_random_indices, log_mismatch
def extract_number(text: str) -> Optional[float]:
"""Clean text and extract a single number"""
print(f"text: {text}")
matches = re.findall(r"[-+]?\d+(?:,\d{3})*(?:\.\d+)?|\d+\.\d+", str(text))
print(f"matches: {matches}")
if matches:
last_number = matches[-1].replace(",", "")
try:
@ -30,55 +28,116 @@ def extract_number(text: str) -> Optional[float]:
else:
return None
def loose_match_score(expected_output: float, prediction: float, tolerance: float = 1e-6) -> int:
# 如果预测输出为空,返回不匹配
if prediction is None:
return 0
if abs(expected_output - prediction) <= tolerance:
return 1
a = expected_output
b = prediction
# 比较两个提取出的数字,允许一定的容差
if abs(a - b) <= tolerance:
return 1 # 数字相近,认为匹配成功
else:
return 0
def save_results_to_csv(results: List[Tuple[str, str, str, int, str]], path: str) -> Tuple[float, float, float]:
return 0 # 数字不匹配
def log_mismatch(problem: str, expected_output: float, prediction: str, predicted_number: float, path):
    """Append one mismatch record to ``log.json`` inside ``path``.

    The log file is a JSON list of records. A missing file, a file that does
    not contain valid JSON, or a file whose top-level JSON value is not a list
    is treated as an empty log and overwritten.

    Args:
        problem: The question text.
        expected_output: The reference answer.
        prediction: The raw model output.
        predicted_number: The value extracted from the model output.
        path: Directory that holds (or will hold) ``log.json``.
    """
    log_data = {
        "question": problem,
        "right_answer": expected_output,
        "model_output": prediction,
        "extracted_output": predicted_number,
    }

    log_file = os.path.join(path, 'log.json')

    # Load the existing records; start fresh when the file is absent or corrupt.
    try:
        with open(log_file, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        data = []

    # Valid JSON that is not a list (e.g. "{}" or "null") cannot be appended to.
    if not isinstance(data, list):
        data = []

    data.append(log_data)

    # Write the whole log back.
    with open(log_file, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=4, ensure_ascii=False)
async def load_data(file_path: str, specific_indices: List[int] = None) -> List[dict]:
    """Read a JSONL file and optionally keep only the records at given indices.

    Args:
        file_path: Path of a UTF-8 file with one JSON document per line.
        specific_indices: Positions of the records to keep, in the order they
            should be returned. Indices outside the loaded data are ignored.
            ``None`` returns every record.

    Returns:
        The parsed records (whatever ``json.loads`` yields for each line).
    """
    data = []
    # Read the file asynchronously, one line at a time.
    async with aiofiles.open(file_path, mode="r", encoding='utf-8') as file:
        async for line in file:
            # A blank line (e.g. a trailing newline) is not a JSON document.
            if line.strip():
                data.append(json.loads(line))

    if specific_indices is not None:
        total = len(data)
        # Drop out-of-range indices on both sides so a bad index never raises.
        return [data[i] for i in specific_indices if -total <= i < total]

    return data
def save_results_to_csv(results: List[Tuple[str, str, str, int]], path):
    """Dump evaluation rows to a timestamped CSV and return summary figures.

    Each row supplies five values, in the order question, prediction,
    expected_output, score, cost. The file is written into ``path`` and named
    ``<mean score with 5 decimals>_<YYYYMMDD_HHMMSS>.csv``.

    Returns:
        ``(mean score, max cost / row count, max cost)``; the middle value is
        0 when there are no rows.
    """
    column_names = ["question", "prediction", "expected_output", "score", "cost"]
    frame = pd.DataFrame(results, columns=column_names)

    # Summary statistics.
    row_count = len(frame)
    mean_score = frame["score"].mean()
    max_cost = frame["cost"].max()
    if row_count > 0:
        cost_per_row = max_cost / row_count
    else:
        cost_per_row = 0

    # File name carries the mean score and the current local time.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    target = os.path.join(path, f"{mean_score:.5f}_{timestamp}.csv")

    frame.to_csv(target, index=False)
    print(f"Results saved to {target}")

    return mean_score, cost_per_row, max_cost
async def evaluate_problem(input: str, graph: Callable, expected_output: str, path: str = None) -> Tuple[str, str, str, int, str]:
max_retries = 10
async def evaluate_problem(input: str, graph, expected_output: str, path) -> tuple[
str, str , float, int, str]:
max_retries = 5
retries = 0
uni_score = 0
while retries < max_retries:
try:
prediction = await graph(input) if graph else None
prediction = await graph(input) if graph else "None" # 这是一个占位符,替换成实际的模型生成逻辑
cost = prediction[1]
output = prediction[0]
if output is not None:
predicted_number = extract_number(output)
expected_number = extract_number(expected_output)
expected_output = extract_number(expected_output)
else:
predicted_number = None
expected_number = extract_number(expected_output)
print(f"predicted_number: {predicted_number}, expected_number: {expected_number}")
uni_score = loose_match_score(expected_number, predicted_number)
uni_score = loose_match_score(expected_output, predicted_number)
if uni_score == 0 and path is not None:
if uni_score == 0:
log_mismatch(input, expected_output, output, predicted_number, path)
else:
pass
break
except Exception as e:
retries += 1
print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})")
@ -86,14 +145,15 @@ async def evaluate_problem(input: str, graph: Callable, expected_output: str, pa
if retries == max_retries:
print("Maximum retries reached. Skipping this sample.")
output = str(e)
output = e
cost = None
uni_score = 0
break
return input, output, expected_output, uni_score, cost
async def evaluate_all_problems(data: List[dict], graph: Callable, path, max_concurrent_tasks: int = 20) -> List[Tuple[str, str, str, int, str]]:
async def evaluate_all_problems(data: List[dict], graph, path, max_concurrent_tasks: int = 100):
semaphore = asyncio.Semaphore(max_concurrent_tasks)
async def sem_evaluate(problem):
@ -104,41 +164,15 @@ async def evaluate_all_problems(data: List[dict], graph: Callable, path, max_con
tasks = [sem_evaluate(problem) for problem in data]
# 使用tqdm.gather来显示进度条
return await tqdm_asyncio.gather(*tasks, desc="Evaluating problems", total=len(data))
async def load_data(file_path: str, samples=1, test=False) -> List[dict]:
data = []
async with aiofiles.open(file_path, mode="r") as file:
async for line in file:
data.append(json.loads(line))
random_indices = generate_random_indices(len(data), samples, test=test)
data = [data[i] for i in random_indices]
return data
async def load_file_data(file_path: str, specific_indices: List[int] = None) -> List[dict]:
data = []
async with aiofiles.open(file_path, mode="r", encoding='utf-8') as file:
async for line in file:
data.append(json.loads(line))
if specific_indices is not None:
filtered_data = [data[i] for i in specific_indices if i < len(data)]
return filtered_data
return data
async def gsm8k_evaluation(graph: Callable, file_path: str, samples: int, path: str, test=False) -> Tuple[float, float]:
data = await load_data(file_path, samples, test=test)
results = await evaluate_all_problems(data, graph, path, max_concurrent_tasks=20)
async def optimize_gsm8k_evaluation(graph: Callable, file_path: str, path: str, va_list: list) -> tuple[
Any, Any, Any]:
"""Optimize GSM8K evaluation main function"""
data = await load_data(file_path, va_list)
results = await evaluate_all_problems(data, graph, path, max_concurrent_tasks=30)
average_score, average_cost, total_cost = save_results_to_csv(results, path=path)
print(f"Average score: {average_score:.5f}")
print(f"Total Cost: {total_cost:.5f}")
return average_score, total_cost
async def optimize_gsm8k_evaluation(graph: Callable, file_path: str, path: str, va_list: list) -> Tuple[Any, Any, Any]:
data = await load_file_data(file_path, va_list)
results = await evaluate_all_problems(data, graph, path, max_concurrent_tasks=8)
average_score, average_cost, total_cost = save_results_to_csv(results, path=path)
print(f"Average score: {average_score:.5f}")
print(f"Total Cost: {total_cost:.5f}")
return average_score, average_cost, total_cost
return average_score, average_cost, total_cost

View file

@ -172,7 +172,7 @@ async def evaluate_problem_optimize(problem: dict, graph: Callable, log_path: st
output, cost = await graph(inputs) if graph else "None"
uni_score, extracted_output = calculate_score(expected_output, output)
if uni_score == 0:
if uni_score < 0.3:
log_mismatch(input_text, expected_output, output, extracted_output, log_path)
else:
ensure_log_file_exists(log_path)

View file

@ -1,5 +1,6 @@
import re
import regex
from pandas import Series
from sympy import N, simplify
from sympy.parsing.latex import parse_latex
from sympy.parsing.sympy_parser import parse_expr
@ -9,10 +10,12 @@ import json
import asyncio
import aiofiles
import pandas as pd
from typing import Optional, List, Tuple, Callable, Union
from typing import Optional, List, Tuple, Callable, Union, Any
from tqdm.asyncio import tqdm_asyncio
from datetime import datetime
import os
import inspect
from examples.aflow.benchmark.utils import generate_random_indices
def extract_model_answer(text: str) -> str:
# 提取最后一个 \boxed{...}
@ -27,6 +30,26 @@ def extract_model_answer(text: str) -> str:
sentences = [s.strip() for s in sentences if s.strip()]
return sentences[-1] if sentences else ""
def extract_answer(text: str) -> str:
    """Extract the final answer from a model response.

    The content of the first ``\\boxed{...}`` is returned when present. Braces
    are matched by depth so nested groups such as ``\\boxed{\\frac{1}{2}}`` are
    returned whole instead of being cut at the first ``}``. Without a boxed
    answer, the last non-empty sentence of the text is returned.

    Args:
        text: The response text to search.

    Returns:
        The stripped answer, or ``""`` when the text has no usable content.
    """
    marker = "\\boxed{"
    start = text.find(marker)
    if start != -1:
        begin = start + len(marker)
        depth = 1
        for pos in range(begin, len(text)):
            ch = text[pos]
            if ch == "{":
                depth += 1
            elif ch == "}":
                depth -= 1
                if depth == 0:
                    return text[begin:pos].strip()

    # Braces never balanced: fall back to the shortest single-line match.
    boxed_match = re.search(r"\\boxed{(.*?)}", text)
    if boxed_match:
        return boxed_match.group(1).strip()

    # Split on sentence-ending punctuation that does not follow a digit, so
    # decimals and "3. " style numbers are not treated as sentence ends.
    sentence_end_pattern = r'(?<!\d)[.!?]\s+'
    sentences = re.split(sentence_end_pattern, text)

    # Drop empty pieces and return the last remaining sentence.
    sentences = [s.strip() for s in sentences if s.strip()]
    return sentences[-1] if sentences else ""
def get_function_code(func):
    """Return the source text of ``func``, or ``"no code"`` if it is unavailable.

    ``inspect.getsource`` raises ``OSError`` when the source cannot be
    retrieved and ``TypeError`` for built-in objects; both cases yield the
    ``"no code"`` placeholder.
    """
    try:
        return inspect.getsource(func)
    except (OSError, TypeError):
        return "no code"
def parse_digits(num):
# format: 234.23 || 23%
num = regex.sub(",", "", str(num))
@ -43,10 +66,12 @@ def parse_digits(num):
pass
return None
def is_digit(num):
    """Return True when ``parse_digits(num)`` produces a value other than None."""
    # paired with parse_digits
    return parse_digits(num) is not None
def symbolic_equal(a, b):
def _parse(s):
for f in [parse_latex, parse_expr]:
@ -72,6 +97,7 @@ def symbolic_equal(a, b):
pass
return False
def call_with_timeout(func, *args, timeout=5, **kwargs):
output_queue = multiprocessing.Queue()
process_args = args + (output_queue,)
@ -86,12 +112,13 @@ def call_with_timeout(func, *args, timeout=5, **kwargs):
return output_queue.get()
def math_equal(
prediction: Union[bool, float, str],
reference: Union[float, str],
include_percentage: bool = True,
is_close: bool = True,
timeout: bool = False,
prediction: Union[bool, float, str],
reference: Union[float, str],
include_percentage: bool = True,
is_close: bool = True,
timeout: bool = False,
) -> bool:
"""
Exact match of math if and only if:
@ -132,34 +159,34 @@ def math_equal(
prediction = str(prediction).strip()
if (
regex.match(r"(\(|\[).+(\)|\])", prediction) is not None
and regex.match(r"(\(|\[).+(\)|\])", reference) is not None
regex.match(r"(\(|\[).+(\)|\])", prediction) is not None
and regex.match(r"(\(|\[).+(\)|\])", reference) is not None
):
pred_parts = prediction[1:-1].split(",")
ref_parts = reference[1:-1].split(",")
if len(pred_parts) == len(ref_parts):
if all(
[
math_equal(pred_parts[i], ref_parts[i], include_percentage, is_close)
for i in range(len(pred_parts))
]
[
math_equal(pred_parts[i], ref_parts[i], include_percentage, is_close)
for i in range(len(pred_parts))
]
):
return True
if (
(prediction.startswith("\\begin{pmatrix}") or prediction.startswith("\\begin{bmatrix}"))
and (prediction.endswith("\\end{pmatrix}") or prediction.endswith("\\end{bmatrix}"))
and (reference.startswith("\\begin{pmatrix}") or reference.startswith("\\begin{bmatrix}"))
and (reference.endswith("\\end{pmatrix}") or reference.endswith("\\end{bmatrix}"))
(prediction.startswith("\\begin{pmatrix}") or prediction.startswith("\\begin{bmatrix}"))
and (prediction.endswith("\\end{pmatrix}") or prediction.endswith("\\end{bmatrix}"))
and (reference.startswith("\\begin{pmatrix}") or reference.startswith("\\begin{bmatrix}"))
and (reference.endswith("\\end{pmatrix}") or reference.endswith("\\end{bmatrix}"))
):
pred_lines = [
line.strip()
for line in prediction[len("\\begin{pmatrix}") : -len("\\end{pmatrix}")].split("\\\\")
for line in prediction[len("\\begin{pmatrix}"): -len("\\end{pmatrix}")].split("\\\\")
if line.strip()
]
ref_lines = [
line.strip()
for line in reference[len("\\begin{pmatrix}") : -len("\\end{pmatrix}")].split("\\\\")
for line in reference[len("\\begin{pmatrix}"): -len("\\end{pmatrix}")].split("\\\\")
if line.strip()
]
matched = True
@ -169,10 +196,10 @@ def math_equal(
ref_parts = ref_line.split("&")
if len(pred_parts) == len(ref_parts):
if not all(
[
math_equal(pred_parts[i], ref_parts[i], include_percentage, is_close)
for i in range(len(pred_parts))
]
[
math_equal(pred_parts[i], ref_parts[i], include_percentage, is_close)
for i in range(len(pred_parts))
]
):
matched = False
break
@ -209,91 +236,161 @@ def math_equal(
return False
def calculate_score(expected_output: str, prediction: str) -> int:
def calculate_score(expected_output: str, prediction: str) -> tuple[int, str]:
expected_answer = extract_model_answer(expected_output)
predicted_answer = extract_model_answer(prediction)
return 1 if math_equal(predicted_answer, expected_answer) else 0
if math_equal(predicted_answer, expected_answer):
return 1, predicted_answer
else:
return 0, predicted_answer
async def load_data(file_path: str, samples: int = 200, test=False) -> List[dict]:
def ensure_log_file_exists(path: str):
    """Create ``log.json`` holding an empty JSON list in ``path`` if it is absent.

    An existing file is left untouched.
    """
    log_file = os.path.join(path, 'log.json')
    if os.path.exists(log_file):
        return
    with open(log_file, 'w', encoding='utf-8') as handle:
        json.dump([], handle, indent=4, ensure_ascii=False)
def log_mismatch(problem: str, expected_output: float, prediction: str, predicted_number, path):
    """Append one mismatch record to ``log.json`` inside ``path``.

    Besides the question, reference answer, raw model output and extracted
    answer, each record stores the source code of ``extract_model_answer``
    (as returned by ``get_function_code``) under ``"extract_answer_code"``.

    The log file is a JSON list of records. A missing file, a file that does
    not contain valid JSON, or a file whose top-level JSON value is not a list
    is treated as an empty log and overwritten.
    """
    log_data = {
        "question": problem,
        "right_answer": expected_output,
        "model_output": prediction,
        "extracted_output": predicted_number,
    }

    # Record the source of the extraction function alongside the mismatch.
    function_code = get_function_code(extract_model_answer)
    log_data["extract_answer_code"] = function_code

    log_file = os.path.join(path, 'log.json')

    # Load the existing records; start fresh when the file is absent or corrupt.
    try:
        with open(log_file, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        data = []

    # Valid JSON that is not a list (e.g. "{}" or "null") cannot be appended to.
    if not isinstance(data, list):
        data = []

    data.append(log_data)

    # Write the whole log back.
    with open(log_file, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=4, ensure_ascii=False)
async def load_data(file_path: str, specific_indices: List[int] = None) -> List[dict]:
data = []
async with aiofiles.open(file_path, mode="r") as file:
# 异步读取文件内容
async with aiofiles.open(file_path, mode="r", encoding='utf-8') as file:
async for line in file:
data.append(json.loads(line))
random_indices = generate_random_indices(len(data), samples, test)
data = [data[i] for i in random_indices]
# 然后在随机选择的样本中基于特定索引列表进行进一步筛选
if specific_indices is not None:
filtered_data = [data[i] for i in specific_indices if i < len(data)]
return filtered_data
return data
def save_results_to_csv(results: List[Tuple[str, str, str, int, str]], path: str) -> Tuple[float, float]:
def save_results_to_csv(results: List[Tuple[str, str, str, int]], path):
# 创建 DataFrame
df = pd.DataFrame(results, columns=["question", "prediction", "expected_output", "score", "cost"])
average_score = df["score"].mean()
total_cost = df["cost"].max()
output_file = f"{path}/{average_score:.5f}.csv"
# 计算统计数据
avg_score = df["score"].mean()
t_cost = df["cost"].max()
a_cost = t_cost / len(df) if len(df) > 0 else 0
# 获取当前时间,格式为 YYYYMMDD_HHMMSS
current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
# 生成文件名,包含平均分和当前时间,保留五位小数
filename = f"{avg_score:.5f}_{current_time}.csv"
output_file = os.path.join(path, filename)
# 保存到 CSV
df.to_csv(output_file, index=False)
print(f"Results saved to {output_file}")
return average_score, total_cost
async def evaluate_problem(problem: dict, graph: Callable) -> Tuple[str, str, str, int, str]:
return avg_score, a_cost, t_cost
async def evaluate_problem(problem: dict, graph, log_path) -> Tuple[str, str, str, int, str]:
input_text = problem["problem"]
expected_output = problem["solution"]
max_retries = 5
max_retries = 2
retries = 0
while retries < max_retries:
try:
prediction = await graph(input_text)
cost = prediction[1]
output = prediction[0]["solution"]
score = calculate_score(expected_output, output)
break
except Exception as e:
retries += 1
print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})")
prediction = await graph(input_text) if graph else "None"
cost = prediction[1]
output = prediction[0]
if retries == max_retries:
print("Maximum retries reached. Skipping this sample.")
output = None
cost = None
score = 0
break
uni_score, extracted_output = calculate_score(expected_output, output)
return input_text, output, expected_output, score, cost
if uni_score == 0:
log_mismatch(input_text, expected_output, output, extracted_output, log_path)
else:
ensure_log_file_exists(log_path)
async def evaluate_all_problems(data: List[dict], graph: Callable, max_concurrent_tasks: int = 20) -> List[Tuple[str, str, str, int, str]]:
# while retries < max_retries:
# try:
# prediction = await graph(input_text) if graph else "None"
# cost = prediction[1]
# output = prediction[0]
# uni_score, extracted_output = calculate_score(expected_output, output)
# if uni_score == 0:
# log_mismatch(input_text, expected_output, output, extracted_output, log_path)
# else:
# ensure_log_file_exists(log_path)
# break
# except Exception as e:
# retries += 1
# print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})")
# if retries == max_retries:
# print("Maximum retries reached. Skipping this sample.")
# output = e
# cost = None
# uni_score = 0
# break
return input_text, output, expected_output, uni_score, cost
async def evaluate_all_problems(data: List[dict], graph, path, max_concurrent_tasks: int = 300):
semaphore = asyncio.Semaphore(max_concurrent_tasks)
async def sem_evaluate(problem):
async with semaphore:
return await evaluate_problem(problem, graph)
return await evaluate_problem(problem, graph, path)
tasks = [sem_evaluate(problem) for problem in data]
return await tqdm_asyncio.gather(*tasks, desc="Evaluating MATH problems", total=len(data))
async def math_evaluation(graph: Callable, file_path: str, samples: int, path: str, test=False) -> Tuple[float, float]:
data = await load_data(file_path, samples, test=test)
results = await evaluate_all_problems(data, graph, max_concurrent_tasks=20)
average_score, total_cost = save_results_to_csv(results, path=path)
async def optimize_math_evaluation(graph: Callable, file_path: str, path: str, va_list: list) -> tuple[
Any, Any, Any]:
data = await load_data(file_path, va_list)
results = await evaluate_all_problems(data, graph, path, max_concurrent_tasks=30)
average_score, average_cost, total_cost = save_results_to_csv(results, path=path)
print(f"Average score on MATH dataset: {average_score:.5f}")
print(f"Total Cost: {total_cost:.5f}")
return average_score, total_cost
async def load_file_data(file_path: str) -> List[dict]:
data = []
async with aiofiles.open(file_path, mode="r") as file:
async for line in file:
data.append(json.loads(line))
return data
async def optimize_math_evaluation(graph: Callable, file_path: str, path: str) -> Tuple[float, float]:
data = await load_file_data(file_path)
results = await evaluate_all_problems(data, graph, max_concurrent_tasks=50)
average_score, total_cost = save_results_to_csv(results, path=path)
print(f"Average score on MATH dataset: {average_score:.5f}")
print(f"Total Cost: {total_cost:.5f}")
return average_score, total_cost
return average_score, average_cost, total_cost

View file

@ -61,7 +61,6 @@ def run_with_timeout(func, timeout):
def check_solution(solution, test, entry_point):
solution = sanitize(code=solution, entrypoint=entry_point)
print(test)
try:
# 定义一个包含所有必要模块的全局字典
global_dict = {
@ -111,6 +110,7 @@ async def evaluate_problem(data: dict, graph: Callable, path) -> Tuple[str, str,
retries = 0
expected_output = "\nCorrect Solution:\ndef " + data["code"]
while retries < max_retries:
try:
prediction = await graph(data["prompt"], data["entry_point"]) if graph else "None"

View file

@ -0,0 +1,12 @@
models:
"<model_name>": # replace with your model name, e.g. "gpt-4-turbo" or "gpt-3.5-turbo"
api_type: "openai" # or azure / ollama / groq etc.
base_url: "<your base url>"
api_key: "<your api key>"
temperature: 0
"<model_name>":
api_type: "openai"
base_url: "<your base url>"
api_key: "<your api key>"
temperature: 0
CALC_USAGE: True

48
examples/aflow/readme.md Normal file
View file

@ -0,0 +1,48 @@
# AFlow: Automating Agentic Workflow Generation
AFlow is a framework for automatically generating and optimizing Agentic Workflows. It uses Monte Carlo tree search in a code-represented workflow space to find effective workflows, replacing manual development with machine effort. Our approach shows potential to outperform handcrafted workflows on various tasks.
[Read our paper on arXiv](arxiv_link_here)
[Insert performance graph/image here]
## Framework Components
- **Node**: Basic unit of LLM invocation. See `action_node.py` for a flexible interface to control LLM, temperature, format, and prompt.
- **Operator**: Predefined combinations of Nodes to enhance search efficiency.
- **Workflow**: [Brief description needed]
- **Optimizer**: [Brief description needed]
- **Evaluator**: [Brief description needed]
## Datasets
We provide implementations for [list datasets here].
Data is available at [link to data].
For custom tasks, [brief instructions or link to documentation].
## Quick Start
1. Configure your search in `optimize.py`
2. Set up parameters in `config/config2.yaml` (see `examples/aflow/config2.example.yaml` for reference)
[Add any additional setup or running instructions]
## Contributing
[Instructions for contributing, if applicable]
## License
[License information]
## Citation
If you use AFlow in your research, please cite our paper:
```
[Citation details]
```
For more information, visit our [project website/documentation].

View file

@ -56,7 +56,7 @@ class Evaluator:
va_list = None
else:
data_path = "examples/aflow/data/gsm8k_validate.jsonl" # Replace with your JSONL file path
va_list = [1] # Replace with the filtered index list
va_list = [1,2,3] # Replace with the filtered index list
graph = await load_graph()
@ -78,7 +78,7 @@ class Evaluator:
va_list = None
else:
data_path = "examples/aflow/data/math_validate.jsonl"
va_list = None # Replace with the filtered index list
va_list = [1,2,3] # Replace with the filtered index list
graph = await load_graph()
@ -100,7 +100,7 @@ class Evaluator:
va_list = None
else:
data_path = "examples/aflow/data/human-eval_validate.jsonl" # Replace with your JSONL file path
va_list = None # Replace with the filtered index list
va_list = [1,2,3] # Replace with the filtered index list
graph = await load_graph()
@ -119,10 +119,10 @@ class Evaluator:
if test:
data_path = "examples/aflow/data/mbpp_test.jsonl"
va_list = None
va_list = None
else:
data_path = "examples/aflow/data/mbpp_validate.jsonl"
va_list = None # Replace with the filtered index list
va_list = [1,2,3] # Replace with the filtered index list
graph = await load_graph()
@ -144,7 +144,7 @@ class Evaluator:
va_list = None
else:
data_path = "examples/aflow/data/hotpotqa_validate.jsonl"
va_list = None # Replace with the filtered index list
va_list = [1,2,3] # Replace with the filtered index list
graph = await load_graph()
@ -167,7 +167,7 @@ class Evaluator:
va_list = None
else:
data_path = "examples/aflow/data/drop_validate.jsonl"
va_list = None # Replace with the filtered index list
va_list = [1,2,3] # Replace with the filtered index list
graph = await load_graph()

View file

@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-
# @Date : 6/27/2024 22:07 PM
# @Author : didi
# @Desc : Basic Graph Class
from typing import Literal
import examples.aflow.scripts.optimized.DROP.workflows.template.operator as operator
import examples.aflow.scripts.optimized.DROP.workflows.round_1.prompt as prompt_custom
from metagpt.provider.llm_provider_registry import create_llm_instance
from metagpt.utils.cost_manager import CostManager
DatasetType = Literal["HumanEval", "MBPP", "GSM8K", "MATH", "HotpotQA", "DROP"]
class Workflow:
    """Workflow that answers a problem with a single ``Custom`` operator call."""

    def __init__(
        self,
        name: str,
        llm_config,
        dataset: DatasetType,
    ) -> None:
        self.name = name
        self.dataset = dataset
        # Build the LLM from its config and give it its own CostManager.
        llm = create_llm_instance(llm_config)
        llm.cost_manager = CostManager()
        self.llm = llm
        self.custom = operator.Custom(llm)

    async def __call__(self, problem: str):
        """
        Run the workflow on ``problem``.

        Returns a pair: the operator's 'response' value and the
        ``total_cost`` reported by the LLM's cost manager.
        """
        result = await self.custom(input=problem, instruction="")
        answer = result['response']
        return answer, self.llm.cost_manager.total_cost

View file

@ -0,0 +1,6 @@
# XXX_PROMPT = """
#
# Solve it.
#
# """

View file

@ -0,0 +1,15 @@
# Template for picking the most frequent answer among several candidates.
# Placeholder: {solutions}.
SC_ENSEMBLE_PROMPT = """
Several answers have been generated to a same question. They are as follows:
{solutions}
Identify the concise answer that appears most frequently across them. This consistency in answers is crucial for determining the most reliable solution.
In the "thought" field, provide a detailed explanation of your thought process. In the "solution_letter" field, output only the single letter ID (A, B, C, etc.) corresponding to the most consistent solution. Do not include any additional text or explanation in the "solution_letter" field.
"""
# Template for step-by-step answer generation with "thought" and "answer" fields.
# Placeholder: {input}.
ANSWER_GENERATION_PROMPT = """
Think step by step and solve the problem.
1. In the "thought" field, explain your thinking process in detail.
2. In the "answer" field, provide the final answer concisely and clearly. The answer should be a direct response to the question, without including explanations or reasoning.
Your task: {input}
"""

View file

@ -0,0 +1,14 @@
{
"Custom": {
"description": "Generates anything based on customized input and instruction.",
"interface": "custom(input: str, instruction: str) -> dict with key 'response' of type str"
},
"ScEnsemble": {
"description": "Uses self-consistency to select the solution that appears most frequently in the solution list, improve the selection to enhance the choice of the best solution.",
"interface": "sc_ensemble(solutions: List[str]) -> dict with key 'response' of type str"
},
"AnswerGenerate": {
"description": "Generate step by step based on the input. The step by step thought process is in the field of 'thought', and the final answer is in the field of 'answer'.",
"interface": "answer_generate(input: str) -> dict with key 'thought' of type str, 'answer' of type str"
}
}

View file

@ -0,0 +1,78 @@
# -*- coding: utf-8 -*-
# @Date : 6/27/2024 17:36 PM
# @Author : didi
# @Desc : operator demo of ags
import ast
import random
import sys
import traceback
from collections import Counter
from typing import Dict, List, Tuple
from tenacity import retry, stop_after_attempt, wait_fixed
from examples.aflow.scripts.optimized.DROP.workflows.template.operator_an import *
from examples.aflow.scripts.optimized.DROP.workflows.template.op_prompt import *
from metagpt.actions.action_node import ActionNode
from metagpt.llm import LLM
from metagpt.logs import logger
import re
class Operator:
    """Base operator: keeps a name and an LLM handle; calling it is left to subclasses."""

    def __init__(self, name, llm: LLM):
        self.name, self.llm = name, llm

    def __call__(self, *args, **kwargs):
        # The base class has no behaviour of its own.
        raise NotImplementedError
class Custom(Operator):
    """Operator that fills a ``GenerateOp`` node from an instruction plus input."""

    def __init__(self, llm: LLM, name: str = "Custom"):
        super().__init__(name, llm)

    async def __call__(self, input, instruction):
        """Return the filled node's content as a dict (``model_dump()``)."""
        # The instruction is placed directly in front of the input text.
        full_prompt = instruction + input
        action_node = ActionNode.from_pydantic(GenerateOp)
        filled = await action_node.fill(context=full_prompt, llm=self.llm, mode="single_fill")
        return filled.instruct_content.model_dump()
class AnswerGenerate(Operator):
    """Operator that fills an ``AnswerGenerateOp`` node from ANSWER_GENERATION_PROMPT."""

    def __init__(self, llm: LLM, name: str = "AnswerGenerate"):
        super().__init__(name, llm)

    async def __call__(self, input: str, mode: str = None) -> dict:
        """Format the prompt with ``input`` and return the node content as a dict.

        ``mode`` is accepted but not used by this implementation.
        """
        prompt = ANSWER_GENERATION_PROMPT.format(input=input)
        fill_kwargs = {"context": prompt, "llm": self.llm}
        node = await ActionNode.from_pydantic(AnswerGenerateOp).fill(**fill_kwargs)
        # model_dump() yields a dict, not a tuple.
        response = node.instruct_content.model_dump()
        return response
class ScEnsemble(Operator):
    """
    Paper: Self-Consistency Improves Chain of Thought Reasoning in Language Models
    Link: https://arxiv.org/abs/2203.11171
    Paper: Universal Self-Consistency for Large Language Model Generation
    Link: https://arxiv.org/abs/2311.17311
    """

    def __init__(self, llm: LLM, name: str = "ScEnsemble"):
        super().__init__(name, llm)

    async def __call__(self, solutions: List[str]):
        """Ask the LLM which labelled solution is most consistent and return it.

        Each solution is labelled with a letter starting at "A". The model's
        "solution_letter" is matched exactly first; if that fails, the first
        standalone capital letter in the reply that names a solution is used,
        so replies such as "A." or "(B)" are accepted.

        Returns:
            ``{"response": <chosen solution>}``.

        Raises:
            KeyError: The reply names no known solution letter.
        """
        # Maps label letter -> position in ``solutions``.
        answer_mapping = {}
        labelled = []
        for index, solution in enumerate(solutions):
            letter = chr(65 + index)
            answer_mapping[letter] = index
            labelled.append(f"{letter}: \n{str(solution)}\n\n\n")
        solution_text = "".join(labelled)

        prompt = SC_ENSEMBLE_PROMPT.format(solutions=solution_text)
        node = await ActionNode.from_pydantic(ScEnsembleOp).fill(context=prompt, llm=self.llm)
        response = node.instruct_content.model_dump()

        answer = response.get("solution_letter", "").strip().upper()
        if answer not in answer_mapping:
            # Tolerate decorated replies by looking for a standalone letter.
            for token in re.findall(r"\b[A-Z]\b", answer):
                if token in answer_mapping:
                    answer = token
                    break
            else:
                raise KeyError(
                    f"solution_letter {answer!r} does not match any of "
                    f"{len(solutions)} labelled solutions"
                )

        return {"response": solutions[answer_mapping[answer]]}

View file

@ -0,0 +1,18 @@
# -*- coding: utf-8 -*-
# @Date : 6/27/2024 19:46 PM
# @Author : didi
# @Desc : action nodes for operator
from pydantic import BaseModel, Field
# Schema with a single free-text field, `response`, defaulting to "".
class GenerateOp(BaseModel):
    response: str = Field(default="", description="Your solution for this problem")
# Schema for a self-consistency vote: reasoning text plus the chosen letter.
# Both fields default to "".
class ScEnsembleOp(BaseModel):
    thought: str = Field(default="", description="The thought of the most consistent solution.")
    solution_letter: str = Field(default="", description="The letter of most consistent solution.")
# Schema for step-by-step answering: reasoning text plus the final answer.
# Both fields default to "".
class AnswerGenerateOp(BaseModel):
    thought: str = Field(default="", description="The step by step thinking process")
    answer: str = Field(default="", description="The final answer to the question")

View file

@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-
# @Date : 6/27/2024 22:07 PM
# @Author : didi
# @Desc : Basic Graph Class
from typing import Literal
import examples.aflow.scripts.optimized.GSM8K.workflows.template.operator as operator
import examples.aflow.scripts.optimized.GSM8K.workflows.round_1.prompt as prompt_custom
from metagpt.provider.llm_provider_registry import create_llm_instance
from metagpt.utils.cost_manager import CostManager
DatasetType = Literal["HumanEval", "MBPP", "GSM8K", "MATH", "HotpotQA", "DROP"]
class Workflow:
    """Workflow that answers a problem with a single ``Custom`` operator call."""

    def __init__(
        self,
        name: str,
        llm_config,
        dataset: DatasetType,
    ) -> None:
        self.name = name
        self.dataset = dataset
        # Build the LLM from its config and give it its own CostManager.
        llm = create_llm_instance(llm_config)
        llm.cost_manager = CostManager()
        self.llm = llm
        self.custom = operator.Custom(llm)

    async def __call__(self, problem: str):
        """
        Run the workflow on ``problem``.

        Returns a pair: the operator's 'response' value and the
        ``total_cost`` reported by the LLM's cost manager.
        """
        result = await self.custom(input=problem, instruction="")
        answer = result['response']
        return answer, self.llm.cost_manager.total_cost

View file

@ -0,0 +1,6 @@
# XXX_PROMPT = """
#
# Solve it.
#
# """

View file

@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-
# @Date : 6/27/2024 22:07 PM
# @Author : didi
# @Desc : Basic Graph Class
from typing import Literal
import examples.aflow.scripts.optimized.HotpotQA.workflows.template.operator as operator
import examples.aflow.scripts.optimized.HotpotQA.workflows.round_1.prompt as prompt_custom
from metagpt.provider.llm_provider_registry import create_llm_instance
from metagpt.utils.cost_manager import CostManager
DatasetType = Literal["HumanEval", "MBPP", "GSM8K", "MATH", "HotpotQA", "DROP"]
class Workflow:
    """Workflow that answers a problem with a single ``Custom`` operator call."""

    def __init__(
        self,
        name: str,
        llm_config,
        dataset: DatasetType,
    ) -> None:
        self.name = name
        self.dataset = dataset
        # Build the LLM from its config and give it its own CostManager.
        llm = create_llm_instance(llm_config)
        llm.cost_manager = CostManager()
        self.llm = llm
        self.custom = operator.Custom(llm)

    async def __call__(self, problem: str):
        """
        Run the workflow on ``problem``.

        Returns a pair: the operator's 'response' value and the
        ``total_cost`` reported by the LLM's cost manager.
        """
        result = await self.custom(input=problem, instruction="")
        answer = result['response']
        return answer, self.llm.cost_manager.total_cost

View file

@ -0,0 +1,6 @@
# XXX_PROMPT = """
#
# Solve it.
#
# """

View file

@ -0,0 +1,15 @@
# Template for picking the most frequent answer among several candidates.
# Placeholder: {solutions}.
SC_ENSEMBLE_PROMPT = """
Several answers have been generated to a same question. They are as follows:
{solutions}
Identify the concise answer that appears most frequently across them. This consistency in answers is crucial for determining the most reliable solution.
In the "thought" field, provide a detailed explanation of your thought process. In the "solution_letter" field, output only the single letter ID (A, B, C, etc.) corresponding to the most consistent solution. Do not include any additional text or explanation in the "solution_letter" field.
"""
# Template for step-by-step answer generation with "thought" and "answer" fields.
# Placeholder: {input}.
ANSWER_GENERATION_PROMPT = """
Think step by step and solve the problem.
1. In the "thought" field, explain your thinking process in detail.
2. In the "answer" field, provide the final answer concisely and clearly. The answer should be a direct response to the question, without including explanations or reasoning.
Your task: {input}
"""

View file

@ -0,0 +1,14 @@
{
"Custom": {
"description": "Generates anything based on customized input and instruction.",
"interface": "custom(input: str, instruction: str) -> dict with key 'response' of type str"
},
"ScEnsemble": {
"description": "Uses self-consistency to select the solution that appears most frequently in the solution list, improve the selection to enhance the choice of the best solution.",
"interface": "sc_ensemble(solutions: List[str]) -> dict with key 'response' of type str"
},
"AnswerGenerate": {
"description": "Generate step by step based on the input. The step by step thought process is in the field of 'thought', and the final answer is in the field of 'answer'.",
"interface": "answer_generate(input: str) -> dict with key 'thought' of type str, 'answer' of type str"
}
}

View file

@ -0,0 +1,78 @@
# -*- coding: utf-8 -*-
# @Date : 6/27/2024 17:36 PM
# @Author : didi
# @Desc : operator demo of ags
import ast
import random
import sys
import traceback
from collections import Counter
from typing import Dict, List, Tuple
from tenacity import retry, stop_after_attempt, wait_fixed
from examples.aflow.scripts.optimized.HotpotQA.workflows.template.operator_an import *
from examples.aflow.scripts.optimized.HotpotQA.workflows.template.op_prompt import *
from metagpt.actions.action_node import ActionNode
from metagpt.llm import LLM
from metagpt.logs import logger
import re
class Operator:
    """Base class for workflow operators; keeps the operator's name and its LLM handle."""

    def __init__(self, name, llm: LLM):
        self.llm = llm
        self.name = name

    def __call__(self, *args, **kwargs):
        """Subclasses provide the behaviour; the base implementation always raises."""
        raise NotImplementedError
class Custom(Operator):
    """Free-form operator: sends an instruction followed by the input text to the LLM."""

    def __init__(self, llm: LLM, name: str = "Custom"):
        super().__init__(name, llm)

    async def __call__(self, input, instruction):
        """
        Fill a ``GenerateOp`` node from ``instruction + input`` and return its dumped fields.

        The two strings are concatenated as-is; no separator is inserted between them.
        """
        full_prompt = instruction + input
        action_node = ActionNode.from_pydantic(GenerateOp)
        filled_node = await action_node.fill(context=full_prompt, llm=self.llm, mode="single_fill")
        return filled_node.instruct_content.model_dump()
class AnswerGenerate(Operator):
    """Operator that asks the LLM for a step-by-step thought and a concise final answer."""

    def __init__(self, llm: LLM, name: str = "AnswerGenerate"):
        super().__init__(name, llm)

    async def __call__(self, input: str, mode: str = None) -> Dict[str, str]:
        """
        Format ``ANSWER_GENERATION_PROMPT`` with ``input`` and fill an ``AnswerGenerateOp`` node.

        Args:
            input: The question text substituted into the prompt.
            mode: Accepted for interface compatibility only; it is not forwarded
                to ``ActionNode.fill`` by this operator.

        Returns:
            The dumped node content as a dict (``AnswerGenerateOp`` declares the
            ``thought`` and ``answer`` fields), not a tuple.
        """
        prompt = ANSWER_GENERATION_PROMPT.format(input=input)
        node = await ActionNode.from_pydantic(AnswerGenerateOp).fill(context=prompt, llm=self.llm)
        return node.instruct_content.model_dump()
class ScEnsemble(Operator):
    """
    Self-consistency selection: ask the LLM which candidate answer is the most consistent.

    Paper: Self-Consistency Improves Chain of Thought Reasoning in Language Models
    Link: https://arxiv.org/abs/2203.11171
    Paper: Universal Self-Consistency for Large Language Model Generation
    Link: https://arxiv.org/abs/2311.17311
    """

    def __init__(self, llm: LLM, name: str = "ScEnsemble"):
        super().__init__(name, llm)

    async def __call__(self, solutions: List[str]):
        """
        Label the candidates A, B, C, ... and return the one the LLM picks.

        Args:
            solutions: Candidate answers; must support indexing.

        Returns:
            ``{"response": <selected candidate>}``.

        Raises:
            KeyError: If the reply cannot be matched to one of the candidate letters.
        """
        answer_mapping = {}
        labelled = []
        for index, solution in enumerate(solutions):
            letter = chr(65 + index)
            answer_mapping[letter] = index
            labelled.append(f"{letter}: \n{str(solution)}\n\n\n")
        solution_text = "".join(labelled)

        prompt = SC_ENSEMBLE_PROMPT.format(solutions=solution_text)
        node = await ActionNode.from_pydantic(ScEnsembleOp).fill(context=prompt, llm=self.llm)
        response = node.instruct_content.model_dump()

        raw_answer = response.get("solution_letter", "")
        answer = raw_answer.strip().upper()
        if answer not in answer_mapping:
            # Tolerate decoration around the letter such as "A.", "(B)" or "**C**".
            # Replies containing extra words are still rejected rather than guessed at.
            answer = "".join(ch for ch in answer if ch.isalnum())
        if answer not in answer_mapping:
            raise KeyError(
                f"ScEnsemble could not map reply {raw_answer!r} to one of {sorted(answer_mapping)}"
            )
        return {"response": solutions[answer_mapping[answer]]}

View file

@ -0,0 +1,18 @@
# -*- coding: utf-8 -*-
# @Date : 6/27/2024 19:46 PM
# @Author : didi
# @Desc : action nodes for operator
from pydantic import BaseModel, Field
# Output schema with a single free-form text field.
class GenerateOp(BaseModel):
    # Generated text; empty string when nothing was produced.
    response: str = Field(default="", description="Your solution for this problem")
# Output schema for the self-consistency vote.
class ScEnsembleOp(BaseModel):
    # Explanation behind the choice.
    thought: str = Field(default="", description="The thought of the most consistent solution.")
    # Letter ID of the chosen candidate.
    solution_letter: str = Field(default="", description="The letter of most consistent solution.")
# Output schema for step-by-step answering.
class AnswerGenerateOp(BaseModel):
    # Reasoning that leads to the answer.
    thought: str = Field(default="", description="The step by step thinking process")
    # Final answer text.
    answer: str = Field(default="", description="The final answer to the question")

View file

@ -0,0 +1,36 @@
# -*- coding: utf-8 -*-
# @Date : 6/27/2024 22:07 PM
# @Author : didi
# @Desc : Basic Graph Class
from typing import Literal
import examples.aflow.scripts.optimized.HumanEval.workflows.template.operator as operator
import examples.aflow.scripts.optimized.HumanEval.workflows.round_1.prompt as prompt_custom
from metagpt.provider.llm_provider_registry import create_llm_instance
from metagpt.utils.cost_manager import CostManager
DatasetType = Literal["HumanEval", "MBPP", "GSM8K", "MATH", "HotpotQA", "DROP"]


class Workflow:
    """Baseline HumanEval workflow: one ``CustomCodeGenerate`` call per problem."""

    def __init__(self, name: str, llm_config, dataset: DatasetType) -> None:
        """Create the LLM from ``llm_config``, give it a fresh cost manager and build the operators."""
        self.name = name
        self.dataset = dataset
        llm = create_llm_instance(llm_config)
        # A new CostManager per workflow, so total_cost reflects this instance only.
        llm.cost_manager = CostManager()
        self.llm = llm
        self.custom = operator.Custom(llm)
        self.custom_code_generate = operator.CustomCodeGenerate(llm)

    async def __call__(self, problem: str, entry_point: str):
        """
        Run the workflow on one problem.

        ``custom`` can generate anything; ``custom_code_generate`` is the operator
        to use when standard code output is wanted, and is the one called here.

        Returns a ``(response, total_cost)`` pair.
        """
        # await self.custom(input=, instruction="")
        generated = await self.custom_code_generate(
            problem=problem,
            entry_point=entry_point,
            instruction="",
        )
        code = generated["response"]
        return code, self.llm.cost_manager.total_cost

View file

@ -0,0 +1,6 @@
# XXX_PROMPT = """
#
# Solve it.
#
# """

View file

@ -0,0 +1,27 @@
# Self-consistency prompt: `{question}` is the task text, `{solutions}` the lettered candidates.
# The reply is expected to carry a "thought" and a single-letter "solution_letter".
SC_ENSEMBLE_PROMPT = """
Given the question described as follows: {question}
Several solutions have been generated to address the given question. They are as follows:
{solutions}
Carefully evaluate these solutions and identify the answer that appears most frequently across them. This consistency in answers is crucial for determining the most reliable solution.
In the "thought" field, provide a detailed explanation of your thought process. In the "solution_letter" field, output only the single letter ID (A, B, C, etc.) corresponding to the most consistent solution. Do not include any additional text or explanation in the "solution_letter" field.
"""
# Repair prompt sent after a failed test run.
# Placeholders: {problem}, {solution}, {exec_pass} (execution status) and {test_fail} (failure details).
REFLECTION_ON_PUBLIC_TEST_PROMPT = """
Given a code problem and a python code solution which failed to pass test or execute, you need to analyze the reason for the failure and propose a better code solution.:
### problem
{problem}
### Code Solution
{solution}
### Execution Result
{exec_pass}
#### Failed Test Case
{test_fail}
Please provide a reflection on the failed test cases and code solution, followed by a better code solution without any additional text or test cases.
"""

View file

@ -0,0 +1,18 @@
{
"Custom": {
"description": "Generates anything based on customized input and instruction.",
"interface": "custom(input: str, instruction: str) -> dict with key 'response' of type str"
},
"CustomCodeGenerate": {
"description": "Generates code based on customized input and instruction.",
"interface": "custom_code_generate(problem: str, entry_point: str, instruction: str) -> dict with key 'response' of type str"
},
"ScEnsemble": {
"description": "Uses self-consistency to select the solution that appears most frequently in the solution list, improve the selection to enhance the choice of the best solution.",
"interface": "sc_ensemble(solutions: List[str], problem: str) -> dict with key 'response' of type str"
},
"Test": {
"description": "Tests the solution using public test cases. If the solution fails, it reflects on the errors and attempts to modify the solution. Returns True and the solution if all tests pass after modifications. Returns False and the current solution if it still fails after modifications.",
"interface": "test(problem: str, solution: str, entry_point: str) -> dict with key 'result' of type bool and key 'solution' of type str"
}
}

View file

@ -0,0 +1,152 @@
# -*- coding: utf-8 -*-
# @Date : 6/27/2024 17:36 PM
# @Author : didi
# @Desc : operator demo of aflow
import ast
import random
import sys
import traceback
from collections import Counter
from typing import Dict, List, Tuple
from examples.aflow.scripts.optimized.HumanEval.workflows.template.operator_an import *
from examples.aflow.scripts.optimized.HumanEval.workflows.template.op_prompt import *
from examples.aflow.scripts.utils import extract_test_cases_from_jsonl, test_case_2_test_function
from metagpt.actions.action_node import ActionNode
from metagpt.llm import LLM
from metagpt.logs import logger
import re
class Operator:
    """Base class for workflow operators; keeps the operator's name and its LLM handle."""

    def __init__(self, name, llm: LLM):
        self.llm = llm
        self.name = name

    def __call__(self, *args, **kwargs):
        """Subclasses provide the behaviour; the base implementation always raises."""
        raise NotImplementedError
class Custom(Operator):
    """Free-form operator: sends an instruction followed by the input text to the LLM."""

    def __init__(self, llm: LLM, name: str = "Custom"):
        super().__init__(name, llm)

    async def __call__(self, input, instruction):
        """
        Fill a ``GenerateOp`` node from ``instruction + input`` and return its dumped fields.

        The two strings are concatenated as-is; no separator is inserted between them.
        """
        full_prompt = instruction + input
        action_node = ActionNode.from_pydantic(GenerateOp)
        filled_node = await action_node.fill(context=full_prompt, llm=self.llm, mode="single_fill")
        return filled_node.instruct_content.model_dump()
class CustomCodeGenerate(Operator):
    """Code-generation operator: fills a ``GenerateOp`` node in ``code_fill`` mode."""

    def __init__(self, llm: LLM, name: str = "CustomCodeGenerate"):
        super().__init__(name, llm)

    async def __call__(self, problem, entry_point, instruction):
        """
        Generate code for ``problem``.

        The prompt is ``instruction + problem`` (no separator); ``entry_point`` is
        passed to ``fill`` as ``function_name``. Returns the dumped node fields.
        """
        full_prompt = instruction + problem
        action_node = ActionNode.from_pydantic(GenerateOp)
        filled_node = await action_node.fill(
            context=full_prompt,
            llm=self.llm,
            function_name=entry_point,
            mode="code_fill",
        )
        return filled_node.instruct_content.model_dump()
class ScEnsemble(Operator):
    """
    Self-consistency selection: ask the LLM which candidate solution is the most consistent.

    Paper: Self-Consistency Improves Chain of Thought Reasoning in Language Models
    Link: https://arxiv.org/abs/2203.11171
    Paper: Universal Self-Consistency for Large Language Model Generation
    Link: https://arxiv.org/abs/2311.17311
    """

    def __init__(self, llm: LLM, name: str = "ScEnsemble"):
        super().__init__(name, llm)

    async def __call__(self, solutions: List[str], problem: str):
        """
        Label the candidates A, B, C, ... and return the one the LLM picks.

        Args:
            solutions: Candidate solutions; must support indexing.
            problem: Task text substituted into the prompt as ``{question}``.

        Returns:
            ``{"response": <selected candidate>}``.

        Raises:
            KeyError: If the reply cannot be matched to one of the candidate letters.
        """
        answer_mapping = {}
        labelled = []
        for index, solution in enumerate(solutions):
            letter = chr(65 + index)
            answer_mapping[letter] = index
            labelled.append(f"{letter}: \n{str(solution)}\n\n\n")
        solution_text = "".join(labelled)

        prompt = SC_ENSEMBLE_PROMPT.format(solutions=solution_text, question=problem)
        node = await ActionNode.from_pydantic(ScEnsembleOp).fill(context=prompt, llm=self.llm)
        response = node.instruct_content.model_dump()

        raw_answer = response.get("solution_letter", "")
        answer = raw_answer.strip().upper()
        if answer not in answer_mapping:
            # Tolerate decoration around the letter such as "A.", "(B)" or "**C**".
            # Replies containing extra words are still rejected rather than guessed at.
            answer = "".join(ch for ch in answer if ch.isalnum())
        if answer not in answer_mapping:
            raise KeyError(
                f"ScEnsemble could not map reply {raw_answer!r} to one of {sorted(answer_mapping)}"
            )
        return {"response": solutions[answer_mapping[answer]]}
class Test(Operator):
    """Run a candidate solution against its test cases and ask the LLM to repair it on failure."""

    def __init__(self, llm, name: str = "Test"):
        super().__init__(name, llm)

    def exec_code(self, solution, entry_point):
        """
        Execute the test code built for every test case of ``entry_point``.

        Returns:
            ``"no error"`` when every test passes; a list of failure records when
            at least one assertion fails; or ``{"exec_fail_case": <message>}`` as
            soon as any other exception is raised.

        Side effects:
            Appends one line per failure to ``tester.txt`` in the working directory.
        """
        test_cases = extract_test_cases_from_jsonl(entry_point)
        fail_cases = []
        for test_case in test_cases:
            test_code = test_case_2_test_function(solution, test_case, entry_point)
            try:
                # NOTE(security): this runs model-generated code in-process.
                # It executes in a *copy* of the module namespace: the same names
                # stay visible to the generated code, but it can no longer
                # overwrite this module's globals or leak definitions from one
                # test case into the next.
                exec(test_code, dict(globals()))
            except AssertionError as e:
                tb_str = traceback.format_exception(type(e), e, e.__traceback__)
                with open("tester.txt", "a") as f:
                    f.write("test_error of " + entry_point + "\n")
                fail_cases.append(
                    {
                        "test_fail_case": {
                            "test_case": test_case,
                            "error_type": "AssertionError",
                            "error_message": str(e),
                            "traceback": tb_str,
                        }
                    }
                )
            except Exception as e:
                with open("tester.txt", "a") as f:
                    f.write(entry_point + " " + str(e) + "\n")
                # Stop at the first non-assertion error: the code itself is broken.
                return {"exec_fail_case": str(e)}
        return fail_cases if fail_cases else "no error"

    async def _reflect(self, problem, solution, exec_pass, test_fail):
        """Ask the LLM for a corrected solution given the execution outcome."""
        prompt = REFLECTION_ON_PUBLIC_TEST_PROMPT.format(
            problem=problem,
            solution=solution,
            exec_pass=exec_pass,
            test_fail=test_fail,
        )
        node = await ActionNode.from_pydantic(ReflectionTestOp).fill(
            context=prompt, llm=self.llm, mode="code_fill"
        )
        return node.instruct_content.model_dump()["reflection_and_solution"]

    async def __call__(
        self, problem, solution, entry_point, test_loop: int = 3
    ):
        """
        Test ``solution`` and let the LLM revise it up to ``test_loop`` times.

        Returns:
            ``{"result": bool, "solution": str}`` where ``result`` tells whether
            the returned solution passed, and ``solution`` is the last version
            (possibly rewritten by the LLM).
        """
        for _ in range(test_loop):
            result = self.exec_code(solution, entry_point)
            if result == "no error":
                return {"result": True, "solution": solution}
            if "exec_fail_case" in result:
                # Execution error: only the error message is available.
                error = result["exec_fail_case"]
                solution = await self._reflect(
                    problem,
                    solution,
                    exec_pass=f"executed unsuccessfully, error: \n {error}",
                    test_fail="executed unsucessfully",
                )
            else:
                # The code ran but some assertions failed: pass the failure records.
                solution = await self._reflect(
                    problem,
                    solution,
                    exec_pass="executed successfully",
                    test_fail=result,
                )
        # Check the last revision produced by the loop.
        result = self.exec_code(solution, entry_point)
        return {"result": result == "no error", "solution": solution}

View file

@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-
# @Date : 6/27/2024 19:46 PM
# @Author : didi
# @Desc : action nodes for operator
from pydantic import BaseModel, Field
# Output schema with a single free-form text field.
class GenerateOp(BaseModel):
    # Generated text; empty string when nothing was produced.
    response: str = Field(default="", description="Your solution for this problem")
# Output schema for the self-consistency vote.
class ScEnsembleOp(BaseModel):
    # Explanation behind the choice.
    thought: str = Field(default="", description="The thought of the most consistent solution.")
    # Letter ID of the chosen candidate.
    solution_letter: str = Field(default="", description="The letter of most consistent solution.")
# Output schema for the repair step that follows a failed test run.
class ReflectionTestOp(BaseModel):
    # Revised solution text; empty string by default.
    reflection_and_solution: str = Field(
        default="", description="Corrective solution for code execution errors or test case failures"
    )

View file

@ -0,0 +1,33 @@
# -*- coding: utf-8 -*-
# @Date : 6/27/2024 22:07 PM
# @Author : didi
# @Desc : Basic Graph Class
from typing import Literal
import examples.aflow.scripts.optimized.GSM8K.workflows.template.operator as operator
import examples.aflow.scripts.optimized.GSM8K.workflows.round_1.prompt as prompt_custom
from metagpt.provider.llm_provider_registry import create_llm_instance
from metagpt.utils.cost_manager import CostManager
DatasetType = Literal["HumanEval", "MBPP", "GSM8K", "MATH", "HotpotQA", "DROP"]


class Workflow:
    """Baseline GSM8K workflow: a single ``Custom`` operator call per problem."""

    def __init__(self, name: str, llm_config, dataset: DatasetType) -> None:
        """Create the LLM from ``llm_config``, give it a fresh cost manager and build the operator."""
        self.name = name
        self.dataset = dataset
        llm = create_llm_instance(llm_config)
        # A new CostManager per workflow, so total_cost reflects this instance only.
        llm.cost_manager = CostManager()
        self.llm = llm
        self.custom = operator.Custom(llm)

    async def __call__(self, problem: str):
        """
        Run the workflow on one problem.

        Returns a ``(response, total_cost)`` pair: the operator's ``response``
        entry and the cost accumulated so far by this workflow's cost manager.
        """
        result = await self.custom(input=problem, instruction="")
        answer = result["response"]
        return answer, self.llm.cost_manager.total_cost

View file

@ -0,0 +1,6 @@
# XXX_PROMPT = """
#
# Solve it.
#
# """

View file

@ -0,0 +1,24 @@
# Self-consistency prompt: `{problem}` is the task text, `{solutions}` the lettered candidates.
# The reply is expected to carry a single-letter "solution_letter".
SC_ENSEMBLE_PROMPT = """
Given the question described as follows: {problem}
Several solutions have been generated to address the given question. They are as follows:
{solutions}
Carefully evaluate these solutions and identify the answer that appears most frequently across them. This consistency in answers is crucial for determining the most reliable solution.
In the "thought" field, provide a detailed explanation of your thought process. In the "solution_letter" field, output only the single letter ID (A, B, C, etc.) corresponding to the most consistent solution. Do not include any additional text or explanation in the "solution_letter" field.
"""
# Code-writing prompt: asks for a self-contained program exposing a zero-argument `solve()`.
# Placeholders: {problem}, {analysis} and {feedback} (error report from a previous attempt, may be empty).
PYTHON_CODE_VERIFIER_PROMPT = """
You are a professional Python programmer. Your task is to write complete, self-contained code based on a given mathematical problem and output the answer. The code should include all necessary imports and dependencies, and be ready to run without additional setup or environment configuration.
Problem description: {problem}
Other analysis: {analysis}
{feedback}
Your code should:
1. Implement the calculation steps described in the problem.
2. Define a function named `solve` that performs the calculation and returns the result. The `solve` function should not require any input parameters; instead, it should obtain all necessary inputs from within the function or from globally defined variables.
3. `solve` function return the final calculation result.
Please ensure your code is efficient, well-commented, and follows Python best practices. The output should be limited to basic data types such as strings, integers, and floats. It is prohibited to transmit images or other file formats. The code output is intended for a text-based language model.
"""

View file

@ -0,0 +1,14 @@
{
"Custom": {
"description": "Generates anything based on customized input and instruction.",
"interface": "custom(input: str, instruction: str) -> dict with key 'response' of type str"
},
"ScEnsemble": {
"description": "Uses self-consistency to select the solution that appears most frequently in the solution list, improve the selection to enhance the choice of the best solution.",
"interface": "sc_ensemble(solutions: List[str], problem: str) -> dict with key 'response' of type str"
},
"Programmer": {
"description": "Automatically writes, executes Python code, and returns the solution based on the provided problem description and analysis. The `output` only contains the final answer. If you want to see the detailed solution process, it's recommended to retrieve the `code`.",
"interface": "programmer(problem: str, analysis: str = 'None') -> dict with keys 'code' and 'output' of type str"
}
}

View file

@ -0,0 +1,163 @@
# -*- coding: utf-8 -*-
# @Date : 6/27/2024 17:36 PM
# @Author : didi
# @Desc : operator demo of ags
import concurrent
import sys
import traceback
from typing import List
from tenacity import retry, stop_after_attempt, wait_fixed
from examples.aflow.scripts.optimized.GSM8K.workflows.template.operator_an import *
from examples.aflow.scripts.optimized.GSM8K.workflows.template.op_prompt import *
from metagpt.actions.action_node import ActionNode
from metagpt.llm import LLM
import asyncio
import logging
class Operator:
    """Base class for workflow operators; keeps the operator's name and its LLM handle."""

    def __init__(self, name, llm: LLM):
        self.llm = llm
        self.name = name

    def __call__(self, *args, **kwargs):
        """Subclasses provide the behaviour; the base implementation always raises."""
        raise NotImplementedError
class Custom(Operator):
    """Free-form operator: sends an instruction followed by the input text to the LLM."""

    def __init__(self, llm: LLM, name: str = "Custom"):
        super().__init__(name, llm)

    async def __call__(self, input, instruction):
        """
        Fill a ``GenerateOp`` node from ``instruction + input`` and return its dumped fields.

        The two strings are concatenated as-is; no separator is inserted between them.
        """
        full_prompt = instruction + input
        action_node = ActionNode.from_pydantic(GenerateOp)
        filled_node = await action_node.fill(context=full_prompt, llm=self.llm, mode="single_fill")
        return filled_node.instruct_content.model_dump()
def run_code(code):
    """
    Execute generated Python source and return the result of its ``solve()`` function.

    The source is rejected before execution when it imports one of the
    disallowed modules (process/OS access and plotting/GUI libraries). Imports
    are detected on the parsed syntax tree, so text inside strings or comments
    and look-alike module names (``sysconfig``, ``osmnx``) no longer trigger a
    false rejection, and multi-name imports (``import json, os``) are caught.

    NOTE(security): this is a convenience filter, not a sandbox; the code is
    still run with ``exec`` and can reach modules through other means.

    Args:
        code: Python source expected to define a zero-argument ``solve``.

    Returns:
        A ``(status, text)`` tuple: ``("Success", str(result))`` or
        ``("Error", message)``. Exceptions are reported, never raised.
    """
    import ast  # local import: not part of this module's top-level imports

    try:
        # Fresh global namespace for the executed code.
        global_namespace = {}
        disallowed_imports = [
            "os", "sys", "subprocess", "multiprocessing",
            "matplotlib", "seaborn", "plotly", "bokeh", "ggplot",
            "pylab", "tkinter", "PyQt5", "wx", "pyglet"
        ]

        # Collect the top-level package of every absolute import in the source.
        imported_roots = set()
        for node in ast.walk(ast.parse(code)):
            if isinstance(node, ast.Import):
                imported_roots.update(alias.name.split(".")[0] for alias in node.names)
            elif isinstance(node, ast.ImportFrom) and node.level == 0 and node.module:
                imported_roots.add(node.module.split(".")[0])

        # Report the first disallowed library, in blacklist order.
        for lib in disallowed_imports:
            if lib in imported_roots:
                logging.warning("检测到禁止导入的库: %s", lib)
                return "Error", f"禁止导入的库: {lib} 以及绘图类功能"

        exec(code, global_namespace)

        # The generated code is expected to define a callable named 'solve'.
        solve = global_namespace.get("solve")
        if callable(solve):
            return "Success", str(solve())
        return "Error", "未找到 'solve' 函数"
    except Exception as e:
        # Includes syntax errors raised while parsing the source.
        exc_type, exc_value, exc_traceback = sys.exc_info()
        tb_str = traceback.format_exception(exc_type, exc_value, exc_traceback)
        return "Error", f"执行错误: {str(e)}\n{''.join(tb_str)}"
class Programmer(Operator):
    """Operator that asks the LLM for a ``solve()`` program, runs it and feeds errors back."""

    def __init__(self, llm: LLM, name: str = "Programmer"):
        super().__init__(name, llm)

    async def exec_code(self, code, timeout=30):
        """
        Run ``run_code(code)`` in a one-worker process pool and give up after ``timeout`` seconds.

        Returns:
            The ``(status, text)`` tuple from ``run_code``, or an ``("Error", ...)``
            tuple on timeout or any other failure.
        """
        # Local import: the module-level `import concurrent` does not by itself
        # load the `futures` submodule.
        from concurrent.futures import ProcessPoolExecutor

        loop = asyncio.get_running_loop()
        executor = ProcessPoolExecutor(max_workers=1)
        timed_out = False
        try:
            # Submit run_code to the pool and wait for it, bounded by the timeout.
            future = loop.run_in_executor(executor, run_code, code)
            return await asyncio.wait_for(future, timeout=timeout)
        except asyncio.TimeoutError:
            timed_out = True
            # A running task cannot be cancelled by shutdown(); without this the
            # worker would keep executing the runaway code after we return.
            # NOTE(review): `_processes` is a private CPython attribute; there is
            # no public way to stop a worker before Python 3.14 — confirm this is
            # acceptable for the supported interpreters.
            workers = list((getattr(executor, "_processes", None) or {}).values())
            for worker in workers:
                worker.terminate()
            return "Error", "代码执行超时"
        except Exception as e:
            return "Error", f"未知错误: {str(e)}"
        finally:
            # Do not block on a pool whose worker was just terminated.
            executor.shutdown(wait=not timed_out, cancel_futures=True)

    async def code_generate(self, problem, analysis, feedback, mode):
        """
        Ask the LLM for code solving ``problem``.

        ``feedback`` carries the error report of a previous attempt (may be empty);
        ``mode`` is forwarded to ``fill`` only when truthy. Returns the dumped
        ``CodeGenerateOp`` fields.
        """
        prompt = PYTHON_CODE_VERIFIER_PROMPT.format(
            problem=problem,
            analysis=analysis,
            feedback=feedback
        )
        fill_kwargs = {
            "context": prompt,
            "llm": self.llm,
            "function_name": "solve"
        }
        if mode:
            fill_kwargs["mode"] = mode
        node = await ActionNode.from_pydantic(CodeGenerateOp).fill(**fill_kwargs)
        response = node.instruct_content.model_dump()
        return response

    @retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
    async def __call__(self, problem: str, analysis: str = "None"):
        """
        Generate and execute code, regenerating with error feedback up to 3 times.

        Returns:
            ``{"code": ..., "output": ...}``. ``output`` is the ``solve()`` result
            on success, otherwise the last error text (or a fixed message when
            the LLM returned no code).
        """
        code = None
        output = None
        feedback = ""
        for i in range(3):
            code_response = await self.code_generate(problem, analysis, feedback, mode="code_fill")
            code = code_response.get("code")
            if not code:
                return {"code": code, "output": "未生成代码"}
            status, output = await self.exec_code(code)
            if status == "Success":
                return {"code": code, "output": output}
            print(f"{i + 1}次执行错误,错误信息:{output}")
            # Show the failing code and its error to the model on the next attempt.
            feedback = (
                f"\nThe result of the error from the code you wrote in the previous round:\n"
                f"Code: {code}\n\nStatus: {status}, {output}"
            )
        return {"code": code, "output": output}
class ScEnsemble(Operator):
    """
    Self-consistency selection: ask the LLM which candidate solution is the most consistent.

    Paper: Self-Consistency Improves Chain of Thought Reasoning in Language Models
    Link: https://arxiv.org/abs/2203.11171
    Paper: Universal Self-Consistency for Large Language Model Generation
    Link: https://arxiv.org/abs/2311.17311
    """

    def __init__(self, llm: LLM, name: str = "ScEnsemble"):
        super().__init__(name, llm)

    async def __call__(self, solutions: List[str], problem: str):
        """
        Label the candidates A, B, C, ... and return the one the LLM picks.

        Args:
            solutions: Candidate solutions; must support indexing.
            problem: Task text substituted into the prompt as ``{problem}``.

        Returns:
            ``{"response": <selected candidate>}``.

        Raises:
            KeyError: If the reply cannot be matched to one of the candidate letters.
        """
        answer_mapping = {}
        labelled = []
        for index, solution in enumerate(solutions):
            letter = chr(65 + index)
            answer_mapping[letter] = index
            labelled.append(f"{letter}: \n{str(solution)}\n\n\n")
        solution_text = "".join(labelled)

        prompt = SC_ENSEMBLE_PROMPT.format(solutions=solution_text, problem=problem)
        node = await ActionNode.from_pydantic(ScEnsembleOp).fill(context=prompt, llm=self.llm)
        response = node.instruct_content.model_dump()

        raw_answer = response.get("solution_letter", "")
        answer = raw_answer.strip().upper()
        if answer not in answer_mapping:
            # Tolerate decoration around the letter such as "A.", "(B)" or "**C**".
            # Replies containing extra words are still rejected rather than guessed at.
            answer = "".join(ch for ch in answer if ch.isalnum())
        if answer not in answer_mapping:
            raise KeyError(
                f"ScEnsemble could not map reply {raw_answer!r} to one of {sorted(answer_mapping)}"
            )
        return {"response": solutions[answer_mapping[answer]]}

View file

@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-
# @Date : 6/27/2024 19:46 PM
# @Author : didi
# @Desc : action nodes for operator
from pydantic import BaseModel, Field
# Output schema with a single free-form text field.
class GenerateOp(BaseModel):
    # Generated text; empty string when nothing was produced.
    response: str = Field(default="", description="Your solution for this problem")
# Output schema for code generation.
class CodeGenerateOp(BaseModel):
    # Generated source code; empty string when nothing was produced.
    code: str = Field(default="", description="Your complete code solution for this problem")
# Output schema for the self-consistency vote.
# NOTE(review): the GSM8K SC_ENSEMBLE_PROMPT also asks for a "thought" field, but this
# model declares only `solution_letter` — confirm whether the omission is intended.
class ScEnsembleOp(BaseModel):
    # Letter ID of the chosen candidate.
    solution_letter: str = Field(default="", description="The letter of most consistent solution.")

View file

@ -0,0 +1,36 @@
# -*- coding: utf-8 -*-
# @Date : 6/27/2024 22:07 PM
# @Author : didi
# @Desc : Basic Graph Class
from typing import Literal
import examples.aflow.scripts.optimized.MBPP.workflows.template.operator as operator
import examples.aflow.scripts.optimized.MBPP.workflows.round_1.prompt as prompt_custom
from metagpt.provider.llm_provider_registry import create_llm_instance
from metagpt.utils.cost_manager import CostManager
DatasetType = Literal["HumanEval", "MBPP", "GSM8K", "MATH", "HotpotQA", "DROP"]


class Workflow:
    """Baseline MBPP workflow: one ``CustomCodeGenerate`` call per problem."""

    def __init__(self, name: str, llm_config, dataset: DatasetType) -> None:
        """Create the LLM from ``llm_config``, give it a fresh cost manager and build the operators."""
        self.name = name
        self.dataset = dataset
        llm = create_llm_instance(llm_config)
        # A new CostManager per workflow, so total_cost reflects this instance only.
        llm.cost_manager = CostManager()
        self.llm = llm
        self.custom = operator.Custom(llm)
        self.custom_code_generate = operator.CustomCodeGenerate(llm)

    async def __call__(self, problem: str, entry_point: str):
        """
        Run the workflow on one problem.

        ``custom`` can generate anything; ``custom_code_generate`` is the operator
        to use when standard code output is wanted, and is the one called here.

        Returns a ``(response, total_cost)`` pair.
        """
        # await self.custom(input=, instruction="")
        generated = await self.custom_code_generate(
            problem=problem,
            entry_point=entry_point,
            instruction="",
        )
        code = generated["response"]
        return code, self.llm.cost_manager.total_cost

View file

@ -0,0 +1,6 @@
# XXX_PROMPT = """
#
# Solve it.
#
# """

View file

@ -0,0 +1,27 @@
# Self-consistency prompt: `{question}` is the task text, `{solutions}` the lettered candidates.
# The reply is expected to carry a "thought" and a single-letter "solution_letter".
SC_ENSEMBLE_PROMPT = """
Given the question described as follows: {question}
Several solutions have been generated to address the given question. They are as follows:
{solutions}
Carefully evaluate these solutions and identify the answer that appears most frequently across them. This consistency in answers is crucial for determining the most reliable solution.
In the "thought" field, provide a detailed explanation of your thought process. In the "solution_letter" field, output only the single letter ID (A, B, C, etc.) corresponding to the most consistent solution. Do not include any additional text or explanation in the "solution_letter" field.
"""
# Repair prompt sent after a failed test run.
# Placeholders: {problem}, {solution}, {exec_pass} (execution status) and {test_fail} (failure details).
REFLECTION_ON_PUBLIC_TEST_PROMPT = """
Given a code problem and a python code solution which failed to pass test or execute, you need to analyze the reason for the failure and propose a better code solution.:
### problem
{problem}
### Code Solution
{solution}
### Execution Result
{exec_pass}
#### Failed Test Case
{test_fail}
Please provide a reflection on the failed test cases and code solution, followed by a better code solution without any additional text or test cases.
"""

View file

@ -0,0 +1,18 @@
{
"Custom": {
"description": "Generates anything based on customized input and instruction.",
"interface": "custom(input: str, instruction: str) -> dict with key 'response' of type str"
},
"CustomCodeGenerate": {
"description": "Generates code based on customized input and instruction.",
"interface": "custom_code_generate(problem: str, entry_point: str, instruction: str) -> dict with key 'response' of type str"
},
"ScEnsemble": {
"description": "Uses self-consistency to select the solution that appears most frequently in the solution list, improve the selection to enhance the choice of the best solution.",
"interface": "sc_ensemble(solutions: List[str], problem: str) -> dict with key 'response' of type str"
},
"Test": {
"description": "Tests the solution using public test cases. If the solution fails, it reflects on the errors and attempts to modify the solution. Returns True and the solution if all tests pass after modifications. Returns False and the current solution if it still fails after modifications.",
"interface": "test(problem: str, solution: str, entry_point: str) -> dict with key 'result' of type bool and key 'solution' of type str"
}
}

View file

@ -0,0 +1,152 @@
# -*- coding: utf-8 -*-
# @Date : 6/27/2024 17:36 PM
# @Author : didi
# @Desc : operator demo of aflow
import ast
import random
import sys
import traceback
from collections import Counter
from typing import Dict, List, Tuple
from examples.aflow.scripts.optimized.HumanEval.workflows.template.operator_an import *
from examples.aflow.scripts.optimized.HumanEval.workflows.template.op_prompt import *
from examples.aflow.scripts.utils import extract_test_cases_from_jsonl, test_case_2_test_function
from metagpt.actions.action_node import ActionNode
from metagpt.llm import LLM
from metagpt.logs import logger
import re
class Operator:
    """Base class for workflow operators; keeps the operator's name and its LLM handle."""

    def __init__(self, name, llm: LLM):
        self.llm = llm
        self.name = name

    def __call__(self, *args, **kwargs):
        """Subclasses provide the behaviour; the base implementation always raises."""
        raise NotImplementedError
class Custom(Operator):
    """Free-form operator: sends an instruction followed by the input text to the LLM."""

    def __init__(self, llm: LLM, name: str = "Custom"):
        super().__init__(name, llm)

    async def __call__(self, input, instruction):
        """
        Fill a ``GenerateOp`` node from ``instruction + input`` and return its dumped fields.

        The two strings are concatenated as-is; no separator is inserted between them.
        """
        full_prompt = instruction + input
        action_node = ActionNode.from_pydantic(GenerateOp)
        filled_node = await action_node.fill(context=full_prompt, llm=self.llm, mode="single_fill")
        return filled_node.instruct_content.model_dump()
class CustomCodeGenerate(Operator):
    """Code-generation operator: fills a ``GenerateOp`` node in ``code_fill`` mode."""

    def __init__(self, llm: LLM, name: str = "CustomCodeGenerate"):
        super().__init__(name, llm)

    async def __call__(self, problem, entry_point, instruction):
        """
        Generate code for ``problem``.

        The prompt is ``instruction + problem`` (no separator); ``entry_point`` is
        passed to ``fill`` as ``function_name``. Returns the dumped node fields.
        """
        full_prompt = instruction + problem
        action_node = ActionNode.from_pydantic(GenerateOp)
        filled_node = await action_node.fill(
            context=full_prompt,
            llm=self.llm,
            function_name=entry_point,
            mode="code_fill",
        )
        return filled_node.instruct_content.model_dump()
class ScEnsemble(Operator):
    """
    Self-consistency selection: ask the LLM which candidate solution is the most consistent.

    Paper: Self-Consistency Improves Chain of Thought Reasoning in Language Models
    Link: https://arxiv.org/abs/2203.11171
    Paper: Universal Self-Consistency for Large Language Model Generation
    Link: https://arxiv.org/abs/2311.17311
    """

    def __init__(self, llm: LLM, name: str = "ScEnsemble"):
        super().__init__(name, llm)

    async def __call__(self, solutions: List[str], problem: str):
        """
        Label the candidates A, B, C, ... and return the one the LLM picks.

        Args:
            solutions: Candidate solutions; must support indexing.
            problem: Task text substituted into the prompt as ``{question}``.

        Returns:
            ``{"response": <selected candidate>}``.

        Raises:
            KeyError: If the reply cannot be matched to one of the candidate letters.
        """
        answer_mapping = {}
        labelled = []
        for index, solution in enumerate(solutions):
            letter = chr(65 + index)
            answer_mapping[letter] = index
            labelled.append(f"{letter}: \n{str(solution)}\n\n\n")
        solution_text = "".join(labelled)

        prompt = SC_ENSEMBLE_PROMPT.format(solutions=solution_text, question=problem)
        node = await ActionNode.from_pydantic(ScEnsembleOp).fill(context=prompt, llm=self.llm)
        response = node.instruct_content.model_dump()

        raw_answer = response.get("solution_letter", "")
        answer = raw_answer.strip().upper()
        if answer not in answer_mapping:
            # Tolerate decoration around the letter such as "A.", "(B)" or "**C**".
            # Replies containing extra words are still rejected rather than guessed at.
            answer = "".join(ch for ch in answer if ch.isalnum())
        if answer not in answer_mapping:
            raise KeyError(
                f"ScEnsemble could not map reply {raw_answer!r} to one of {sorted(answer_mapping)}"
            )
        return {"response": solutions[answer_mapping[answer]]}
class Test(Operator):
    """Run a candidate solution against its MBPP test cases and ask the LLM to repair it on failure."""

    def __init__(self, llm, name: str = "Test"):
        super().__init__(name, llm)

    def exec_code(self, solution, entry_point):
        """
        Execute the test code built for every test case of ``entry_point``.

        Returns:
            ``"no error"`` when every test passes; a list of failure records when
            at least one assertion fails; or ``{"exec_fail_case": <message>}`` as
            soon as any other exception is raised.

        Side effects:
            Appends one line per failure to ``tester.txt`` in the working directory.
        """
        test_cases = extract_test_cases_from_jsonl(entry_point, "MBPP")
        fail_cases = []
        for test_case in test_cases:
            test_code = test_case_2_test_function(solution, test_case, entry_point)
            try:
                # NOTE(security): this runs model-generated code in-process.
                # It executes in a *copy* of the module namespace: the same names
                # stay visible to the generated code, but it can no longer
                # overwrite this module's globals or leak definitions from one
                # test case into the next.
                exec(test_code, dict(globals()))
            except AssertionError as e:
                tb_str = traceback.format_exception(type(e), e, e.__traceback__)
                with open("tester.txt", "a") as f:
                    f.write("test_error of " + entry_point + "\n")
                fail_cases.append(
                    {
                        "test_fail_case": {
                            "test_case": test_case,
                            "error_type": "AssertionError",
                            "error_message": str(e),
                            "traceback": tb_str,
                        }
                    }
                )
            except Exception as e:
                with open("tester.txt", "a") as f:
                    f.write(entry_point + " " + str(e) + "\n")
                # Stop at the first non-assertion error: the code itself is broken.
                return {"exec_fail_case": str(e)}
        return fail_cases if fail_cases else "no error"

    async def _reflect(self, problem, solution, exec_pass, test_fail):
        """Ask the LLM for a corrected solution given the execution outcome."""
        prompt = REFLECTION_ON_PUBLIC_TEST_PROMPT.format(
            problem=problem,
            solution=solution,
            exec_pass=exec_pass,
            test_fail=test_fail,
        )
        node = await ActionNode.from_pydantic(ReflectionTestOp).fill(
            context=prompt, llm=self.llm, mode="code_fill"
        )
        return node.instruct_content.model_dump()["reflection_and_solution"]

    async def __call__(
        self, problem, solution, entry_point, test_loop: int = 3
    ):
        """
        Test ``solution`` and let the LLM revise it up to ``test_loop`` times.

        Returns:
            ``{"result": bool, "solution": str}`` where ``result`` tells whether
            the returned solution passed, and ``solution`` is the last version
            (possibly rewritten by the LLM).
        """
        for _ in range(test_loop):
            result = self.exec_code(solution, entry_point)
            if result == "no error":
                return {"result": True, "solution": solution}
            if "exec_fail_case" in result:
                # Execution error: only the error message is available.
                error = result["exec_fail_case"]
                solution = await self._reflect(
                    problem,
                    solution,
                    exec_pass=f"executed unsuccessfully, error: \n {error}",
                    test_fail="executed unsucessfully",
                )
            else:
                # The code ran but some assertions failed: pass the failure records.
                solution = await self._reflect(
                    problem,
                    solution,
                    exec_pass="executed successfully",
                    test_fail=result,
                )
        # Check the last revision produced by the loop.
        result = self.exec_code(solution, entry_point)
        return {"result": result == "no error", "solution": solution}

View file

@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-
# @Date : 6/27/2024 19:46 PM
# @Author : didi
# @Desc : action nodes for operator
from pydantic import BaseModel, Field
# Output schema with a single free-form text field.
class GenerateOp(BaseModel):
    # Generated text; empty string when nothing was produced.
    response: str = Field(default="", description="Your solution for this problem")
# Output schema for the self-consistency vote.
class ScEnsembleOp(BaseModel):
    # Explanation behind the choice.
    thought: str = Field(default="", description="The thought of the most consistent solution.")
    # Letter ID of the chosen candidate.
    solution_letter: str = Field(default="", description="The letter of most consistent solution.")
# Output schema for the repair step that follows a failed test run.
class ReflectionTestOp(BaseModel):
    # Revised solution text; empty string by default.
    reflection_and_solution: str = Field(
        default="", description="Corrective solution for code execution errors or test case failures"
    )

View file

@ -82,23 +82,25 @@ class Optimizer:
retry_count = 0
max_retries = 1
while retry_count < max_retries:
try:
score = loop.run_until_complete(self._optimize_graph())
break
except Exception as e:
retry_count += 1
print(f"Error occurred: {e}. Retrying... (Attempt {retry_count}/{max_retries})")
if retry_count == max_retries:
print("Max retries reached. Moving to next round.")
score = None
score = loop.run_until_complete(self._optimize_graph())
wait_time = 5 * retry_count
time.sleep(wait_time)
# while retry_count < max_retries:
# try:
# score = loop.run_until_complete(self._optimize_graph())
# break
# except Exception as e:
# retry_count += 1
# print(f"Error occurred: {e}. Retrying... (Attempt {retry_count}/{max_retries})")
# if retry_count == max_retries:
# print("Max retries reached. Moving to next round.")
# score = None
if retry_count < max_retries:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# wait_time = 5 * retry_count
# time.sleep(wait_time)
# if retry_count < max_retries:
# loop = asyncio.new_event_loop()
# asyncio.set_event_loop(loop)
self.round += 1
print(f"Score for round {self.round}: {score}")

View file

@ -92,7 +92,7 @@ class DataUtils:
# Check whether the file exists
if not os.path.exists(log_dir):
return "" # If the file does not exist, return an empty string
print(log_dir)
with open(log_dir, 'r', encoding='utf-8') as f:
data = json.load(f)

View file

@ -53,7 +53,7 @@ import examples.aflow.scripts.optimized.{dataset}.workflows.round_{round}.prompt
from metagpt.provider.llm_provider_registry import create_llm_instance
from metagpt.utils.cost_manager import CostManager
DatasetType = Literal["HumanEval", "MBPP", "GSM8K", "MATH", "HotpotQa", "DROP"]
DatasetType = Literal["HumanEval", "MBPP", "GSM8K", "MATH", "HotpotQA", "DROP"]
{graph}
"""

View file

@ -9,7 +9,7 @@ from examples.aflow.scripts.operator import Generate
from metagpt.provider.llm_provider_registry import create_llm_instance
from metagpt.utils.cost_manager import CostManager
DatasetType = Literal["HumanEval", "MBPP", "GSM8K", "MATH", "HotpotQa", "DROP"]
DatasetType = Literal["HumanEval", "MBPP", "GSM8K", "MATH", "HotpotQA", "DROP"]
class Workflow:
def __init__(

View file

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
# @Date : 8/23/2024 20:00 PM
# @Author : didi
# @Desc : Experiment of graph optimization
# @Desc : Entrance of AFlow.
from examples.aflow.scripts.optimizer import Optimizer
from metagpt.configs.models_config import ModelsConfig
@ -13,40 +13,44 @@ QuestionType = Literal["math", "code", "quiz"]
OptimizerType = Literal["Graph", "Test"]
# Crucial Parameters
dataset: DatasetType = "GSM8K" # Ensure the type is consistent with DatasetType
dataset: DatasetType = "HotpotQA" # Ensure the type is consistent with DatasetType
sample: int = 4 # Sample Count, which means how many workflows will be resampled from generated workflows
question_type: QuestionType = "math" # Ensure the type is consistent with QuestionType
question_type: QuestionType = "quiz" # Ensure the type is consistent with QuestionType
optimized_path: str = "examples/aflow/scripts/optimized" # Optimized Result Save Path
initial_round: int = 1 # Corrected the case from Initial_round to initial_round
max_rounds: int = 20
check_convergence: bool = True
# Initialize LLM Model
four_o_llm_config = ModelsConfig.default().get("gpt-4o")
deepseek_llm_config = ModelsConfig.default().get("deepseek-chat")
# Config llm model, you can modify `config/config2.yaml` to use more llms.
mini_llm_config = ModelsConfig.default().get("gpt-4o-mini")
claude_llm_config = ModelsConfig.default().get("claude-3-5-sonnet-20240620")
# Initialize Operators List
# Config operators.
operators = [
"Custom"
"Custom", # It's the basic unit of a fixed node. The optimizer can modify its prompt to get various nodes.
# "CustomCodeGenerate", # It's for code
# "Test", # It's for code
"ScEnsemble", # It's for code, math and QA
# "Programmer", # It's for math
"AnswerGenerate" # It's for QA
]
# Create an optimizer instance
optimizer = Optimizer(
dataset=dataset,
question_type=question_type,
opt_llm_config=claude_llm_config,
exec_llm_config=mini_llm_config,
check_convergence=check_convergence,
operators=operators,
optimized_path=optimized_path,
sample=sample,
initial_round=initial_round,
max_rounds=max_rounds
dataset=dataset, # Config dataset
question_type=question_type, # Config Question Type
opt_llm_config=claude_llm_config, # Config Optimizer LLM
exec_llm_config=mini_llm_config, # Config Execution LLM
check_convergence=check_convergence, # Whether Early Stop
operators=operators, # Config Operators you want to use
optimized_path=optimized_path, # Config Optimized workflow's file path
sample=sample, # Only Top(sample) rounds will be selected.
initial_round=initial_round, # Optimize from initial round
max_rounds=max_rounds # The max iteration of AFLOW.
)
if __name__ == "__main__":
# Run the optimizer
# Optimize workflow via setting the optimizer's mode to 'Graph'
optimizer.optimize("Graph")
# Test workflow via setting the optimizer's mode to 'Test'
# optimizer.optimize("Test")