mirror of
https://github.com/FoundationAgents/MetaGPT.git
synced 2026-06-11 15:15:18 +02:00
mv aflow from example to ext
This commit is contained in:
parent
0b69ffe198
commit
fcc5e19160
29 changed files with 173 additions and 30 deletions
|
|
@ -1,101 +0,0 @@
|
|||
import asyncio
|
||||
import json
|
||||
import os
|
||||
from typing import List, Tuple, Callable, Any
|
||||
from abc import ABC, abstractmethod
|
||||
from datetime import datetime
|
||||
|
||||
import aiofiles
|
||||
import pandas as pd
|
||||
from tqdm.asyncio import tqdm_asyncio
|
||||
|
||||
class BaseBenchmark(ABC):
|
||||
def __init__(self, name: str, file_path: str, log_path: str):
|
||||
self.name = name
|
||||
self.file_path = file_path
|
||||
self.log_path = log_path
|
||||
|
||||
async def load_data(self, specific_indices: List[int] = None) -> List[dict]:
|
||||
data = []
|
||||
async with aiofiles.open(self.file_path, mode="r", encoding='utf-8') as file:
|
||||
async for line in file:
|
||||
data.append(json.loads(line))
|
||||
|
||||
if specific_indices is not None:
|
||||
filtered_data = [data[i] for i in specific_indices if i < len(data)]
|
||||
return filtered_data
|
||||
|
||||
return data
|
||||
|
||||
def save_results_to_csv(self, results: List[Tuple[Any, ...]], columns: List[str]):
|
||||
df = pd.DataFrame(results, columns=columns)
|
||||
avg_score = df["score"].mean()
|
||||
t_cost = df["cost"].max()
|
||||
a_cost = t_cost / len(df) if len(df) > 0 else 0
|
||||
|
||||
current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
filename = f"{avg_score:.5f}_{current_time}.csv"
|
||||
output_file = os.path.join(self.log_path, filename)
|
||||
|
||||
df.to_csv(output_file, index=False)
|
||||
print(f"Results saved to {output_file}")
|
||||
|
||||
return avg_score, a_cost, t_cost
|
||||
|
||||
def log_mismatch(self, problem: str, expected_output: Any, prediction: str, extracted_output: Any):
|
||||
log_data = {
|
||||
"question": problem,
|
||||
"right_answer": expected_output,
|
||||
"model_output": prediction,
|
||||
"extracted_output": extracted_output
|
||||
}
|
||||
|
||||
log_file = os.path.join(self.log_path, 'log.json')
|
||||
|
||||
if os.path.exists(log_file):
|
||||
with open(log_file, 'r', encoding='utf-8') as f:
|
||||
try:
|
||||
data = json.load(f)
|
||||
except json.JSONDecodeError:
|
||||
data = []
|
||||
else:
|
||||
data = []
|
||||
|
||||
data.append(log_data)
|
||||
|
||||
with open(log_file, 'w', encoding='utf-8') as f:
|
||||
json.dump(data, f, indent=4, ensure_ascii=False)
|
||||
|
||||
@abstractmethod
|
||||
async def evaluate_problem(self, problem: dict, graph: Callable) -> Tuple[Any, ...]:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def calculate_score(self, expected_output: Any, prediction: Any) -> Tuple[float, Any]:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_result_columns(self) -> List[str]:
|
||||
pass
|
||||
|
||||
async def evaluate_all_problems(self, data: List[dict], graph: Callable, max_concurrent_tasks: int = 50):
|
||||
semaphore = asyncio.Semaphore(max_concurrent_tasks)
|
||||
|
||||
async def sem_evaluate(problem):
|
||||
async with semaphore:
|
||||
return await self.evaluate_problem(problem, graph)
|
||||
|
||||
tasks = [sem_evaluate(problem) for problem in data]
|
||||
|
||||
return await tqdm_asyncio.gather(*tasks, desc=f"Evaluating {self.name} problems", total=len(data))
|
||||
|
||||
async def run_evaluation(self, graph: Callable, va_list: List[int], max_concurrent_tasks: int = 50):
|
||||
data = await self.load_data(va_list)
|
||||
results = await self.evaluate_all_problems(data, graph, max_concurrent_tasks)
|
||||
columns = self.get_result_columns()
|
||||
average_score, average_cost, total_cost = self.save_results_to_csv(results, columns)
|
||||
print(f"Average score on {self.name} dataset: {average_score:.5f}")
|
||||
print(f"Total Cost: {total_cost:.5f}")
|
||||
return average_score, average_cost, total_cost
|
||||
|
||||
|
||||
|
|
@ -1,93 +0,0 @@
|
|||
import os
|
||||
import re
|
||||
import json
|
||||
import asyncio
|
||||
import string
|
||||
from collections import Counter
|
||||
from datetime import datetime
|
||||
from typing import Any, Callable, Dict, List, Tuple
|
||||
|
||||
import aiofiles
|
||||
import pandas as pd
|
||||
from tqdm.asyncio import tqdm_asyncio
|
||||
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type
|
||||
|
||||
from examples.aflow.benchmark.benchmark import BaseBenchmark
|
||||
|
||||
class DROPBenchmark(BaseBenchmark):
|
||||
def __init__(self, name: str, file_path: str, log_path: str):
|
||||
super().__init__(name, file_path, log_path)
|
||||
|
||||
def normalize_answer(self, s: str) -> List[str]:
|
||||
"""
|
||||
Normalize answers for evaluation.
|
||||
"""
|
||||
|
||||
def remove_articles(text):
|
||||
return re.sub(r"\b(a|an|the)\b", " ", text)
|
||||
|
||||
def white_space_fix(text):
|
||||
return " ".join(text.split())
|
||||
|
||||
def remove_punc(text):
|
||||
exclude = set(string.punctuation)
|
||||
return "".join(ch for ch in text if ch not in exclude)
|
||||
|
||||
def lower(text):
|
||||
return text.lower()
|
||||
|
||||
return white_space_fix(remove_articles(remove_punc(lower(s))))
|
||||
|
||||
def calculate_score(self, ground_truth: str, prediction: str) -> Tuple[float, str]:
|
||||
"""
|
||||
Compute the F1 score between prediction and ground truth answers.
|
||||
"""
|
||||
prediction_tokens = self.normalize_answer(prediction).split()
|
||||
ground_truth_tokens = self.normalize_answer(ground_truth).split()
|
||||
common = Counter(prediction_tokens) & Counter(ground_truth_tokens)
|
||||
num_same = sum(common.values())
|
||||
if num_same == 0:
|
||||
return 0, prediction
|
||||
precision = 1.0 * num_same / len(prediction_tokens)
|
||||
recall = 1.0 * num_same / len(ground_truth_tokens)
|
||||
f1 = (2 * precision * recall) / (precision + recall)
|
||||
return f1, prediction
|
||||
|
||||
@retry(
|
||||
stop=stop_after_attempt(5),
|
||||
wait=wait_fixed(1),
|
||||
retry=retry_if_exception_type(Exception),
|
||||
reraise=True
|
||||
)
|
||||
async def _generate_output(self, graph, input_text):
|
||||
return await graph(input_text)
|
||||
|
||||
async def evaluate_problem(self, problem: dict, graph: Callable) -> Tuple[str, str, str, float, float]:
|
||||
input_text = problem["context"]
|
||||
expected_output = problem["ref_text"]
|
||||
answers = expected_output.split("|")
|
||||
|
||||
try:
|
||||
output, cost = await self._generate_output(graph, input_text)
|
||||
f1_scores = []
|
||||
|
||||
for answer in answers:
|
||||
if answer.strip() != "":
|
||||
output_parts = output.split("|")
|
||||
for output_part in output_parts:
|
||||
f1_score, _ = self.calculate_score(answer, output_part)
|
||||
f1_scores.append(f1_score)
|
||||
|
||||
uni_score = max(f1_scores)
|
||||
|
||||
if uni_score < 0.3:
|
||||
self.log_mismatch(input_text, expected_output, output, output)
|
||||
|
||||
return input_text, output, expected_output, uni_score, cost
|
||||
|
||||
except Exception as e:
|
||||
print(f"Maximum retries reached. Skipping this sample. Error: {e}")
|
||||
return input_text, str(e), expected_output, 0.0, 0.0
|
||||
|
||||
def get_result_columns(self) -> List[str]:
|
||||
return ["inputs", "prediction", "expected_output", "score", "cost"]
|
||||
|
|
@ -1,70 +0,0 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# @Date :
|
||||
# @Author : all
|
||||
# @Desc : test on gsm8k
|
||||
import re
|
||||
import json
|
||||
import asyncio
|
||||
import aiofiles
|
||||
import pandas as pd
|
||||
from typing import Optional, List, Tuple, Callable, Any
|
||||
|
||||
from pandas import Series
|
||||
from tqdm.asyncio import tqdm_asyncio
|
||||
import os
|
||||
import time
|
||||
from datetime import datetime
|
||||
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type
|
||||
|
||||
|
||||
from examples.aflow.benchmark.benchmark import BaseBenchmark
|
||||
|
||||
class GSM8KBenchmark(BaseBenchmark):
|
||||
def __init__(self, name: str, file_path: str, log_path: str):
|
||||
super().__init__(name, file_path, log_path)
|
||||
|
||||
def extract_number(self, text: str) -> Optional[float]:
|
||||
matches = re.findall(r"[-+]?\d+(?:,\d{3})*(?:\.\d+)?|\d+\.\d+", str(text))
|
||||
if matches:
|
||||
last_number = matches[-1].replace(",", "")
|
||||
try:
|
||||
return float(last_number)
|
||||
except ValueError:
|
||||
return None
|
||||
else:
|
||||
return None
|
||||
|
||||
def calculate_score(self, expected_output: float, prediction: float) -> Tuple[float, float]:
|
||||
if prediction is None:
|
||||
return 0.0, prediction
|
||||
return 1.0 if abs(expected_output - prediction) <= 1e-6 else 0.0, prediction
|
||||
|
||||
@retry(
|
||||
stop=stop_after_attempt(5),
|
||||
wait=wait_fixed(1),
|
||||
retry=retry_if_exception_type(Exception),
|
||||
reraise=True
|
||||
)
|
||||
async def _generate_output(self, graph, input_text):
|
||||
return await graph(input_text)
|
||||
|
||||
async def evaluate_problem(self, problem: dict, graph: Callable) -> Tuple[str, str, float, float, float]:
|
||||
input_text = problem["question"]
|
||||
expected_output = self.extract_number(problem["answer"])
|
||||
|
||||
try:
|
||||
output, cost = await self._generate_output(graph, input_text)
|
||||
predicted_number = self.extract_number(output)
|
||||
score, extracted_output = self.calculate_score(expected_output, predicted_number)
|
||||
|
||||
if score == 0:
|
||||
self.log_mismatch(input_text, expected_output, output, extracted_output)
|
||||
|
||||
return input_text, output, expected_output, score, cost
|
||||
|
||||
except Exception as e:
|
||||
print(f"Maximum retries reached. Skipping this sample. Error: {e}")
|
||||
return input_text, str(e), expected_output, 0.0, 0.0
|
||||
|
||||
def get_result_columns(self) -> List[str]:
|
||||
return ["question", "prediction", "expected_output", "score", "cost"]
|
||||
|
|
@ -1,76 +0,0 @@
|
|||
import json
|
||||
import asyncio
|
||||
import aiofiles
|
||||
import pandas as pd
|
||||
from typing import List, Tuple, Callable, Any
|
||||
import string
|
||||
import re
|
||||
import os
|
||||
from collections import Counter
|
||||
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type
|
||||
|
||||
from examples.aflow.benchmark.benchmark import BaseBenchmark
|
||||
|
||||
class HotpotQABenchmark(BaseBenchmark):
|
||||
def __init__(self, name: str, file_path: str, log_path: str):
|
||||
super().__init__(name, file_path, log_path)
|
||||
|
||||
def normalize_answer(self, s: str) -> str:
|
||||
def remove_articles(text):
|
||||
return re.sub(r"\b(a|an|the)\b", " ", text)
|
||||
|
||||
def white_space_fix(text):
|
||||
return " ".join(text.split())
|
||||
|
||||
def remove_punc(text):
|
||||
exclude = set(string.punctuation)
|
||||
return "".join(ch for ch in text if ch not in exclude)
|
||||
|
||||
def lower(text):
|
||||
return text.lower()
|
||||
|
||||
return white_space_fix(remove_articles(remove_punc(lower(s))))
|
||||
|
||||
def calculate_score(self, ground_truth: str, prediction: str) -> Tuple[float, str]:
|
||||
prediction_tokens = self.normalize_answer(prediction).split()
|
||||
ground_truth_tokens = self.normalize_answer(ground_truth).split()
|
||||
common = Counter(prediction_tokens) & Counter(ground_truth_tokens)
|
||||
num_same = sum(common.values())
|
||||
if num_same == 0:
|
||||
return 0, prediction
|
||||
precision = 1.0 * num_same / len(prediction_tokens)
|
||||
recall = 1.0 * num_same / len(ground_truth_tokens)
|
||||
f1 = (2 * precision * recall) / (precision + recall)
|
||||
return f1, prediction
|
||||
|
||||
@retry(
|
||||
stop=stop_after_attempt(5),
|
||||
wait=wait_fixed(1),
|
||||
retry=retry_if_exception_type(Exception),
|
||||
reraise=True
|
||||
)
|
||||
async def _generate_output(self, graph, input_text):
|
||||
return await graph(input_text)
|
||||
|
||||
async def evaluate_problem(self, problem: dict, graph: Callable) -> Tuple[str, str, str, str, float, float]:
|
||||
input_text = problem["question"]
|
||||
expected_output = problem["answer"]
|
||||
paragraphs = [item[1] for item in problem["context"] if isinstance(item[1], list)]
|
||||
context_str = "\n".join(" ".join(paragraph) for paragraph in paragraphs)
|
||||
inputs = f"Context: {context_str}\n\nQuestion: {input_text}\n\nAnswer:"
|
||||
|
||||
try:
|
||||
output, cost = await self._generate_output(graph, inputs)
|
||||
score, extracted_output = self.calculate_score(expected_output, output)
|
||||
|
||||
if score < 0.3: # We set the threshold for collecting incorrect questions to 0.3, as F1 Score cannot be simply judged using 0-1
|
||||
self.log_mismatch(input_text, expected_output, output, extracted_output)
|
||||
|
||||
return input_text, context_str, output, expected_output, score, cost
|
||||
|
||||
except Exception as e:
|
||||
print(f"Maximum retries reached. Skipping this sample. Error: {e}")
|
||||
return input_text, context_str, str(e), expected_output, 0.0, 0.0
|
||||
|
||||
def get_result_columns(self) -> List[str]:
|
||||
return ["question", "context", "prediction", "expected_output", "score", "cost"]
|
||||
|
|
@ -1,143 +0,0 @@
|
|||
import os
|
||||
import time
|
||||
import json
|
||||
import asyncio
|
||||
import threading
|
||||
from datetime import datetime
|
||||
from typing import List, Tuple, Callable, Dict, Any, Optional
|
||||
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type
|
||||
|
||||
|
||||
import pandas as pd
|
||||
|
||||
from examples.aflow.benchmark.benchmark import BaseBenchmark
|
||||
from metagpt.actions.code_sanitize import sanitize
|
||||
|
||||
class HumanEvalBenchmark(BaseBenchmark):
|
||||
def __init__(self, name: str, file_path: str, log_path: str):
|
||||
super().__init__(name, file_path, log_path)
|
||||
|
||||
PASS = "PASS"
|
||||
FAIL = "FAIL"
|
||||
|
||||
class TimeoutError(Exception):
|
||||
pass
|
||||
|
||||
def run_with_timeout(self, func, args, timeout):
|
||||
result = []
|
||||
stop_event = threading.Event()
|
||||
|
||||
def target():
|
||||
try:
|
||||
result.append(func(*args))
|
||||
except Exception as e:
|
||||
result.append(e)
|
||||
finally:
|
||||
stop_event.set()
|
||||
|
||||
thread = threading.Thread(target=target)
|
||||
thread.start()
|
||||
is_timeout = not stop_event.wait(timeout)
|
||||
|
||||
if is_timeout:
|
||||
raise self.TimeoutError("Function execution timed out")
|
||||
|
||||
if not result:
|
||||
return None
|
||||
if isinstance(result[0], Exception):
|
||||
raise result[0]
|
||||
return result[0]
|
||||
|
||||
def check_solution(self, solution, test, entry_point):
|
||||
solution = sanitize(code=solution, entrypoint=entry_point)
|
||||
try:
|
||||
global_dict = {
|
||||
'math': __import__('math'),
|
||||
'hashlib': __import__('hashlib'),
|
||||
're': __import__('re'),
|
||||
'List': List,
|
||||
'Dict': Dict,
|
||||
'Tuple': Tuple,
|
||||
'Optional': Optional,
|
||||
'Any': Any
|
||||
}
|
||||
|
||||
# Add handling for special cases
|
||||
if entry_point == "decode_cyclic":
|
||||
solution = "\n\ndef encode_cyclic(s: str):\n \"\"\"\n returns encoded string by cycling groups of three characters.\n \"\"\"\n # split string to groups. Each of length 3.\n groups = [s[(3 * i):min((3 * i + 3), len(s))] for i in range((len(s) + 2) // 3)]\n # cycle elements in each group. Unless group has fewer elements than 3.\n groups = [(group[1:] + group[0]) if len(group) == 3 else group for group in groups]\n return \"\".join(groups)" + "\n\n" + solution
|
||||
elif entry_point == "decode_shift":
|
||||
solution = "\n\ndef encode_shift(s: str):\n \"\"\"\n returns encoded string by shifting every character by 5 in the alphabet.\n \"\"\"\n return \"\".join([chr(((ord(ch) + 5 - ord(\"a\")) % 26) + ord(\"a\")) for ch in s])\n\n\n" + solution
|
||||
elif entry_point == "find_zero":
|
||||
solution = "\n\ndef poly(xs: list, x: float):\n return sum(coeff * (x ** i) for i, coeff in enumerate(xs))\n\n" + solution
|
||||
|
||||
exec(solution, global_dict)
|
||||
|
||||
if entry_point not in global_dict:
|
||||
raise ValueError(f"Function {entry_point} is not defined in the solution.")
|
||||
|
||||
exec(test, global_dict)
|
||||
|
||||
check = global_dict["check"]
|
||||
|
||||
result = self.run_with_timeout(check, (global_dict[entry_point],), 15)
|
||||
|
||||
if result is None:
|
||||
result = (self.PASS, "The solution passed all test cases.")
|
||||
|
||||
except self.TimeoutError:
|
||||
result = (self.FAIL, "Execution timed out. Please check if your solution contains infinite loops or overly time-consuming operations.")
|
||||
except Exception as e:
|
||||
error_message = f"Error: {str(e)}.\n Solution: {solution}.\n Test: {test}"
|
||||
result = (self.FAIL, error_message)
|
||||
|
||||
with open('error.log', 'a', encoding='utf-8') as log_file:
|
||||
log_file.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - {error_message}\n")
|
||||
|
||||
return result
|
||||
|
||||
@retry(
|
||||
stop=stop_after_attempt(5),
|
||||
wait=wait_fixed(1),
|
||||
retry=retry_if_exception_type(Exception),
|
||||
reraise=True
|
||||
)
|
||||
async def _generate_output(self, graph, prompt, entry_point):
|
||||
# Generate output with a timeout of 60 seconds
|
||||
return await asyncio.wait_for(graph(prompt, entry_point), timeout=60)
|
||||
|
||||
async def evaluate_problem(self, data: dict, graph: Callable) -> Tuple[str, str, str, float, float]:
|
||||
input_text = data["prompt"]
|
||||
expected_output = "\nCorrect Solution:\ndef " + data["entry_point"] + "(params you should put here):" + "\n\n" + data["canonical_solution"]
|
||||
|
||||
try:
|
||||
# Generate prediction using the graph function
|
||||
prediction, cost = await self._generate_output(graph, input_text, data["entry_point"])
|
||||
|
||||
# Check the solution
|
||||
ret = self.check_solution(prediction, data["test"], data["entry_point"])
|
||||
test_case_details = ret[1]
|
||||
expected_output = test_case_details + expected_output
|
||||
|
||||
# Calculate score based on the check result
|
||||
score = 1.0 if ret[0] == self.PASS else 0.0
|
||||
|
||||
# Log mismatch if the score is 0
|
||||
if score == 0:
|
||||
self.log_mismatch(input_text, expected_output, prediction, score)
|
||||
|
||||
return input_text, prediction, expected_output, score, cost
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
print("Timeout error. Skipping this sample.")
|
||||
return input_text, "Timeout", expected_output, 0.0, 0.0
|
||||
|
||||
except Exception as e:
|
||||
print(f"Maximum retries reached. Skipping this sample. Error: {e}")
|
||||
return input_text, str(e), expected_output, 0.0, 0.0
|
||||
|
||||
def calculate_score(self, expected_output: str, prediction: str) -> Tuple[float, str]:
|
||||
# The scoring logic for HumanEval is already implemented in evaluate_problem, this is just to conform to the interface
|
||||
return 0.0, prediction
|
||||
|
||||
def get_result_columns(self) -> List[str]:
|
||||
return ["inputs", "prediction", "expected_output", "score", "cost"]
|
||||
|
|
@ -1,130 +0,0 @@
|
|||
import re
|
||||
import regex
|
||||
from sympy import N, simplify
|
||||
from sympy.parsing.latex import parse_latex
|
||||
from sympy.parsing.sympy_parser import parse_expr
|
||||
from math import isclose
|
||||
import multiprocessing
|
||||
from typing import Any, Callable, Tuple, List
|
||||
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type
|
||||
|
||||
|
||||
from examples.aflow.benchmark.benchmark import BaseBenchmark
|
||||
|
||||
class MATHBenchmark(BaseBenchmark):
|
||||
def __init__(self, name: str, file_path: str, log_path: str):
|
||||
super().__init__(name, file_path, log_path)
|
||||
|
||||
def extract_model_answer(self, text: str) -> str:
|
||||
pattern = r"\\boxed{((?:[^{}]|{[^{}]*})*)}"
|
||||
boxed_matches = re.findall(pattern, text, re.DOTALL)
|
||||
if boxed_matches:
|
||||
return boxed_matches[-1].strip()
|
||||
|
||||
sentence_end_pattern = r'(?<!\d)[.!?]\s+'
|
||||
sentences = re.split(sentence_end_pattern, text)
|
||||
sentences = [s.strip() for s in sentences if s.strip()]
|
||||
return sentences[-1] if sentences else ""
|
||||
|
||||
def calculate_score(self, expected_output: str, prediction: str) -> Tuple[int, str]:
|
||||
expected_answer = self.extract_model_answer(expected_output)
|
||||
predicted_answer = self.extract_model_answer(prediction)
|
||||
|
||||
if self.math_equal(predicted_answer, expected_answer):
|
||||
return 1, predicted_answer
|
||||
else:
|
||||
return 0, predicted_answer
|
||||
|
||||
def math_equal(self, prediction: Any, reference: Any) -> bool:
|
||||
if str(prediction) == str(reference):
|
||||
return True
|
||||
|
||||
try:
|
||||
if self.is_digit(prediction) and self.is_digit(reference):
|
||||
prediction = self.parse_digits(prediction)
|
||||
reference = self.parse_digits(reference)
|
||||
return isclose(prediction, reference, abs_tol=1e-3)
|
||||
except:
|
||||
pass
|
||||
|
||||
try:
|
||||
return self.symbolic_equal(prediction, reference)
|
||||
except:
|
||||
pass
|
||||
|
||||
return False
|
||||
|
||||
def is_digit(self, num):
|
||||
return self.parse_digits(num) is not None
|
||||
|
||||
def parse_digits(self, num):
|
||||
num = regex.sub(",", "", str(num))
|
||||
try:
|
||||
return float(num)
|
||||
except:
|
||||
if num.endswith("%"):
|
||||
num = num[:-1]
|
||||
if num.endswith("\\"):
|
||||
num = num[:-1]
|
||||
try:
|
||||
return float(num) / 100
|
||||
except:
|
||||
pass
|
||||
return None
|
||||
|
||||
def symbolic_equal(self, a, b):
|
||||
def _parse(s):
|
||||
for f in [parse_latex, parse_expr]:
|
||||
try:
|
||||
return f(s)
|
||||
except:
|
||||
pass
|
||||
return s
|
||||
|
||||
a = _parse(a)
|
||||
b = _parse(b)
|
||||
|
||||
try:
|
||||
if simplify(a - b) == 0:
|
||||
return True
|
||||
except:
|
||||
pass
|
||||
|
||||
try:
|
||||
if isclose(N(a), N(b), abs_tol=1e-3):
|
||||
return True
|
||||
except:
|
||||
pass
|
||||
return False
|
||||
|
||||
|
||||
@retry(
|
||||
stop=stop_after_attempt(5),
|
||||
wait=wait_fixed(1),
|
||||
retry=retry_if_exception_type(Exception),
|
||||
reraise=True
|
||||
)
|
||||
async def _generate_output(self, graph, input_text):
|
||||
return await graph(input_text)
|
||||
|
||||
|
||||
async def evaluate_problem(self, problem: dict, graph: Callable) -> Tuple[str, str, str, int, float]:
|
||||
input_text = problem["problem"]
|
||||
expected_output = problem["solution"]
|
||||
|
||||
try:
|
||||
output, cost = await self._generate_output(graph, input_text)
|
||||
uni_score, extracted_output = self.calculate_score(expected_output, output)
|
||||
|
||||
if uni_score == 0:
|
||||
self.log_mismatch(input_text, expected_output, output, extracted_output)
|
||||
|
||||
return input_text, output, expected_output, uni_score, cost
|
||||
|
||||
except Exception as e:
|
||||
print(f"Maximum retries reached. Skipping this sample. Error: {e}")
|
||||
return input_text, str(e), expected_output, 0.0, 0.0
|
||||
|
||||
|
||||
def get_result_columns(self) -> List[str]:
|
||||
return ["question", "prediction", "expected_output", "score", "cost"]
|
||||
|
|
@ -1,127 +0,0 @@
|
|||
import os
|
||||
import json
|
||||
import time
|
||||
import asyncio
|
||||
import threading
|
||||
from datetime import datetime
|
||||
from typing import List, Tuple, Callable, Any, Optional, Dict
|
||||
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type
|
||||
|
||||
from metagpt.actions.code_sanitize import sanitize
|
||||
from examples.aflow.benchmark.benchmark import BaseBenchmark
|
||||
|
||||
class MBPPBenchmark(BaseBenchmark):
|
||||
def __init__(self, name: str, file_path: str, log_path: str):
|
||||
super().__init__(name, file_path, log_path)
|
||||
|
||||
PASS = "PASS"
|
||||
FAIL = "FAIL"
|
||||
|
||||
class TimeoutError(Exception):
|
||||
pass
|
||||
|
||||
def run_with_timeout(self, func, timeout):
|
||||
result = []
|
||||
stop_event = threading.Event()
|
||||
|
||||
def target():
|
||||
try:
|
||||
result.append(func())
|
||||
except Exception as e:
|
||||
result.append(e)
|
||||
finally:
|
||||
stop_event.set()
|
||||
|
||||
thread = threading.Thread(target=target)
|
||||
thread.start()
|
||||
is_timeout = not stop_event.wait(timeout)
|
||||
|
||||
if is_timeout:
|
||||
raise self.TimeoutError("Function execution timed out")
|
||||
|
||||
if not result:
|
||||
return None
|
||||
if isinstance(result[0], Exception):
|
||||
raise result[0]
|
||||
return result[0]
|
||||
|
||||
def check_solution(self, solution, test, entry_point):
|
||||
solution = sanitize(code=solution, entrypoint=entry_point)
|
||||
try:
|
||||
global_dict = {
|
||||
'math': __import__('math'),
|
||||
'hashlib': __import__('hashlib'),
|
||||
're': __import__('re'),
|
||||
'List': List,
|
||||
'Dict': Dict,
|
||||
'Tuple': Tuple,
|
||||
'Optional': Optional,
|
||||
'Any': Any
|
||||
}
|
||||
|
||||
exec(solution, global_dict)
|
||||
|
||||
if entry_point not in global_dict:
|
||||
raise ValueError(f"Function {entry_point} is not defined in the solution.")
|
||||
|
||||
exec(test, global_dict)
|
||||
|
||||
check = global_dict["check"]
|
||||
|
||||
result = self.run_with_timeout(check, 15)
|
||||
|
||||
if result is None:
|
||||
result = (self.PASS, "The solution passed all test cases.")
|
||||
|
||||
except self.TimeoutError:
|
||||
result = (self.FAIL, "Execution timed out. Please check if your solution contains infinite loops or overly time-consuming operations.")
|
||||
except Exception as e:
|
||||
error_message = f"Error: {str(e)}.\n Solution: {solution}.\n Test: {test}"
|
||||
result = (self.FAIL, error_message)
|
||||
|
||||
with open('error.log', 'a', encoding='utf-8') as log_file:
|
||||
log_file.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - {error_message}\n")
|
||||
|
||||
return result
|
||||
|
||||
@retry(
|
||||
stop=stop_after_attempt(5),
|
||||
wait=wait_fixed(1),
|
||||
retry=retry_if_exception_type(Exception),
|
||||
reraise=True
|
||||
)
|
||||
async def _generate_output(self, graph, prompt, entry_point):
|
||||
return await graph(prompt, entry_point)
|
||||
|
||||
async def evaluate_problem(self, data: dict, graph: Callable) -> Tuple[str, str, str, float, float]:
|
||||
input_text = data["prompt"]
|
||||
expected_output = "\nCorrect Solution:\ndef " + data["code"]
|
||||
|
||||
try:
|
||||
# Generate prediction using the graph function
|
||||
prediction, cost = await self._generate_output(graph, input_text, data["entry_point"])
|
||||
|
||||
# Check the solution
|
||||
ret = self.check_solution(prediction, data["test"], data["entry_point"])
|
||||
test_case_details = ret[1]
|
||||
expected_output = test_case_details + "\nCorrect Solution:" + data["code"]
|
||||
|
||||
# Calculate score based on the check result
|
||||
score = 1.0 if ret[0] == self.PASS else 0.0
|
||||
|
||||
# Log mismatch if the score is 0
|
||||
if score == 0:
|
||||
self.log_mismatch(input_text, expected_output, prediction, score)
|
||||
|
||||
return input_text, prediction, expected_output, score, cost
|
||||
|
||||
except Exception as e:
|
||||
print(f"Maximum retries reached. Skipping this sample. Error: {e}")
|
||||
return input_text, str(e), expected_output, 0.0, 0.0
|
||||
|
||||
def calculate_score(self, expected_output: str, prediction: str) -> Tuple[float, str]:
|
||||
# The scoring logic for MBPP is already implemented in evaluate_problem, this is just to conform to the interface
|
||||
return 0.0, prediction
|
||||
|
||||
def get_result_columns(self) -> List[str]:
|
||||
return ["inputs", "prediction", "expected_output", "score", "cost"]
|
||||
|
|
@ -1,64 +0,0 @@
|
|||
import os
|
||||
import json
|
||||
import numpy as np
|
||||
|
||||
def generate_random_indices(n, n_samples, test=False):
|
||||
"""
|
||||
生成随机索引
|
||||
"""
|
||||
|
||||
def _set_seed(seed=42):
|
||||
np.random.seed(seed)
|
||||
|
||||
_set_seed()
|
||||
indices = np.arange(n)
|
||||
np.random.shuffle(indices)
|
||||
if test:
|
||||
return indices[n_samples:]
|
||||
else:
|
||||
return indices[:n_samples]
|
||||
|
||||
def split_data_set(file_path, samples, test=False):
|
||||
data = []
|
||||
|
||||
with open(file_path, 'r') as file:
|
||||
for line in file:
|
||||
data.append(json.loads(line))
|
||||
random_indices = generate_random_indices(len(data), samples, test)
|
||||
data = [data[i] for i in random_indices]
|
||||
return data
|
||||
|
||||
# save data into a jsonl file
|
||||
def save_data(data, file_path):
|
||||
with open(file_path, 'w') as file:
|
||||
for d in data:
|
||||
file.write(json.dumps(d) + '\n')
|
||||
|
||||
def log_mismatch(problem, expected_output, prediction, predicted_number, path):
|
||||
log_data = {
|
||||
"question": problem,
|
||||
"right_answer": expected_output,
|
||||
"model_output": prediction,
|
||||
"extracted_output": predicted_number
|
||||
}
|
||||
|
||||
log_file = os.path.join(path, 'log.json')
|
||||
|
||||
# 检查log文件是否已经存在
|
||||
if os.path.exists(log_file):
|
||||
# 如果存在,加载现有的日志数据
|
||||
with open(log_file, 'r', encoding='utf-8') as f:
|
||||
try:
|
||||
data = json.load(f)
|
||||
except json.JSONDecodeError:
|
||||
data = []
|
||||
else:
|
||||
# 如果不存在,创建一个新的日志列表
|
||||
data = []
|
||||
|
||||
# 添加新的日志记录
|
||||
data.append(log_data)
|
||||
|
||||
# 将数据写回到log.json文件
|
||||
with open(log_file, 'w', encoding='utf-8') as f:
|
||||
json.dump(data, f, indent=4, ensure_ascii=False)
|
||||
|
|
@ -1,70 +0,0 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# @Date : 2024-10-20
|
||||
# @Author : MoshiQAQ & didi
|
||||
# @Desc : Download and extract dataset files
|
||||
|
||||
import os
|
||||
import requests
|
||||
import tarfile
|
||||
from tqdm import tqdm
|
||||
from typing import List, Dict
|
||||
|
||||
def download_file(url: str, filename: str) -> None:
|
||||
"""Download a file from the given URL and show progress."""
|
||||
response = requests.get(url, stream=True)
|
||||
total_size = int(response.headers.get('content-length', 0))
|
||||
block_size = 1024
|
||||
progress_bar = tqdm(total=total_size, unit='iB', unit_scale=True)
|
||||
|
||||
with open(filename, 'wb') as file:
|
||||
for data in response.iter_content(block_size):
|
||||
size = file.write(data)
|
||||
progress_bar.update(size)
|
||||
progress_bar.close()
|
||||
|
||||
def extract_tar_gz(filename: str, extract_path: str) -> None:
|
||||
"""Extract a tar.gz file to the specified path."""
|
||||
with tarfile.open(filename, 'r:gz') as tar:
|
||||
tar.extractall(path=extract_path)
|
||||
|
||||
def process_dataset(url: str, filename: str, extract_path: str) -> None:
|
||||
"""Download, extract, and clean up a dataset."""
|
||||
print(f"Downloading {filename}...")
|
||||
download_file(url, filename)
|
||||
|
||||
print(f"Extracting {filename}...")
|
||||
extract_tar_gz(filename, extract_path)
|
||||
|
||||
print(f"{filename} download and extraction completed.")
|
||||
|
||||
os.remove(filename)
|
||||
print(f"Removed {filename}")
|
||||
|
||||
# Define the datasets to be downloaded
|
||||
# Users can modify this list to choose which datasets to download
|
||||
datasets_to_download: List[Dict[str, str]] = [
|
||||
{
|
||||
"name": "datasets",
|
||||
"url": "https://drive.google.com/uc?export=download&id=1tXp5cLw89egeKRwDuood2TPqoEWd8_C0",
|
||||
"filename": "aflow_data.tar.gz",
|
||||
"extract_path": "examples/aflow/data"
|
||||
},
|
||||
{
|
||||
"name": "results",
|
||||
"url": "", # Please fill in the correct URL
|
||||
"filename": "result.tar.gz",
|
||||
"extract_path": "examples/aflow/data/results"
|
||||
},
|
||||
{
|
||||
"name": "initial_rounds",
|
||||
"url": "", # Please fill in the correct URL
|
||||
"filename": "first_round.tar.gz",
|
||||
"extract_path": "examples/aflow/scripts/optimized"
|
||||
}
|
||||
]
|
||||
|
||||
def download(datasets):
|
||||
"""Main function to process all selected datasets."""
|
||||
for dataset_name in datasets:
|
||||
dataset = datasets_to_download[dataset_name]
|
||||
process_dataset(dataset['url'], dataset['filename'], dataset['extract_path'])
|
||||
61
examples/aflow/optimize.py
Normal file
61
examples/aflow/optimize.py
Normal file
|
|
@ -0,0 +1,61 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# @Date : 8/23/2024 20:00 PM
|
||||
# @Author : didi
|
||||
# @Desc : Entrance of AFlow.
|
||||
|
||||
from metagpt.ext.aflow.scripts.optimizer import Optimizer
|
||||
from metagpt.ext.aflow.scripts.evaluator import DatasetType, QuestionType, OptimizerType
|
||||
from metagpt.ext.aflow.data.download_data import download
|
||||
from metagpt.configs.models_config import ModelsConfig
|
||||
from typing import Literal
|
||||
|
||||
# DatasetType, QuestionType, and OptimizerType definitions
|
||||
# DatasetType = Literal["HumanEval", "MBPP", "GSM8K", "MATH", "HotpotQA", "DROP"]
|
||||
# QuestionType = Literal["math", "code", "qa"]
|
||||
# OptimizerType = Literal["Graph", "Test"]
|
||||
|
||||
# When you fisrt use, please download the datasets and initial rounds; If you want to get a look of the results, please download the results.
|
||||
# download(["datasets", "results", "initial_rounds"])
|
||||
|
||||
# Crucial Parameters
|
||||
dataset: DatasetType = "GSM8K" # Ensure the type is consistent with DatasetType
|
||||
sample: int = 4 # Sample Count, which means how many workflows will be resampled from generated workflows
|
||||
question_type: QuestionType = "code" # Ensure the type is consistent with QuestionType
|
||||
optimized_path: str = "examples/aflow/scripts/optimized" # Optimized Result Save Path
|
||||
initial_round: int = 1 # Corrected the case from Initial_round to initial_round
|
||||
max_rounds: int = 20
|
||||
check_convergence: bool = True
|
||||
|
||||
# Config llm model, you can modify `config/config2.yaml` to use more llms.
|
||||
mini_llm_config = ModelsConfig.default().get("gpt-4o-mini")
|
||||
claude_llm_config = ModelsConfig.default().get("claude-3-5-sonnet-20240620")
|
||||
|
||||
# Config operators.
|
||||
operators = [
|
||||
"Custom", # It's basic unit of a fixed node. optimizer can modify its prompt to get vairous nodes.
|
||||
# "AnswerGenerate" # It's for qa
|
||||
# "CustomCodeGenerate", # It's for code
|
||||
"ScEnsemble", # It's for code, math and qa
|
||||
# "Test", # It's for code
|
||||
"Programmer", # It's for math
|
||||
]
|
||||
|
||||
# Create an optimizer instance
|
||||
optimizer = Optimizer(
|
||||
dataset=dataset, # Config dataset
|
||||
question_type=question_type, # Config Question Type
|
||||
opt_llm_config=claude_llm_config, # Config Optimizer LLM
|
||||
exec_llm_config=mini_llm_config, # Config Execution LLM
|
||||
check_convergence=check_convergence, # Whether Early Stop
|
||||
operators=operators, # Config Operators you want to use
|
||||
optimized_path=optimized_path, # Config Optimized workflow's file path
|
||||
sample=sample, # Only Top(sample) rounds will be selected.
|
||||
initial_round=initial_round, # Optimize from initial round
|
||||
max_rounds=max_rounds # The max iteration of AFLOW.
|
||||
)
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Optimize workflow via setting the optimizer's mode to 'Graph'
|
||||
optimizer.optimize("Graph")
|
||||
# Test workflow via setting the optimizer's mode to 'Test'
|
||||
# optimizer.optimize("Test")
|
||||
|
|
@ -1,59 +0,0 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# @Date : 8/23/2024 10:00 AM
|
||||
# @Author : all
|
||||
# @Desc : Evaluation for different datasets
|
||||
|
||||
from typing import Literal, Tuple, Optional, Dict
|
||||
import asyncio
|
||||
|
||||
from examples.aflow.benchmark.benchmark import BaseBenchmark
|
||||
from examples.aflow.benchmark.gsm8k import GSM8KBenchmark
|
||||
from examples.aflow.benchmark.math import MATHBenchmark
|
||||
from examples.aflow.benchmark.humaneval import HumanEvalBenchmark
|
||||
from examples.aflow.benchmark.hotpotqa import HotpotQABenchmark
|
||||
from examples.aflow.benchmark.mbpp import MBPPBenchmark
|
||||
from examples.aflow.benchmark.drop import DROPBenchmark
|
||||
|
||||
# If you want to customize tasks, add task types here and provide evaluation functions, just like the ones given above
|
||||
DatasetType = Literal["HumanEval", "MBPP", "GSM8K", "MATH", "HotpotQA", "DROP"]
|
||||
|
||||
class Evaluator:
|
||||
"""
|
||||
Complete the evaluation for different datasets here
|
||||
"""
|
||||
|
||||
def __init__(self, eval_path: str):
|
||||
self.eval_path = eval_path
|
||||
self.dataset_configs: Dict[DatasetType, BaseBenchmark] = {
|
||||
"GSM8K": GSM8KBenchmark,
|
||||
"MATH": MATHBenchmark,
|
||||
"HumanEval": HumanEvalBenchmark,
|
||||
"HotpotQA": HotpotQABenchmark,
|
||||
"MBPP": MBPPBenchmark,
|
||||
"DROP": DROPBenchmark,
|
||||
}
|
||||
|
||||
async def graph_evaluate(self, dataset: DatasetType, graph, params: dict, path: str, is_test: bool = False) -> Tuple[float, float, float]:
|
||||
if dataset not in self.dataset_configs:
|
||||
raise ValueError(f"Unsupported dataset: {dataset}")
|
||||
|
||||
data_path = self._get_data_path(dataset, is_test)
|
||||
benchmark_class = self.dataset_configs[dataset]
|
||||
benchmark = benchmark_class(name=dataset, file_path=data_path, log_path=path)
|
||||
|
||||
# Use params to configure the graph and benchmark
|
||||
configured_graph = await self._configure_graph(dataset, graph, params)
|
||||
|
||||
va_list = [1,2,3] # Use va_list from params, or use default value if not provided
|
||||
return await benchmark.run_evaluation(configured_graph, va_list)
|
||||
|
||||
async def _configure_graph(self, dataset, graph, params: dict):
|
||||
# Here you can configure the graph based on params
|
||||
# For example: set LLM configuration, dataset configuration, etc.
|
||||
dataset_config = params.get("dataset", {})
|
||||
llm_config = params.get("llm_config", {})
|
||||
return graph(name=dataset, llm_config=llm_config, dataset=dataset_config)
|
||||
|
||||
def _get_data_path(self, dataset: DatasetType, test: bool) -> str:
|
||||
base_path = f"examples/aflow/data/{dataset.lower()}"
|
||||
return f"{base_path}_test.jsonl" if test else f"{base_path}_validate.jsonl"
|
||||
|
|
@ -1,351 +0,0 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# @Date : 6/27/2024 17:36 PM
|
||||
# @Author : didi
|
||||
# @Desc : operator demo of aflow
|
||||
import random
|
||||
import sys
|
||||
import asyncio
|
||||
import traceback
|
||||
from collections import Counter
|
||||
from typing import Dict, List, Tuple
|
||||
|
||||
import concurrent.futures
|
||||
from tenacity import retry, stop_after_attempt, wait_fixed
|
||||
from examples.aflow.scripts.utils import extract_test_cases_from_jsonl
|
||||
|
||||
from examples.aflow.scripts.operator_an import (
|
||||
FormatOp,
|
||||
GenerateOp,
|
||||
CodeGenerateOp,
|
||||
AnswerGenerateOp,
|
||||
ScEnsembleOp,
|
||||
ReflectionTestOp,
|
||||
MdEnsembleOp,
|
||||
ReviewOp,
|
||||
ReviseOp,
|
||||
|
||||
)
|
||||
from examples.aflow.scripts.prompts.prompt import (
|
||||
FORMAT_PROMPT,
|
||||
ANSWER_GENERATION_PROMPT,
|
||||
SC_ENSEMBLE_PROMPT,
|
||||
PYTHON_CODE_VERIFIER_PROMPT,
|
||||
REFLECTION_ON_PUBLIC_TEST_PROMPT,
|
||||
MD_ENSEMBLE_PROMPT,
|
||||
REVIEW_PROMPT,
|
||||
REVISE_PROMPT,
|
||||
)
|
||||
from examples.aflow.scripts.utils import test_case_2_test_function
|
||||
from metagpt.actions.action_node import ActionNode
|
||||
from metagpt.llm import LLM
|
||||
from metagpt.logs import logger
|
||||
|
||||
|
||||
class Operator:
|
||||
def __init__(self, llm: LLM, name: str):
|
||||
self.name = name
|
||||
self.llm = llm
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
raise NotImplementedError
|
||||
|
||||
async def _fill_node(self, op_class, prompt, mode=None, **extra_kwargs):
|
||||
fill_kwargs = {"context": prompt, "llm": self.llm}
|
||||
if mode:
|
||||
fill_kwargs["mode"] = mode
|
||||
fill_kwargs.update(extra_kwargs)
|
||||
node = await ActionNode.from_pydantic(op_class).fill(**fill_kwargs)
|
||||
return node.instruct_content.model_dump()
|
||||
|
||||
|
||||
class Custom(Operator):
|
||||
def __init__(self, llm: LLM, name: str = "Custom"):
|
||||
super().__init__(llm, name)
|
||||
|
||||
async def __call__(self, input, instruction):
|
||||
prompt = instruction + input
|
||||
response = await self._fill_node(GenerateOp, prompt, mode="single_fill")
|
||||
return response
|
||||
|
||||
class AnswerGenerate(Operator):
|
||||
def __init__(self, llm: LLM, name: str = "AnswerGenerate"):
|
||||
super().__init__(llm, name)
|
||||
|
||||
async def __call__(self, input: str, mode: str = None) -> Tuple[str, str]:
|
||||
prompt = ANSWER_GENERATION_PROMPT.format(input=input)
|
||||
response = await self._fill_node(AnswerGenerateOp, prompt, mode="xml_fill")
|
||||
return response
|
||||
|
||||
class CustomCodeGenerate(Operator):
|
||||
def __init__(self, llm: LLM, name: str = "CustomCodeGenerate"):
|
||||
super().__init__(llm, name)
|
||||
|
||||
async def __call__(self, problem, entry_point, instruction):
|
||||
prompt = instruction + problem
|
||||
response = await self._fill_node(GenerateOp, prompt, mode="code_fill", function_name=entry_point)
|
||||
return response
|
||||
|
||||
|
||||
class ScEnsemble(Operator):
|
||||
"""
|
||||
Paper: Self-Consistency Improves Chain of Thought Reasoning in Language Models
|
||||
Link: https://arxiv.org/abs/2203.11171
|
||||
Paper: Universal Self-Consistency for Large Language Model Generation
|
||||
Link: https://arxiv.org/abs/2311.17311
|
||||
"""
|
||||
|
||||
def __init__(self, llm: LLM, name: str = "ScEnsemble"):
|
||||
super().__init__(llm, name)
|
||||
|
||||
async def __call__(self, solutions: List[str]):
|
||||
answer_mapping = {}
|
||||
solution_text = ""
|
||||
for index, solution in enumerate(solutions):
|
||||
answer_mapping[chr(65 + index)] = index
|
||||
solution_text += f"{chr(65 + index)}: \n{str(solution)}\n\n\n"
|
||||
|
||||
prompt = SC_ENSEMBLE_PROMPT.format(solutions=solution_text)
|
||||
response = await self._fill_node(ScEnsembleOp, prompt, mode="xml_fill")
|
||||
|
||||
answer = response.get("solution_letter", "")
|
||||
answer = answer.strip().upper()
|
||||
|
||||
return {"response": solutions[answer_mapping[answer]]}
|
||||
|
||||
def run_code(code):
|
||||
try:
|
||||
# Create a new global namespace
|
||||
global_namespace = {}
|
||||
|
||||
disallowed_imports = [
|
||||
"os", "sys", "subprocess", "multiprocessing",
|
||||
"matplotlib", "seaborn", "plotly", "bokeh", "ggplot",
|
||||
"pylab", "tkinter", "PyQt5", "wx", "pyglet"
|
||||
]
|
||||
|
||||
# Check for prohibited imports
|
||||
for lib in disallowed_imports:
|
||||
if f"import {lib}" in code or f"from {lib}" in code:
|
||||
logger.info("Detected prohibited import: %s", lib)
|
||||
return "Error", f"Prohibited import: {lib} and graphing functionalities"
|
||||
|
||||
# Use exec to execute the code
|
||||
exec(code, global_namespace)
|
||||
# Assume the code defines a function named 'solve'
|
||||
if 'solve' in global_namespace and callable(global_namespace['solve']):
|
||||
result = global_namespace['solve']()
|
||||
return "Success", str(result)
|
||||
else:
|
||||
return "Error", "Function 'solve' not found"
|
||||
except Exception as e:
|
||||
exc_type, exc_value, exc_traceback = sys.exc_info()
|
||||
tb_str = traceback.format_exception(exc_type, exc_value, exc_traceback)
|
||||
return "Error", f"Execution error: {str(e)}\n{''.join(tb_str)}"
|
||||
|
||||
|
||||
class Programmer(Operator):
|
||||
def __init__(self, llm: LLM, name: str = "Programmer"):
|
||||
super().__init__(llm, name)
|
||||
|
||||
async def exec_code(self, code, timeout=30):
|
||||
"""
|
||||
Asynchronously execute code and return an error if timeout occurs.
|
||||
"""
|
||||
loop = asyncio.get_running_loop()
|
||||
with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
|
||||
try:
|
||||
# Submit run_code task to the process pool
|
||||
future = loop.run_in_executor(executor, run_code, code)
|
||||
# Wait for the task to complete or timeout
|
||||
result = await asyncio.wait_for(future, timeout=timeout)
|
||||
return result
|
||||
except asyncio.TimeoutError:
|
||||
# Timeout, attempt to shut down the process pool
|
||||
executor.shutdown(wait=False, cancel_futures=True)
|
||||
return "Error", "Code execution timed out"
|
||||
except Exception as e:
|
||||
return "Error", f"Unknown error: {str(e)}"
|
||||
|
||||
async def code_generate(self, problem, analysis, feedback, mode):
|
||||
"""
|
||||
Asynchronous method to generate code.
|
||||
"""
|
||||
prompt = PYTHON_CODE_VERIFIER_PROMPT.format(
|
||||
problem=problem,
|
||||
analysis=analysis,
|
||||
feedback=feedback
|
||||
)
|
||||
response = await self._fill_node(CodeGenerateOp, prompt, mode, function_name="solve")
|
||||
return response
|
||||
|
||||
@retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
|
||||
async def __call__(self, problem: str, analysis: str = "None"):
|
||||
"""
|
||||
Call method, generate code and execute, retry up to 3 times.
|
||||
"""
|
||||
code = None
|
||||
output = None
|
||||
feedback = ""
|
||||
for i in range(3):
|
||||
code_response = await self.code_generate(problem, analysis, feedback, mode="code_fill")
|
||||
code = code_response.get("code")
|
||||
if not code:
|
||||
return {"code": code, "output": "No code generated"}
|
||||
status, output = await self.exec_code(code)
|
||||
if status == "Success":
|
||||
return {"code": code, "output": output}
|
||||
else:
|
||||
print(f"Execution error on attempt {i + 1}, error message: {output}")
|
||||
feedback = (
|
||||
f"\nThe result of the error from the code you wrote in the previous round:\n"
|
||||
f"Code: {code}\n\nStatus: {status}, {output}"
|
||||
)
|
||||
return {"code": code, "output": output}
|
||||
|
||||
|
||||
class Test(Operator):
|
||||
def __init__(self, llm: LLM, name: str = "Test"):
|
||||
super().__init__(llm, name)
|
||||
|
||||
def exec_code(self, solution, entry_point):
|
||||
|
||||
test_cases = extract_test_cases_from_jsonl(entry_point)
|
||||
|
||||
fail_cases = []
|
||||
for test_case in test_cases:
|
||||
test_code = test_case_2_test_function(solution, test_case, entry_point)
|
||||
try:
|
||||
exec(test_code, globals())
|
||||
except AssertionError as e:
|
||||
exc_type, exc_value, exc_traceback = sys.exc_info()
|
||||
tb_str = traceback.format_exception(exc_type, exc_value, exc_traceback)
|
||||
with open("tester.txt", "a") as f:
|
||||
f.write("test_error of " + entry_point + "\n")
|
||||
error_infomation = {
|
||||
"test_fail_case": {
|
||||
"test_case": test_case,
|
||||
"error_type": "AssertionError",
|
||||
"error_message": str(e),
|
||||
"traceback": tb_str,
|
||||
}
|
||||
}
|
||||
fail_cases.append(error_infomation)
|
||||
except Exception as e:
|
||||
with open("tester.txt", "a") as f:
|
||||
f.write(entry_point + " " + str(e) + "\n")
|
||||
return {"exec_fail_case": str(e)}
|
||||
if fail_cases != []:
|
||||
return fail_cases
|
||||
else:
|
||||
return "no error"
|
||||
|
||||
async def __call__(
|
||||
self, problem, solution, entry_point, test_loop: int = 3
|
||||
):
|
||||
"""
|
||||
"Test": {
|
||||
"description": "Test the solution with test cases, if the solution is correct, return 'no error', if the solution is incorrect, return reflect on the soluion and the error information",
|
||||
"interface": "test(problem: str, solution: str, entry_point: str) -> str"
|
||||
}
|
||||
"""
|
||||
for _ in range(test_loop):
|
||||
result = self.exec_code(solution, entry_point)
|
||||
if result == "no error":
|
||||
return {"result": True, "solution": solution}
|
||||
elif "exec_fail_case" in result:
|
||||
result = result["exec_fail_case"]
|
||||
prompt = REFLECTION_ON_PUBLIC_TEST_PROMPT.format(
|
||||
problem=problem,
|
||||
solution=solution,
|
||||
exec_pass=f"executed unsuccessfully, error: \n {result}",
|
||||
test_fail="executed unsucessfully",
|
||||
)
|
||||
response = await self._fill_node(ReflectionTestOp, prompt, mode="code_fill")
|
||||
solution = response["reflection_and_solution"]
|
||||
else:
|
||||
prompt = REFLECTION_ON_PUBLIC_TEST_PROMPT.format(
|
||||
problem=problem,
|
||||
solution=solution,
|
||||
exec_pass="executed successfully",
|
||||
test_fail=result,
|
||||
)
|
||||
response = await self._fill_node(ReflectionTestOp, prompt, mode="code_fill")
|
||||
solution = response["reflection_and_solution"]
|
||||
|
||||
result = self.exec_code(solution, entry_point)
|
||||
if result == "no error":
|
||||
return {"result": True, "solution": solution}
|
||||
else:
|
||||
return {"result": False, "solution": solution}
|
||||
|
||||
|
||||
class Format(Operator):
|
||||
def __init__(self, llm: LLM, name: str = "Format"):
|
||||
super().__init__(llm, name)
|
||||
|
||||
async def __call__(self, problem, solution, mode: str = None):
|
||||
prompt = FORMAT_PROMPT.format(problem_description=problem, solution=solution)
|
||||
response = await self._fill_node(FormatOp, prompt, mode)
|
||||
return response
|
||||
|
||||
|
||||
class Review(Operator):
|
||||
def __init__(self, llm: LLM, name: str = "Review"):
|
||||
super().__init__(llm, name)
|
||||
|
||||
async def __call__(self, problem, solution, mode: str = None):
|
||||
prompt = REVIEW_PROMPT.format(problem=problem, solution=solution)
|
||||
response = await self._fill_node(ReviewOp, prompt, mode="xml_fill")
|
||||
return response
|
||||
|
||||
class Revise(Operator):
|
||||
def __init__(self, llm: LLM, name: str = "Revise"):
|
||||
super().__init__(llm, name)
|
||||
|
||||
async def __call__(self, problem, solution, feedback, mode: str = None):
|
||||
prompt = REVISE_PROMPT.format(problem=problem, solution=solution, feedback=feedback)
|
||||
response = await self._fill_node(ReviseOp, prompt, mode="xml_fill")
|
||||
return response
|
||||
|
||||
|
||||
class MdEnsemble(Operator):
|
||||
"""
|
||||
Paper: Can Generalist Foundation Models Outcompete Special-Purpose Tuning? Case Study in Medicine
|
||||
Link: https://arxiv.org/abs/2311.16452
|
||||
"""
|
||||
|
||||
def __init__(self, llm: LLM, name: str = "MdEnsemble", vote_count: int = 5):
|
||||
super().__init__(llm, name)
|
||||
self.vote_count = vote_count
|
||||
|
||||
@staticmethod
|
||||
def shuffle_answers(solutions: List[str]) -> Tuple[List[str], Dict[str, str]]:
|
||||
shuffled_solutions = solutions.copy()
|
||||
random.shuffle(shuffled_solutions)
|
||||
answer_mapping = {chr(65 + i): solutions.index(solution) for i, solution in enumerate(shuffled_solutions)}
|
||||
return shuffled_solutions, answer_mapping
|
||||
|
||||
async def __call__(self, solutions: List[str], problem: str, mode: str = None):
|
||||
print(f"solution count: {len(solutions)}")
|
||||
all_responses = []
|
||||
|
||||
for _ in range(self.vote_count):
|
||||
shuffled_solutions, answer_mapping = self.shuffle_answers(solutions)
|
||||
|
||||
solution_text = ""
|
||||
for index, solution in enumerate(shuffled_solutions):
|
||||
solution_text += f"{chr(65 + index)}: \n{str(solution)}\n\n\n"
|
||||
|
||||
prompt = MD_ENSEMBLE_PROMPT.format(solutions=solution_text, question=problem)
|
||||
response = await self._fill_node(MdEnsembleOp, prompt, mode="xml_fill")
|
||||
|
||||
answer = response.get("solution_letter", "A")
|
||||
answer = answer.strip().upper()
|
||||
|
||||
if answer in answer_mapping:
|
||||
original_index = answer_mapping[answer]
|
||||
all_responses.append(original_index)
|
||||
|
||||
most_frequent_index = Counter(all_responses).most_common(1)[0][0]
|
||||
final_answer = solutions[most_frequent_index]
|
||||
return {"solution": final_answer}
|
||||
|
|
@ -1,39 +0,0 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# @Date : 6/27/2024 19:46 PM
|
||||
# @Author : didi
|
||||
# @Desc : action nodes for operator
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class GenerateOp(BaseModel):
|
||||
response: str = Field(default="", description="Your solution for this problem")
|
||||
|
||||
class CodeGenerateOp(BaseModel):
|
||||
code: str = Field(default="", description="Your complete code solution for this problem")
|
||||
|
||||
class AnswerGenerateOp(BaseModel):
|
||||
thought: str = Field(default="", description="The step by step thinking process")
|
||||
answer: str = Field(default="", description="The final answer to the question")
|
||||
|
||||
class FormatOp(BaseModel):
|
||||
solution: str = Field(default="", description="Your formatted answer for this problem")
|
||||
|
||||
class ScEnsembleOp(BaseModel):
|
||||
thought: str = Field(default="", description="The thought of the most consistent solution.")
|
||||
solution_letter: str = Field(default="", description="The letter of most consistent solution.")
|
||||
|
||||
class ReflectionTestOp(BaseModel):
|
||||
reflection_and_solution: str = Field(default="", description="Corrective solution for code execution errors or test case failures")
|
||||
|
||||
class MdEnsembleOp(BaseModel):
|
||||
thought: str = Field(default="", description="Step-by-step analysis of the solutions to determine the best one.")
|
||||
solution_letter: str = Field(default="", description="The letter of the chosen best solution (only one letter).")
|
||||
|
||||
class ReviewOp(BaseModel):
|
||||
review_result: bool = Field(default=False, description="The Review Result (Bool). If you think this solution looks good for you, return 'true'; If not, return 'false'")
|
||||
feedback: str = Field(default="",description="Your FeedBack for this problem based on the criteria. If the review result is true, you can put it 'nothing here'.")
|
||||
|
||||
class ReviseOp(BaseModel):
|
||||
solution: str = Field(default="", description="Based on the feedback, revised solution for this problem")
|
||||
|
||||
|
|
@ -1,197 +0,0 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# @Date : 8/12/2024 22:00 PM
|
||||
# @Author : issac
|
||||
# @Desc : optimizer for graph
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
from typing import List, Literal
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from metagpt.actions.action_node import ActionNode
|
||||
from metagpt.provider.llm_provider_registry import create_llm_instance
|
||||
from metagpt.logs import logger
|
||||
from examples.aflow.scripts.optimizer_utils.graph_utils import GraphUtils
|
||||
from examples.aflow.scripts.optimizer_utils.data_utils import DataUtils
|
||||
from examples.aflow.scripts.optimizer_utils.experience_utils import ExperienceUtils
|
||||
from examples.aflow.scripts.optimizer_utils.evaluation_utils import EvaluationUtils
|
||||
from examples.aflow.scripts.optimizer_utils.convergence_utils import ConvergenceUtils
|
||||
|
||||
DatasetType = Literal["HumanEval", "MBPP", "GSM8K", "MATH", "HotpotQA", "DROP"]
|
||||
QuestionType = Literal["math", "code", "qa"]
|
||||
OptimizerType = Literal["Graph", "Test"]
|
||||
|
||||
|
||||
class GraphOptimize(BaseModel):
|
||||
modification: str = Field(default="", description="modification")
|
||||
graph: str = Field(default="", description="graph")
|
||||
prompt: str = Field(default="", description="prompt")
|
||||
|
||||
|
||||
class Optimizer:
|
||||
def __init__(
|
||||
self,
|
||||
dataset: DatasetType,
|
||||
question_type: QuestionType,
|
||||
opt_llm_config,
|
||||
exec_llm_config,
|
||||
operators: List,
|
||||
sample: int,
|
||||
check_convergence: bool = False,
|
||||
optimized_path: str = None,
|
||||
initial_round: int = 1,
|
||||
max_rounds: int = 20
|
||||
) -> None:
|
||||
self.optimize_llm_config = opt_llm_config
|
||||
self.optimize_llm = create_llm_instance(self.optimize_llm_config)
|
||||
self.execute_llm_config = exec_llm_config
|
||||
|
||||
self.dataset = dataset
|
||||
self.type = question_type
|
||||
self.check_convergence = check_convergence
|
||||
|
||||
self.graph = None
|
||||
self.operators = operators
|
||||
|
||||
self.root_path = f"{optimized_path}/{self.dataset}"
|
||||
self.sample = sample
|
||||
self.top_scores = []
|
||||
self.round = initial_round
|
||||
self.max_rounds = max_rounds
|
||||
|
||||
self.graph_utils = GraphUtils(self.root_path)
|
||||
self.data_utils = DataUtils(self.root_path)
|
||||
self.experience_utils = ExperienceUtils(self.root_path)
|
||||
self.evaluation_utils = EvaluationUtils(self.root_path)
|
||||
self.convergence_utils = ConvergenceUtils(self.root_path)
|
||||
|
||||
def optimize(self, mode: OptimizerType = "Graph"):
|
||||
if mode == "Test":
|
||||
test_n = 3 # validation datasets's execution number
|
||||
for i in range(test_n):
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
score = loop.run_until_complete(self.test())
|
||||
return None
|
||||
|
||||
for opt_round in range(self.max_rounds):
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
|
||||
retry_count = 0
|
||||
max_retries = 1
|
||||
|
||||
while retry_count < max_retries:
|
||||
try:
|
||||
score = loop.run_until_complete(self._optimize_graph())
|
||||
break
|
||||
except Exception as e:
|
||||
retry_count += 1
|
||||
logger.info(f"Error occurred: {e}. Retrying... (Attempt {retry_count}/{max_retries})")
|
||||
if retry_count == max_retries:
|
||||
logger.info("Max retries reached. Moving to next round.")
|
||||
score = None
|
||||
|
||||
wait_time = 5 * retry_count
|
||||
time.sleep(wait_time)
|
||||
|
||||
if retry_count < max_retries:
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
|
||||
self.round += 1
|
||||
logger.info(f"Score for round {self.round}: {score}")
|
||||
|
||||
converged, convergence_round, final_round = self.convergence_utils.check_convergence(top_k=3)
|
||||
|
||||
if converged and self.check_convergence:
|
||||
|
||||
logger.info(f"Convergence detected, occurred in round {convergence_round}, final round is {final_round}")
|
||||
# Print average scores and standard deviations for each round
|
||||
self.convergence_utils.print_results()
|
||||
break
|
||||
|
||||
time.sleep(5)
|
||||
|
||||
async def _optimize_graph(self):
|
||||
validation_n = 2 # validation datasets's execution number
|
||||
graph_path = f"{self.root_path}/workflows"
|
||||
data = self.data_utils.load_results(graph_path)
|
||||
|
||||
if self.round == 1:
|
||||
directory = self.graph_utils.create_round_directory(graph_path, self.round)
|
||||
# Load graph using graph_utils
|
||||
self.graph = self.graph_utils.load_graph(self.round, graph_path)
|
||||
avg_score = await self.evaluation_utils.evaluate_graph(self, directory, validation_n, data, initial=True)
|
||||
|
||||
# Create a loop until the generated graph meets the check conditions
|
||||
while True:
|
||||
directory = self.graph_utils.create_round_directory(graph_path, self.round + 1)
|
||||
|
||||
top_rounds = self.data_utils.get_top_rounds(self.sample)
|
||||
sample = self.data_utils.select_round(top_rounds)
|
||||
|
||||
prompt, graph_load = self.graph_utils.read_graph_files(sample["round"], graph_path)
|
||||
graph = self.graph_utils.extract_solve_graph(graph_load)
|
||||
|
||||
processed_experience = self.experience_utils.load_experience()
|
||||
experience = self.experience_utils.format_experience(processed_experience, sample["round"])
|
||||
|
||||
operator_description = self.graph_utils.load_operators_description(self.operators)
|
||||
log_data = self.data_utils.load_log(sample["round"])
|
||||
|
||||
graph_optimize_prompt = self.graph_utils.create_graph_optimize_prompt(
|
||||
experience, sample["score"], graph[0], prompt, operator_description, self.type, log_data
|
||||
)
|
||||
|
||||
graph_optimize_node = await ActionNode.from_pydantic(GraphOptimize).fill(
|
||||
context=graph_optimize_prompt, mode="xml_fill", llm=self.optimize_llm
|
||||
)
|
||||
|
||||
response = await self.graph_utils.get_graph_optimize_response(graph_optimize_node)
|
||||
|
||||
# Check if the modification meets the conditions
|
||||
check = self.experience_utils.check_modification(processed_experience, response["modification"],
|
||||
sample["round"])
|
||||
|
||||
# If `check` is True, break the loop; otherwise, regenerate the graph
|
||||
if check:
|
||||
break
|
||||
|
||||
# Save the graph and evaluate
|
||||
self.graph_utils.write_graph_files(directory, response, self.round + 1, self.dataset)
|
||||
|
||||
experience = self.experience_utils.create_experience_data(sample, response["modification"])
|
||||
|
||||
self.graph = self.graph_utils.load_graph(self.round + 1, graph_path)
|
||||
|
||||
logger.info(directory)
|
||||
|
||||
avg_score = await self.evaluation_utils.evaluate_graph(self, directory, validation_n, data, initial=False)
|
||||
|
||||
self.experience_utils.update_experience(directory, experience, avg_score)
|
||||
|
||||
return avg_score
|
||||
|
||||
async def test(self):
|
||||
rounds = [5] # You can choose the rounds you want to test here.
|
||||
data = []
|
||||
|
||||
graph_path = f"{self.root_path}/workflows_test"
|
||||
json_file_path = self.data_utils.get_results_file_path(graph_path)
|
||||
|
||||
data = self.data_utils.load_results(graph_path)
|
||||
|
||||
for round in rounds:
|
||||
directory = self.graph_utils.create_round_directory(graph_path, round)
|
||||
self.graph = self.graph_utils.load_graph(round, graph_path)
|
||||
|
||||
score, avg_cost, total_cost = await self.evaluation_utils.evaluate_graph_test(
|
||||
self, directory, is_test=True
|
||||
)
|
||||
|
||||
new_data = self.data_utils.create_result_data(round, score, avg_cost, total_cost)
|
||||
data.append(new_data)
|
||||
|
||||
self.data_utils.save_results(json_file_path, data)
|
||||
|
|
@ -1,124 +0,0 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# @Date : 9/23/2024 10:00 AM
|
||||
# @Author : Issac
|
||||
# @Desc :
|
||||
|
||||
import numpy as np
|
||||
import json
|
||||
import os
|
||||
from metagpt.logs import logger
|
||||
|
||||
class ConvergenceUtils:
|
||||
def __init__(self, root_path):
|
||||
self.root_path = root_path
|
||||
self.data = None
|
||||
self.rounds = None
|
||||
self.avg_scores, self.stds = None, None
|
||||
|
||||
def load_data(self, root_path):
|
||||
"""
|
||||
读取 JSON 文件,如果不存在则创建一个新文件,然后返回数据。
|
||||
"""
|
||||
rounds_dir = os.path.join(root_path, "workflows")
|
||||
result_file = os.path.join(rounds_dir, "results.json")
|
||||
|
||||
# 确保目录存在
|
||||
os.makedirs(rounds_dir, exist_ok=True)
|
||||
|
||||
# 如果文件不存在,创建一个包含空列表的新文件
|
||||
if not os.path.exists(result_file):
|
||||
with open(result_file, 'w') as file:
|
||||
json.dump([], file)
|
||||
|
||||
# 读取文件并返回数据
|
||||
with open(result_file, 'r') as file:
|
||||
return json.load(file)
|
||||
|
||||
def process_rounds(self):
|
||||
"""
|
||||
以 round 为单位组织数据,返回按轮次的分数字典。
|
||||
"""
|
||||
self.data = self.load_data(root_path=self.root_path)
|
||||
rounds = {}
|
||||
for entry in self.data:
|
||||
round_number = entry['round']
|
||||
score = entry['score']
|
||||
if round_number not in rounds:
|
||||
rounds[round_number] = []
|
||||
rounds[round_number].append(score)
|
||||
return rounds
|
||||
|
||||
def calculate_avg_and_std(self):
|
||||
"""
|
||||
计算每轮的平均分和标准差,返回两个列表:平均分和标准差。
|
||||
"""
|
||||
self.rounds = self.process_rounds()
|
||||
|
||||
sorted_rounds = sorted(self.rounds.items(), key=lambda x: x[0])
|
||||
avg_scores = []
|
||||
stds = []
|
||||
for round_number, scores in sorted_rounds:
|
||||
avg_scores.append(np.mean(scores))
|
||||
stds.append(np.std(scores))
|
||||
return avg_scores, stds
|
||||
|
||||
def check_convergence(self, top_k=3, z=0, consecutive_rounds=5):
|
||||
"""
|
||||
检查收敛的函数。z 为置信水平对应的 z 分数 。
|
||||
consecutive_rounds 为连续轮次内满足停止条件的次数。
|
||||
"""
|
||||
self.avg_scores, self.stds = self.calculate_avg_and_std()
|
||||
|
||||
if len(self.avg_scores) < top_k + 1:
|
||||
return False, None, None
|
||||
|
||||
convergence_count = 0
|
||||
previous_Y = None
|
||||
sigma_Y_previous = None
|
||||
|
||||
for i in range(len(self.avg_scores)):
|
||||
# 动态选择当前轮次及之前所有轮次的 top_k
|
||||
top_k_indices = np.argsort(self.avg_scores[:i + 1])[::-1][:top_k]
|
||||
top_k_scores = [self.avg_scores[j] for j in top_k_indices]
|
||||
top_k_stds = [self.stds[j] for j in top_k_indices]
|
||||
|
||||
Y_current = np.mean(top_k_scores)
|
||||
sigma_Y_current = np.sqrt(np.sum([s ** 2 for s in top_k_stds]) / (top_k ** 2))
|
||||
|
||||
if previous_Y is not None:
|
||||
Delta_Y = Y_current - previous_Y
|
||||
sigma_Delta_Y = np.sqrt(sigma_Y_current ** 2 + sigma_Y_previous ** 2)
|
||||
|
||||
if abs(Delta_Y) <= z * sigma_Delta_Y:
|
||||
convergence_count += 1
|
||||
if convergence_count >= consecutive_rounds:
|
||||
return True, i - consecutive_rounds + 1, i
|
||||
else:
|
||||
convergence_count = 0
|
||||
|
||||
previous_Y = Y_current
|
||||
sigma_Y_previous = sigma_Y_current
|
||||
|
||||
return False, None, None
|
||||
|
||||
def print_results(self):
|
||||
"""
|
||||
打印所有轮次的平均分和标准差。
|
||||
"""
|
||||
self.avg_scores, self.stds = self.calculate_avg_and_std()
|
||||
for i, (avg_score, std) in enumerate(zip(self.avg_scores, self.stds), 1):
|
||||
logger.info(f"轮次 {i}: 平均分 = {avg_score:.4f}, 标准差 = {std:.4f}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
# 使用该类,并指定 top_k
|
||||
checker = ConvergenceUtils("path") # 例如设置 top_k=5
|
||||
converged, convergence_round, final_round = checker.check_convergence()
|
||||
|
||||
if converged:
|
||||
logger.info(f"检测到收敛,发生在第 {convergence_round} 轮,最终轮次为 {final_round} 轮")
|
||||
else:
|
||||
logger.info("在所有轮次内未检测到收敛")
|
||||
|
||||
# 打印每轮的平均分和标准差
|
||||
checker.print_results()
|
||||
|
|
@ -1,157 +0,0 @@
|
|||
import json
|
||||
import os
|
||||
import random
|
||||
import datetime
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from metagpt.logs import logger
|
||||
|
||||
|
||||
class DataUtils:
|
||||
def __init__(self, root_path: str):
|
||||
self.root_path = root_path
|
||||
self.top_scores = []
|
||||
|
||||
def load_results(self, path: str) -> list:
|
||||
result_path = os.path.join(path, "results.json")
|
||||
if os.path.exists(result_path):
|
||||
with open(result_path, 'r') as json_file:
|
||||
try:
|
||||
return json.load(json_file)
|
||||
except json.JSONDecodeError:
|
||||
return []
|
||||
return []
|
||||
|
||||
def get_top_rounds(self, sample: int, path=None, mode="Graph"):
|
||||
self._load_scores(path, mode)
|
||||
unique_rounds = set()
|
||||
unique_top_scores = []
|
||||
|
||||
first_round = next((item for item in self.top_scores if item["round"] == 1), None)
|
||||
if first_round:
|
||||
unique_top_scores.append(first_round)
|
||||
unique_rounds.add(1)
|
||||
|
||||
for item in self.top_scores:
|
||||
if item["round"] not in unique_rounds:
|
||||
unique_top_scores.append(item)
|
||||
unique_rounds.add(item["round"])
|
||||
|
||||
if len(unique_top_scores) >= sample:
|
||||
break
|
||||
|
||||
return unique_top_scores
|
||||
|
||||
def select_round(self, items):
|
||||
if not items:
|
||||
raise ValueError("Item list is empty.")
|
||||
|
||||
sorted_items = sorted(items, key=lambda x: x["score"], reverse=True)
|
||||
scores = [item["score"] * 100 for item in sorted_items]
|
||||
|
||||
probabilities = self._compute_probabilities(scores)
|
||||
logger.info("\nMixed probability distribution: ", probabilities)
|
||||
|
||||
selected_index = np.random.choice(len(sorted_items), p=probabilities)
|
||||
logger.info(f"\nSelected index: {selected_index}, Selected item: {sorted_items[selected_index]}")
|
||||
|
||||
return sorted_items[selected_index]
|
||||
|
||||
def _compute_probabilities(self, scores, alpha=0.2, lambda_=0.3):
|
||||
scores = np.array(scores, dtype=np.float64)
|
||||
n = len(scores)
|
||||
|
||||
if n == 0:
|
||||
raise ValueError("Score list is empty.")
|
||||
|
||||
uniform_prob = np.full(n, 1.0 / n, dtype=np.float64)
|
||||
|
||||
max_score = np.max(scores)
|
||||
shifted_scores = scores - max_score
|
||||
exp_weights = np.exp(alpha * shifted_scores)
|
||||
|
||||
sum_exp_weights = np.sum(exp_weights)
|
||||
if sum_exp_weights == 0:
|
||||
raise ValueError("Sum of exponential weights is 0, cannot normalize.")
|
||||
|
||||
score_prob = exp_weights / sum_exp_weights
|
||||
|
||||
mixed_prob = lambda_ * uniform_prob + (1 - lambda_) * score_prob
|
||||
|
||||
total_prob = np.sum(mixed_prob)
|
||||
if not np.isclose(total_prob, 1.0):
|
||||
mixed_prob = mixed_prob / total_prob
|
||||
|
||||
return mixed_prob
|
||||
|
||||
def load_log(self, cur_round, path=None, mode: str = "Graph"):
|
||||
if mode == "Graph":
|
||||
log_dir = os.path.join(self.root_path, "workflows", f"round_{cur_round}", "log.json")
|
||||
else:
|
||||
log_dir = path
|
||||
|
||||
# 检查文件是否存在
|
||||
if not os.path.exists(log_dir):
|
||||
return "" # 如果文件不存在,返回空字符串
|
||||
logger.info(log_dir)
|
||||
with open(log_dir, 'r', encoding='utf-8') as f:
|
||||
data = json.load(f)
|
||||
|
||||
if isinstance(data, dict):
|
||||
data = [data]
|
||||
elif not isinstance(data, list):
|
||||
data = list(data)
|
||||
|
||||
if not data:
|
||||
return ""
|
||||
|
||||
sample_size = min(3, len(data))
|
||||
random_samples = random.sample(data, sample_size)
|
||||
|
||||
log = ""
|
||||
for sample in random_samples:
|
||||
log += json.dumps(sample, indent=4, ensure_ascii=False) + "\n\n"
|
||||
|
||||
return log
|
||||
|
||||
def get_results_file_path(self, graph_path: str) -> str:
|
||||
return os.path.join(graph_path, "results.json")
|
||||
|
||||
def create_result_data(self, round: int, score: float, avg_cost: float, total_cost: float) -> dict:
|
||||
now = datetime.datetime.now()
|
||||
return {
|
||||
"round": round,
|
||||
"score": score,
|
||||
"avg_cost": avg_cost,
|
||||
"total_cost": total_cost,
|
||||
"time": now
|
||||
}
|
||||
|
||||
def save_results(self, json_file_path: str, data: list):
|
||||
with open(json_file_path, 'w') as json_file:
|
||||
json.dump(data, json_file, default=str, indent=4)
|
||||
|
||||
def _load_scores(self, path=None, mode="Graph"):
|
||||
if mode == "Graph":
|
||||
rounds_dir = os.path.join(self.root_path, "workflows")
|
||||
else:
|
||||
rounds_dir = path
|
||||
|
||||
result_file = os.path.join(rounds_dir, "results.json")
|
||||
self.top_scores = []
|
||||
|
||||
with open(result_file, 'r', encoding='utf-8') as file:
|
||||
data = json.load(file)
|
||||
df = pd.DataFrame(data)
|
||||
|
||||
scores_per_round = df.groupby('round')['score'].mean().to_dict()
|
||||
|
||||
for round_number, average_score in scores_per_round.items():
|
||||
self.top_scores.append({
|
||||
"round": round_number,
|
||||
"score": average_score
|
||||
})
|
||||
|
||||
self.top_scores.sort(key=lambda x: x["score"], reverse=True)
|
||||
|
||||
return self.top_scores
|
||||
|
|
@ -1,57 +0,0 @@
|
|||
from examples.aflow.scripts.evaluator import Evaluator
|
||||
|
||||
|
||||
class EvaluationUtils:
|
||||
def __init__(self, root_path: str):
|
||||
self.root_path = root_path
|
||||
|
||||
async def evaluate_initial_round(self, optimizer, graph_path, directory, validation_n, data):
|
||||
# 使用 optimizer 的 graph_utils 来加载图
|
||||
optimizer.graph = optimizer.graph_utils.load_graph(optimizer.round, graph_path)
|
||||
evaluator = Evaluator(eval_path=directory)
|
||||
|
||||
for i in range(validation_n):
|
||||
score, avg_cost, total_cost = await evaluator.graph_evaluate(
|
||||
optimizer.dataset, optimizer.graph,
|
||||
{"dataset": optimizer.dataset, "llm_config": optimizer.execute_llm_config},
|
||||
directory, is_test=False
|
||||
)
|
||||
|
||||
new_data = optimizer.data_utils.create_result_data(optimizer.round, score, avg_cost, total_cost)
|
||||
data.append(new_data)
|
||||
|
||||
result_path = optimizer.data_utils.get_results_file_path(graph_path)
|
||||
optimizer.data_utils.save_results(result_path, data)
|
||||
|
||||
return data
|
||||
|
||||
async def evaluate_graph(self, optimizer, directory, validation_n, data, initial=False):
|
||||
evaluator = Evaluator(eval_path=directory)
|
||||
sum_score = 0
|
||||
|
||||
for i in range(validation_n):
|
||||
score, avg_cost, total_cost = await evaluator.graph_evaluate(
|
||||
optimizer.dataset, optimizer.graph,
|
||||
{"dataset": optimizer.dataset, "llm_config": optimizer.execute_llm_config},
|
||||
directory, is_test=False
|
||||
)
|
||||
|
||||
cur_round = optimizer.round + 1 if initial is False else optimizer.round
|
||||
|
||||
new_data = optimizer.data_utils.create_result_data(cur_round, score, avg_cost, total_cost)
|
||||
data.append(new_data)
|
||||
|
||||
result_path = optimizer.data_utils.get_results_file_path(f"{optimizer.root_path}/workflows")
|
||||
optimizer.data_utils.save_results(result_path, data)
|
||||
|
||||
sum_score += score
|
||||
|
||||
return sum_score / validation_n
|
||||
|
||||
async def evaluate_graph_test(self, optimizer, directory, is_test=True):
|
||||
evaluator = Evaluator(eval_path=directory)
|
||||
return await evaluator.graph_evaluate(
|
||||
optimizer.dataset, optimizer.graph,
|
||||
{"dataset": optimizer.dataset, "llm_config": optimizer.execute_llm_config},
|
||||
directory, is_test=is_test
|
||||
)
|
||||
|
|
@ -1,95 +0,0 @@
|
|||
import json
|
||||
import os
|
||||
from collections import defaultdict
|
||||
from metagpt.logs import logger
|
||||
|
||||
class ExperienceUtils:
|
||||
def __init__(self, root_path: str):
|
||||
self.root_path = root_path
|
||||
|
||||
def load_experience(self, path=None, mode: str = "Graph"):
|
||||
if mode == "Graph":
|
||||
rounds_dir = os.path.join(self.root_path, "workflows")
|
||||
else:
|
||||
rounds_dir = path
|
||||
|
||||
experience_data = defaultdict(lambda: {"score": None, "success": {}, "failure": {}})
|
||||
|
||||
for round_dir in os.listdir(rounds_dir):
|
||||
if os.path.isdir(os.path.join(rounds_dir, round_dir)) and round_dir.startswith("round_"):
|
||||
round_path = os.path.join(rounds_dir, round_dir)
|
||||
try:
|
||||
round_number = int(round_dir.split("_")[1])
|
||||
json_file_path = os.path.join(round_path, "experience.json")
|
||||
if os.path.exists(json_file_path):
|
||||
with open(json_file_path, "r", encoding="utf-8") as json_file:
|
||||
data = json.load(json_file)
|
||||
father_node = data["father node"]
|
||||
|
||||
if experience_data[father_node]["score"] is None:
|
||||
experience_data[father_node]["score"] = data["before"]
|
||||
|
||||
if data["succeed"]:
|
||||
experience_data[father_node]["success"][round_number] = {
|
||||
"modification": data["modification"],
|
||||
"score": data["after"]
|
||||
}
|
||||
else:
|
||||
experience_data[father_node]["failure"][round_number] = {
|
||||
"modification": data["modification"],
|
||||
"score": data["after"]
|
||||
}
|
||||
except Exception as e:
|
||||
logger.info(f"Error processing {round_dir}: {str(e)}")
|
||||
|
||||
experience_data = dict(experience_data)
|
||||
|
||||
output_path = os.path.join(rounds_dir, "processed_experience.json")
|
||||
with open(output_path, "w", encoding="utf-8") as outfile:
|
||||
json.dump(experience_data, outfile, indent=4, ensure_ascii=False)
|
||||
|
||||
logger.info(f"Processed experience data saved to {output_path}")
|
||||
return experience_data
|
||||
|
||||
def format_experience(self, processed_experience, sample_round):
|
||||
experience_data = processed_experience.get(sample_round)
|
||||
if experience_data:
|
||||
experience = f"Original Score: {experience_data['score']}\n"
|
||||
experience += "These are some conclusions drawn from experience:\n\n"
|
||||
for key, value in experience_data["failure"].items():
|
||||
experience += f"-Absolutely prohibit {value['modification']} (Score: {value['score']})\n"
|
||||
for key, value in experience_data["success"].items():
|
||||
experience += f"-Absolutely prohibit {value['modification']} \n"
|
||||
experience += "\n\nNote: Take into account past failures and avoid repeating the same mistakes, as these failures indicate that these approaches are ineffective. You must fundamentally change your way of thinking, rather than simply using more advanced Python syntax like for, if, else, etc., or modifying the prompt."
|
||||
else:
|
||||
experience = f"No experience data found for round {sample_round}."
|
||||
return experience
|
||||
|
||||
def check_modification(self, processed_experience, modification, sample_round):
|
||||
experience_data = processed_experience.get(sample_round)
|
||||
if experience_data:
|
||||
for key, value in experience_data["failure"].items():
|
||||
if value['modification'] == modification:
|
||||
return False
|
||||
for key, value in experience_data["success"].items():
|
||||
if value['modification'] == modification:
|
||||
return False
|
||||
return True
|
||||
else:
|
||||
return True # 如果 experience_data 为空,也返回 True
|
||||
|
||||
def create_experience_data(self, sample, modification):
|
||||
return {
|
||||
"father node": sample["round"],
|
||||
"modification": modification,
|
||||
"before": sample["score"],
|
||||
"after": None,
|
||||
"succeed": None,
|
||||
}
|
||||
|
||||
def update_experience(self, directory, experience, avg_score):
|
||||
experience["after"] = avg_score
|
||||
experience["succeed"] = bool(avg_score > experience["before"])
|
||||
|
||||
with open(os.path.join(directory, "experience.json"), "w", encoding="utf-8") as file:
|
||||
json.dump(experience, file, ensure_ascii=False, indent=4)
|
||||
|
|
@ -1,112 +0,0 @@
|
|||
import os
|
||||
import re
|
||||
import json
|
||||
from typing import List
|
||||
import traceback
|
||||
import time
|
||||
from metagpt.logs import logger
|
||||
|
||||
from examples.aflow.scripts.prompts.optimize_prompt import (
|
||||
WORKFLOW_CUSTOM_USE,
|
||||
WORKFLOW_INPUT,
|
||||
WORKFLOW_OPTIMIZE_PROMPT,
|
||||
WORKFLOW_TEMPLATE
|
||||
)
|
||||
|
||||
|
||||
class GraphUtils:
|
||||
def __init__(self, root_path: str):
|
||||
self.root_path = root_path
|
||||
|
||||
def create_round_directory(self, graph_path: str, round_number: int) -> str:
|
||||
directory = os.path.join(graph_path, f"round_{round_number}")
|
||||
os.makedirs(directory, exist_ok=True)
|
||||
return directory
|
||||
|
||||
def load_graph(self, round_number: int, workflows_path: str):
|
||||
workflows_path = workflows_path.replace("\\", ".").replace("/", ".")
|
||||
graph_module_name = f"{workflows_path}.round_{round_number}.graph"
|
||||
|
||||
try:
|
||||
graph_module = __import__(graph_module_name, fromlist=[""])
|
||||
graph_class = getattr(graph_module, "Workflow")
|
||||
return graph_class
|
||||
except ImportError as e:
|
||||
logger.info(f"Error loading graph for round {round_number}: {e}")
|
||||
raise
|
||||
|
||||
def read_graph_files(self, round_number: int, workflows_path: str):
|
||||
prompt_file_path = os.path.join(workflows_path, f"round_{round_number}", "prompt.py")
|
||||
graph_file_path = os.path.join(workflows_path, f"round_{round_number}", "graph.py")
|
||||
|
||||
try:
|
||||
with open(prompt_file_path, "r", encoding="utf-8") as file:
|
||||
prompt_content = file.read()
|
||||
with open(graph_file_path, "r", encoding="utf-8") as file:
|
||||
graph_content = file.read()
|
||||
except FileNotFoundError as e:
|
||||
logger.info(f"Error: File not found for round {round_number}: {e}")
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.info(f"Error loading prompt for round {round_number}: {e}")
|
||||
raise
|
||||
return prompt_content, graph_content
|
||||
|
||||
def extract_solve_graph(self, graph_load: str) -> List[str]:
|
||||
pattern = r"class Workflow:.+"
|
||||
return re.findall(pattern, graph_load, re.DOTALL)
|
||||
|
||||
def load_operators_description(self, operators: List[str]) -> str:
|
||||
path = f"{self.root_path}/workflows/template/operator.json"
|
||||
operators_description = ""
|
||||
for id, operator in enumerate(operators):
|
||||
operator_description = self._load_operator_description(id + 1, operator, path)
|
||||
operators_description += f"{operator_description}\n"
|
||||
return operators_description
|
||||
|
||||
def _load_operator_description(self, id: int, operator_name: str, file_path: str) -> str:
|
||||
with open(file_path, "r") as f:
|
||||
operator_data = json.load(f)
|
||||
matched_data = operator_data[operator_name]
|
||||
desc = matched_data["description"]
|
||||
interface = matched_data["interface"]
|
||||
return f"{id}. {operator_name}: {desc}, with interface {interface})."
|
||||
|
||||
def create_graph_optimize_prompt(self, experience: str, score: float, graph: str, prompt: str,
|
||||
operator_description: str, type: str, log_data: str) -> str:
|
||||
graph_input = WORKFLOW_INPUT.format(
|
||||
experience=experience, score=score, graph=graph, prompt=prompt, operator_description=operator_description,
|
||||
type=type, log=log_data
|
||||
)
|
||||
graph_system = WORKFLOW_OPTIMIZE_PROMPT.format(type=type)
|
||||
return graph_input + WORKFLOW_CUSTOM_USE + graph_system
|
||||
|
||||
async def get_graph_optimize_response(self, graph_optimize_node):
|
||||
max_retries = 5
|
||||
retries = 0
|
||||
|
||||
while retries < max_retries:
|
||||
try:
|
||||
response = graph_optimize_node.instruct_content.model_dump()
|
||||
return response
|
||||
except Exception as e:
|
||||
retries += 1
|
||||
logger.info(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})")
|
||||
if retries == max_retries:
|
||||
logger.info("Maximum retries reached. Skipping this sample.")
|
||||
break
|
||||
traceback.print_exc()
|
||||
time.sleep(5)
|
||||
return None
|
||||
|
||||
def write_graph_files(self, directory: str, response: dict, round_number: int, dataset: str):
|
||||
graph = WORKFLOW_TEMPLATE.format(graph=response["graph"], round=round_number, dataset=dataset)
|
||||
|
||||
with open(os.path.join(directory, "graph.py"), "w", encoding="utf-8") as file:
|
||||
file.write(graph)
|
||||
|
||||
with open(os.path.join(directory, "prompt.py"), "w", encoding="utf-8") as file:
|
||||
file.write(response["prompt"])
|
||||
|
||||
with open(os.path.join(directory, "__init__.py"), "w", encoding="utf-8") as file:
|
||||
file.write("")
|
||||
|
|
@ -1,59 +0,0 @@
|
|||
WORKFLOW_OPTIMIZE_PROMPT = """You are building a Graph and corresponding Prompt to jointly solve {type} problems.
|
||||
Referring to the given graph and prompt, which forms a basic example of a {type} solution approach,
|
||||
please reconstruct and optimize them. You can add, modify, or delete nodes, parameters, or prompts. Include your
|
||||
single modification in XML tags in your reply. Ensure they are complete and correct to avoid runtime failures. When
|
||||
optimizing, you can incorporate critical thinking methods like review, revise, ensemble (generating multiple answers through different/similar prompts, then voting/integrating/checking the majority to obtain a final answer), selfAsk, etc. Consider
|
||||
Python's loops (for, while, list comprehensions), conditional statements (if-elif-else, ternary operators),
|
||||
or machine learning techniques (e.g., linear regression, decision trees, neural networks, clustering). The graph
|
||||
complexity should not exceed 10. Use logical and control flow (IF-ELSE, loops) for a more enhanced graphical
|
||||
representation.Ensure that all the prompts required by the current graph from prompt_custom are included.Exclude any other prompts.
|
||||
Output the modified graph and all the necessary Prompts in prompt_custom (if needed).
|
||||
The prompt you need to generate is only the one used in `prompt_custom.XXX` within Custom. Other methods already have built-in prompts and are prohibited from being generated. Only generate those needed for use in `prompt_custom`; please remove any unused prompts in prompt_custom.
|
||||
the generated prompt must not contain any placeholders.
|
||||
Considering information loss, complex graphs may yield better results, but insufficient information transmission can omit the solution. It's crucial to include necessary context during the process."""
|
||||
|
||||
|
||||
WORKFLOW_INPUT = """
|
||||
Here is a graph and the corresponding prompt (prompt only related to the custom method) that performed excellently in a previous iteration (maximum score is 1). You must make further optimizations and improvements based on this graph. The modified graph must differ from the provided example, and the specific differences should be noted within the <modification>xxx</modification> section.\n
|
||||
<sample>
|
||||
<experience>{experience}</experience>
|
||||
<modification>(such as:add a review step/delete a operator/modify a prompt)</modification>
|
||||
<score>{score}</score>
|
||||
<graph>{graph}</graph>
|
||||
<prompt>{prompt}</prompt>(only prompt_custom)
|
||||
<operator_description>{operator_description}</operator_description>
|
||||
</sample>
|
||||
Below are the logs of some results with the aforementioned Graph that performed well but encountered errors, which can be used as references for optimization:
|
||||
{log}
|
||||
|
||||
First, provide optimization ideas. **Only one detail point can be modified at a time**, and no more than 5 lines of code may be changed per modification—extensive modifications are strictly prohibited to maintain project focus!
|
||||
When introducing new functionalities in the graph, please make sure to import the necessary libraries or modules yourself, except for operator, prompt_custom, create_llm_instance, and CostManage, which have already been automatically imported.
|
||||
**Under no circumstances should Graph output None for any field.**
|
||||
Use custom methods to restrict your output format, rather than using code (outside of the code, the system will extract answers based on certain rules and score them).
|
||||
It is very important to format the Graph output answers, you can refer to the standard answer format in the log.
|
||||
"""
|
||||
|
||||
WORKFLOW_CUSTOM_USE = """\nHere's an example of using the `custom` method in graph:
|
||||
```
|
||||
# You can write your own prompt in <prompt>prompt_custom</prompt> and then use it in the Custom method in the graph
|
||||
response = await self.custom(input=problem, instruction=prompt_custom.XXX_PROMPT)
|
||||
# You can also concatenate previously generated string results in the input to provide more comprehensive contextual information.
|
||||
# response = await self.custom(input=problem+f"xxx:{xxx}, xxx:{xxx}", instruction=prompt_custom.XXX_PROMPT)
|
||||
# The output from the Custom method can be placed anywhere you need it, as shown in the example below
|
||||
solution = await self.generate(problem=f"question:{problem}, xxx:{response['response']}")
|
||||
```
|
||||
Note: In custom, the input and instruction are directly concatenated(instruction+input), and placeholders are not supported. Please ensure to add comments and handle the concatenation externally.\n
|
||||
|
||||
**Introducing multiple operators at appropriate points can enhance performance. If you find that some provided operators are not yet used in the graph, try incorporating them.**
|
||||
"""
|
||||
|
||||
WORKFLOW_TEMPLATE = """from typing import Literal
|
||||
import examples.aflow.scripts.optimized.{dataset}.workflows.template.operator as operator
|
||||
import examples.aflow.scripts.optimized.{dataset}.workflows.round_{round}.prompt as prompt_custom
|
||||
from metagpt.provider.llm_provider_registry import create_llm_instance
|
||||
from metagpt.utils.cost_manager import CostManager
|
||||
|
||||
DatasetType = Literal["HumanEval", "MBPP", "GSM8K", "MATH", "HotpotQA", "DROP"]
|
||||
|
||||
{graph}
|
||||
"""
|
||||
|
|
@ -1,89 +0,0 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# @Date : 6/26/2024 17:07 PM
|
||||
# @Author : didi
|
||||
# @Desc : prompts of operators
|
||||
|
||||
ANSWER_GENERATION_PROMPT = """
|
||||
Think step by step and solve the problem.
|
||||
1. In the "thought" field, explain your thinking process in detail.
|
||||
2. In the "answer" field, provide the final answer concisely and clearly. The answer should be a direct response to the question, without including explanations or reasoning.
|
||||
Your task: {input}
|
||||
"""
|
||||
|
||||
FORMAT_PROMPT = """
|
||||
For the question described as {problem_description},
|
||||
please extract a short and concise answer contains only one word/few words from the following solution: {solution}.
|
||||
Make sure there are no additional comments or explanations in your response.
|
||||
"""
|
||||
|
||||
SC_ENSEMBLE_PROMPT = """
|
||||
Given the question described as follows: {question}
|
||||
Several solutions have been generated to address the given question. They are as follows:
|
||||
{solutions}
|
||||
|
||||
Carefully evaluate these solutions and identify the answer that appears most frequently across them. This consistency in answers is crucial for determining the most reliable solution.
|
||||
|
||||
In the "thought" field, provide a detailed explanation of your thought process. In the "solution_letter" field, output only the single letter ID (A, B, C, etc.) corresponding to the most consistent solution. Do not include any additional text or explanation in the "solution_letter" field.
|
||||
"""
|
||||
|
||||
PYTHON_CODE_VERIFIER_PROMPT = """
|
||||
You are a professional Python programmer. Your task is to write complete, self-contained code based on a given mathematical problem and output the answer. The code should include all necessary imports and dependencies, and be ready to run without additional setup or environment configuration.
|
||||
|
||||
Problem description: {problem}
|
||||
Other analysis: {analysis}
|
||||
{feedback}
|
||||
|
||||
Your code should:
|
||||
1. Implement the calculation steps described in the problem.
|
||||
2. Define a function named `solve` that performs the calculation and returns the result. The `solve` function should not require any input parameters; instead, it should obtain all necessary inputs from within the function or from globally defined variables.
|
||||
3. `solve` function return the final calculation result.
|
||||
|
||||
Please ensure your code is efficient, well-commented, and follows Python best practices. The output should be limited to basic data types such as strings, integers, and floats. It is prohibited to transmit images or other file formats. The code output is intended for a text-based language model.
|
||||
"""
|
||||
|
||||
|
||||
REFLECTION_ON_PUBLIC_TEST_PROMPT = """
|
||||
Given a code problem and a python code solution which failed to pass test or execute, you need to analyze the reason for the failure and propose a better code solution.:
|
||||
### problem
|
||||
{problem}
|
||||
|
||||
### Code Solution
|
||||
{solution}
|
||||
|
||||
### Execution Result
|
||||
{exec_pass}
|
||||
|
||||
#### Failed Test Case
|
||||
{test_fail}
|
||||
|
||||
Please provide a reflection on the failed test cases and code solution, followed by a better code solution without any additional text or test cases.
|
||||
"""
|
||||
|
||||
MD_ENSEMBLE_PROMPT = """
|
||||
Given the question described as follows: {question}
|
||||
Several solutions have been generated to address the given question. They are as follows:
|
||||
{solutions}
|
||||
|
||||
Carefully evaluate these solutions and identify the solution that is more capable of solving the problem compared to other solutions, as this is crucial for problem-solving.
|
||||
|
||||
In the "thought" field, provide a detailed explanation of your thought process. In the "solution_letter" field, output only the single letter ID (A, B, C, etc.) corresponding to the solution. Do not include any additional text or explanation in the "solution_letter" field.
|
||||
"""
|
||||
|
||||
REVIEW_PROMPT = """
|
||||
Given a problem and a thoughtful solution, your task is to using critical thinking (questioning) to review the solution's correctness and provide a review result in boolean format.
|
||||
|
||||
problem: {problem}
|
||||
solution: {solution}
|
||||
|
||||
If you are more than 95 percent confident that the final answer is incorrect, please return False and give a feedback for the error. Otherwise, please return True and give a explanation for the correctness.
|
||||
"""
|
||||
|
||||
REVISE_PROMPT = """
|
||||
Given a problem and a thoughtful solution which is just reviewed as incorrect, your task is to revise the solution to solve the question and ensure the final code solution is wrapped with ```python```.
|
||||
|
||||
problem: {problem}
|
||||
solution: {solution}
|
||||
feedback: {feedback}
|
||||
|
||||
Ensure the output code is self-contained, and without any additional text or test cases.
|
||||
"""
|
||||
|
|
@ -1,164 +0,0 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# @Date : 7/2/2024 17:36 PM
|
||||
# @Author : didi
|
||||
# @Desc : utils for experiment
|
||||
|
||||
import ast
|
||||
import json
|
||||
import re
|
||||
from typing import Any, List, Tuple
|
||||
|
||||
def extract_task_id(task_id: str) -> int:
|
||||
"""Extract the numeric part of the task_id."""
|
||||
match = re.search(r"/(\d+)", task_id)
|
||||
return int(match.group(1)) if match else 0
|
||||
|
||||
|
||||
def get_hotpotqa(path: str):
|
||||
# Parses each jsonl line and yields it as a dictionary
|
||||
def parse_jsonl(path):
|
||||
with open(path) as f:
|
||||
for line in f:
|
||||
yield json.loads(line)
|
||||
|
||||
datas = list(parse_jsonl(path))
|
||||
return {data["_id"]: data for data in datas}
|
||||
|
||||
|
||||
def sort_json_by_key(input_file: str, output_file: str, key: str = "task_id"):
|
||||
"""
|
||||
Read a JSONL file, sort the entries based on task_id, and write to a new JSONL file.
|
||||
|
||||
:param input_file: Path to the input JSONL file
|
||||
:param output_file: Path to the output JSONL file
|
||||
"""
|
||||
# Read and parse the JSONL file
|
||||
with open(input_file, "r") as f:
|
||||
data = [json.loads(line) for line in f]
|
||||
|
||||
# Sort the data based on the numeric part of task_id
|
||||
sorted_data = sorted(data, key=lambda x: extract_task_id(x[key]))
|
||||
|
||||
# Write the sorted data to a new JSONL file
|
||||
with open(output_file, "w") as f:
|
||||
for item in sorted_data:
|
||||
f.write(json.dumps(item) + "\n")
|
||||
|
||||
|
||||
def parse_python_literal(s):
|
||||
try:
|
||||
return ast.literal_eval(s)
|
||||
except (ValueError, SyntaxError):
|
||||
return s
|
||||
|
||||
|
||||
def extract_test_cases_from_jsonl(
|
||||
entry_point: str, dataset: str = "HumanEval"
|
||||
):
|
||||
if dataset == "HumanEval":
|
||||
file_path = "examples/aflow/data/humaneval_public_test.jsonl"
|
||||
# Retain the original hardcoded test cases
|
||||
hardcoded_cases = {
|
||||
"find_zero": "",
|
||||
"decode_cyclic": "",
|
||||
"decode_shift": "",
|
||||
"by_length":"",
|
||||
"add":"",
|
||||
"triangle_area":"",
|
||||
"correct_bracketing":"",
|
||||
"solve":"",
|
||||
"sum_squares":"",
|
||||
"starts_one_ends":""
|
||||
}
|
||||
elif dataset == "MBPP":
|
||||
file_path = "examples/aflow/data/mbpp_public_test.jsonl"
|
||||
hardcoded_cases = {
|
||||
"remove_odd": "",
|
||||
"replace_spaces": "",
|
||||
"snake_to_camel": "",
|
||||
"Split": "",
|
||||
"swap_List": "",
|
||||
"square_Sum": "",
|
||||
"sort_sublists": "",
|
||||
"unique_sublists": ""
|
||||
}
|
||||
# Check if there are hardcoded test cases
|
||||
if entry_point in hardcoded_cases:
|
||||
return hardcoded_cases[entry_point]
|
||||
|
||||
# If there are no hardcoded test cases, read from the file
|
||||
with open(file_path, "r") as file:
|
||||
for line in file:
|
||||
data = json.loads(line)
|
||||
if data.get("entry_point") == entry_point:
|
||||
return data.get("test")
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def extract_test_cases(docstring: str) -> List[Tuple[str, List[Any], Any]]:
|
||||
# Use regular expressions to match test cases, now capturing function names and any output
|
||||
pattern = r">>> (\w+)\((.*?)\)\n\s*(.*?)(?=\n|$)"
|
||||
matches = re.findall(pattern, docstring, re.DOTALL)
|
||||
|
||||
test_cases = []
|
||||
for match in matches:
|
||||
func_name, input_str, expected_output = match
|
||||
|
||||
# Process input
|
||||
input_list = []
|
||||
for item in input_str.split(","):
|
||||
item = item.strip()
|
||||
try:
|
||||
# Try to convert input to numeric type
|
||||
if "." in item:
|
||||
input_list.append(float(item))
|
||||
else:
|
||||
input_list.append(int(item))
|
||||
except ValueError:
|
||||
# If unable to convert to numeric, keep as string
|
||||
input_list.append(item.strip("'\""))
|
||||
|
||||
# Process output
|
||||
try:
|
||||
# Try to convert output to numeric or boolean value
|
||||
if expected_output.lower() == "true":
|
||||
expected_output = True
|
||||
elif expected_output.lower() == "false":
|
||||
expected_output = False
|
||||
elif "." in expected_output:
|
||||
expected_output = float(expected_output)
|
||||
else:
|
||||
expected_output = int(expected_output)
|
||||
except ValueError:
|
||||
# If unable to convert, keep as string
|
||||
expected_output = expected_output.strip("'\"")
|
||||
|
||||
test_cases.append([func_name, input_list, expected_output])
|
||||
|
||||
return test_cases
|
||||
|
||||
|
||||
def test_cases_2_test_functions(solution: str, test_cases: str):
|
||||
tester_function = f"""
|
||||
{solution}
|
||||
|
||||
{test_cases}
|
||||
"""
|
||||
return tester_function
|
||||
|
||||
|
||||
def test_case_2_test_function(solution: str, test_case: str, entry_point: str):
|
||||
tester_function = f"""
|
||||
{solution}
|
||||
|
||||
|
||||
def check(candidate):
|
||||
{test_case}
|
||||
|
||||
def test_check():
|
||||
check({entry_point})
|
||||
|
||||
test_check()
|
||||
"""
|
||||
return tester_function
|
||||
|
|
@ -1,32 +0,0 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# @Date : 6/27/2024 22:07 PM
|
||||
# @Author : didi
|
||||
# @Desc : Basic Graph Class
|
||||
|
||||
from typing import Literal
|
||||
|
||||
from examples.aflow.scripts.operator import Generate
|
||||
from metagpt.provider.llm_provider_registry import create_llm_instance
|
||||
from metagpt.utils.cost_manager import CostManager
|
||||
|
||||
DatasetType = Literal["HumanEval", "MBPP", "GSM8K", "MATH", "HotpotQA", "DROP"]
|
||||
|
||||
class Workflow:
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
llm_config,
|
||||
dataset: DatasetType,
|
||||
) -> None:
|
||||
self.name = name
|
||||
self.dataset = dataset
|
||||
self.llm = create_llm_instance(llm_config)
|
||||
self.llm.cost_manager = CostManager()
|
||||
|
||||
async def __call__(self, problem: str):
|
||||
"""
|
||||
Implementation of the workflow
|
||||
"""
|
||||
raise NotImplementedError("This method should be implemented by the subclass")
|
||||
|
||||
|
||||
Loading…
Add table
Add a link
Reference in a new issue