提交baseline例子;修改context-fill 格式识别方式

This commit is contained in:
didi 2024-09-09 17:17:15 +08:00
parent ca560a844f
commit 4e0a896bdc
13 changed files with 254 additions and 182 deletions

View file

@ -1,29 +1,110 @@
# -*- coding: utf-8 -*-
# @Date :
# @Author : issac
# @Author : all
# @Desc : test on gsm8k
import re
import json
import asyncio
import aiofiles
import pandas as pd
from typing import Optional, List, Tuple, Callable
from tqdm.asyncio import tqdm_asyncio
from deepeval.models.base_model import DeepEvalBaseLLM
from examples.ags.benchmark.utils import generate_random_indices
def extract_number(text: str) -> Optional[float]:
    """Pull the last number out of `text`, or return None when there is none.

    Recognizes an optional sign, thousands separators ("1,234"), and decimal
    fractions; separators are stripped before conversion to float.
    """
    found = re.findall(r"[-+]?\d+(?:,\d{3})*(?:\.\d+)?|\d+\.\d+", text)
    if not found:
        return None
    candidate = found[-1].replace(",", "")
    try:
        return float(candidate)
    except ValueError:
        return None
def loose_match_score(expected_output: str, prediction: str, tolerance: float = 1e-6) -> int:
    """Score a prediction loosely: 1 when its last number matches the expected one.

    Both strings are reduced to their final numeric value via extract_number;
    the match allows an absolute difference of up to `tolerance`. Returns 0
    when either side has no extractable number or the values differ.
    """
    expected = extract_number(expected_output)
    predicted = extract_number(prediction)
    if expected is None or predicted is None:
        return 0
    return 1 if abs(expected - predicted) <= tolerance else 0
# This is the base-model format mandated by DeepEval; no changes are needed here — just call it.
class GraphModel(DeepEvalBaseLLM):
    """DeepEval LLM adapter that delegates generation to an async solver graph."""

    def __init__(self, graph):
        # graph: async callable, prompt -> (solution_result, total_cost).
        self.solver = graph
async def load_data(file_path: str, samples=1) -> List[dict]:
    """Asynchronously read a JSONL dataset and return `samples` randomly chosen records.

    Sampling is reproducible: generate_random_indices seeds numpy before
    shuffling, so repeated runs select the same subset.
    """
    data = []
    async with aiofiles.open(file_path, mode="r") as file:
        # One JSON object per line (JSONL).
        async for line in file:
            data.append(json.loads(line))
    random_indices = generate_random_indices(len(data), samples)
    data = [data[i] for i in random_indices]
    return data
def save_results_to_csv(results: List[Tuple[str, str, str, int, str]], path: str) -> float:
"""保存结果到CSV文件"""
df = pd.DataFrame(results, columns=["question", "prediction", "expected_output", "score", "cost"])
average_score = df["score"].mean()
def load_model(self):
pass
output_file = f"{path}/{average_score:.5f}.csv"
df.to_csv(output_file, index=False)
print(f"Results saved to {output_file}")
return average_score
async def a_generate(self, prompt: str) -> str:
    """Run the solver graph on `prompt` and return the solution text."""
    # TODO: cost tracking still needs to be integrated here; total_cost is
    # currently discarded.
    solution_result, total_cost = await self.solver(prompt)
    return solution_result
async def evaluate_problem(input: str, graph: Callable, expected_output: str) -> Tuple[str, str, str, int, str]:
"""评估单个问题"""
prompt = input
max_retries = 5
retries = 0
def generate(self, prompt: str) -> str:
    """Synchronous counterpart of a_generate: blocks until generation completes."""
    loop = asyncio.get_event_loop()
    # Drive the async a_generate to completion on the current event loop.
    solution_result = loop.run_until_complete(self.a_generate(prompt))
    return solution_result
while retries < max_retries:
try:
prediction = await graph(prompt)
cost = prediction[1]
output = prediction[0]["solution"]
def get_model_name(self):
    """Return a human-readable identifier for this model wrapper."""
    return "Custom Azure OpenAI Model"
score = loose_match_score(expected_output, output)
break
except Exception as e:
retries += 1
print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})")
if retries == max_retries:
print("Maximum retries reached. Skipping this sample.")
output = None
cost = None
score = 0
break
return input, output, expected_output, score, cost
async def evaluate_all_problems(data: List[dict], graph: Callable, max_concurrent_tasks: int = 20) -> List[Tuple[str, str, str, int, str]]:
    """Evaluate every problem concurrently, capped at `max_concurrent_tasks` in flight.

    Each record must carry a "question" and a reference "answer". Returns one
    (question, prediction, expected_output, score, cost) tuple per record;
    tqdm_asyncio.gather shows a progress bar while the tasks run.
    """
    semaphore = asyncio.Semaphore(max_concurrent_tasks)

    async def sem_evaluate(problem):
        # Semaphore bounds how many evaluations run at once.
        async with semaphore:
            input_text = problem["question"]
            expected_output = problem["answer"]
            return await evaluate_problem(input_text, graph, expected_output)

    tasks = [sem_evaluate(problem) for problem in data]
    return await tqdm_asyncio.gather(*tasks, desc="Evaluating problems", total=len(data))
async def gsm8k_evaluation(graph: Callable, file_path: str, samples: int, path: str) -> float:
    """GSM8K evaluation entry point: sample data, run the graph, persist a CSV.

    Args:
        graph: async solver called as graph(prompt) -> (result_dict, cost).
        file_path: path to the GSM8K JSONL file.
        samples: number of records to sample from the file.
        path: directory where the scored CSV is written.

    Returns:
        The mean score over the sampled problems.
    """
    data = await load_data(file_path, samples)
    # NOTE(review): concurrency is pinned to 5 here, overriding the
    # default of 20 in evaluate_all_problems — confirm this is intentional.
    results = await evaluate_all_problems(data, graph, max_concurrent_tasks=5)
    print(results)
    average_score = save_results_to_csv(results, path=path)
    print(f"Average score: {average_score:.5f}")
    return average_score

View file

@ -0,0 +1,17 @@
import numpy as np
def generate_random_indices(n, n_samples, test=False):
    """Deterministically split a shuffled range(n) into sample and hold-out indices.

    Seeds numpy's global RNG with a fixed value (42) so every call produces
    the same permutation. With test=False, return the first n_samples shuffled
    indices; with test=True, return the complementary hold-out indices.
    """
    np.random.seed(42)  # fixed seed -> reproducible permutation across runs
    order = np.arange(n)
    np.random.shuffle(order)
    return order[n_samples:] if test else order[:n_samples]

View file

@ -0,0 +1,73 @@
from examples.ags.scripts.operator import Operator
from examples.ags.scripts.graph import SolveGraph
from examples.ags.benchmark.gsm8k import gsm8k_evaluation
from examples.ags.scripts.operator_an import GenerateOp
from metagpt.actions.action_node import ActionNode
from metagpt.configs.models_config import ModelsConfig
from metagpt.llm import LLM
from pydantic import BaseModel, Field
from typing import Dict, Any
GSM8K_PROMPT_GPT = """
{question}\nPlease reason step by step, and put your final answer in the end. Wrap content using xml tags.
"""
GSM8K_PROMPT_DS = """
{question}\nPlease reason step by step, and put your final answer within \\boxed{{}}.
"""
class GenerateOp(BaseModel):
    """Structured-output schema: the filled node must produce a single `solution` string."""

    solution: str = Field(default="", description="solution for the problem")
class CoTGenerate(Operator):
    """Operator that prompts the LLM for a step-by-step (chain-of-thought) solution."""

    def __init__(self, llm: LLM, name: str = "Generate"):
        super().__init__(name, llm)

    async def __call__(self, problem, mode: str = None):
        # GPT-style prompt: reason step by step and wrap the final answer in xml tags.
        prompt = GSM8K_PROMPT_GPT.format(question=problem)
        fill_kwargs = {"context": prompt, "llm": self.llm}
        # Forward `mode` only when explicitly given (e.g. "context_fill"),
        # letting ActionNode.fill use its default mode otherwise.
        if mode:
            fill_kwargs["mode"] = mode
        node = await ActionNode.from_pydantic(GenerateOp).fill(**fill_kwargs)
        # Dump the pydantic instruct_content into a plain dict, e.g. {"solution": ...}.
        response = node.instruct_content.model_dump()
        return response
class CoTSolveGraph(SolveGraph):
    """SolveGraph baseline that answers each problem with a single CoT generation pass."""

    def __init__(self, name: str, llm_config, dataset: str):
        super().__init__(name, llm_config, dataset)
        self.cot_generate = CoTGenerate(self.llm)

    async def __call__(self, problem):
        # mode="context_fill" — presumably selects the context-based fill format
        # on ActionNode; verify against ActionNode.fill.
        solution = await self.cot_generate(problem, mode="context_fill")
        # Return the solution dict together with the cumulative LLM cost so far.
        return solution, self.llm.cost_manager.total_cost
if __name__ == "__main__":

    async def main():
        """Run the CoT baseline over the GSM8K dataset and report the mean score."""
        # Alternative baseline models — uncomment to switch:
        # llm_config = ModelsConfig.default().get("deepseek-coder")
        # llm_config = ModelsConfig.default().get("gpt-4o-mini")
        llm_config = ModelsConfig.default().get("gpt-35-turbo-1106")
        graph = CoTSolveGraph(name="CoT", llm_config=llm_config, dataset="Gsm8K")
        file_path = "examples/ags/data/gsm8k.jsonl"
        # Number of samples drawn from the dataset (smaller value kept for quick runs).
        samples = 1055
        # samples = 100
        path = "examples/ags/data/baselines/general"
        score = await gsm8k_evaluation(graph, file_path, samples, path)
        return score

    import asyncio

    asyncio.run(main())
# self consistency operator; universal self consistency;
# "IO" refers to the LLM's raw performance without any tricks, using the prompt the
# model publisher used for the corresponding dataset.
# deepseek-chat; gpt-4o-mini; gpt-35-turbo-1106
GENERATE_PROMPT = """
Generate Solution for the following problem: {problem_description}
"""
# med ensemble

View file

View file

@ -20,6 +20,9 @@ from sympy.parsing.latex import parse_latex
from sympy.parsing.sympy_parser import parse_expr
from tqdm.asyncio import tqdm_asyncio
from examples.ags.benchmark.gsm8k import gsm8k_evaluation
from examples.ags.benchmark.utils import generate_random_indices
DatasetType = Literal["HumanEval", "MBPP", "Gsm8K", "MATH", "HotpotQA", "DROP"]
@ -31,22 +34,6 @@ class Evaluator:
def __init__(self, eval_path: str):
self.eval_path = eval_path
def _generate_random_indices(self, n, n_samples, test=False):
"""
生成随机索引
"""
def _set_seed(seed=42):
np.random.seed(seed)
_set_seed()
indices = np.arange(n)
np.random.shuffle(indices)
if test:
return indices[n_samples:]
else:
return indices[:n_samples]
def validation_evaluate(self, dataset: DatasetType, graph, params: dict, path):
"""
Evaluates on validation dataset.
@ -74,131 +61,16 @@ class Evaluator:
"""
Evaluate on GSM8K dataset.
"""
# 模拟加载模型的函数
async def load_graph():
dataset = params["dataset"]
llm_config = params["llm_config"]
return graph_class(name="Gsm8K", llm_config=llm_config, dataset=dataset)
graph = graph_class(name="Gsm8K", llm_config=llm_config, dataset=dataset)
return graph
# 清理文本并提取单个数字
def extract_number(text: str) -> Optional[float]:
# 使用正则表达式提取数字,包括整数和浮点数
matches = re.findall(r"[-+]?\d+(?:,\d{3})*(?:\.\d+)?|\d+\.\d+", text)
print(matches)
if matches:
# 获取最后一个匹配的数字
last_number = matches[-1]
# 移除逗号以统一格式
last_number = last_number.replace(",", "")
try:
return float(last_number)
except ValueError:
return None
else:
return None
# 宽松匹配分数计算函数
def loose_match_score(expected_output: str, prediction: str, tolerance: float = 1e-6) -> int:
expected_number = extract_number(expected_output)
predicted_number = extract_number(prediction)
print(predicted_number)
# 如果预期输出或预测输出为空,返回不匹配
if expected_number is None or predicted_number is None:
return 0
# 比较两个提取出的数字,允许一定的容差
if abs(expected_number - predicted_number) <= tolerance:
return 1 # 数字相近,认为匹配成功
else:
return 0 # 数字不匹配
# 异步评估单个问题
async def _evaluate_problem(input: str, graph, expected_output: str) -> Tuple[str, str, str, int, str]:
prompt = input
max_retries = 5
retries = 0
while retries < max_retries:
try:
# 假设模型有一个异步生成函数
prediction = await graph(prompt) if graph else "None" # 这是一个占位符,替换成实际的模型生成逻辑
cost = prediction[1]
output = prediction[0]["solution"]
score = loose_match_score(expected_output, prediction[0]["solution"])
break
except Exception as e:
retries += 1
print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})")
if retries == max_retries:
print("Maximum retries reached. Skipping this sample.")
output = None
cost = None
score = 0
break
return input, output, expected_output, score, cost
# 异步读取JSONL文件
async def load_data(file_path: str) -> List[dict]:
data = []
async with aiofiles.open(file_path, mode="r") as file:
async for line in file:
data.append(json.loads(line))
return data[:samples]
# 并行评估所有问题
async def evaluate_all_problems(data: List[dict], graph, max_concurrent_tasks: int = 300):
semaphore = asyncio.Semaphore(max_concurrent_tasks)
async def sem_evaluate(problem):
async with semaphore:
input_text = problem["question"]
expected_output = problem["answer"]
return await _evaluate_problem(input_text, graph, expected_output)
tasks = [sem_evaluate(problem) for problem in data]
# 使用tqdm.gather来显示进度条
return await tqdm_asyncio.gather(*tasks, desc="Evaluating problems", total=len(data))
# 保存结果到CSV文件
def save_results_to_csv(results: List[Tuple[str, str, str, int]], path):
df = pd.DataFrame(results, columns=["question", "prediction", "expected_output", "score", "cost"])
average_score = df["score"].mean()
# 生成文件名,保留五位小数
output_file = f"{path}/{average_score:.5f}.csv"
df.to_csv(output_file, index=False)
print(f"Results saved to {output_file}")
return average_score
async def gsm8k():
file_path = "examples/ags/w_action_node/data/gsm8k.jsonl" # 替换为您的JSONL文件路径
data = await load_data(file_path)
graph = await load_graph()
results = await evaluate_all_problems(data, graph, max_concurrent_tasks=20)
# 保存结果到CSV文件并获取平均分
average_score = save_results_to_csv(results, path=path)
print(f"Average score: {average_score:.5f}")
return average_score
score = await gsm8k()
graph = await load_graph()
file_path = "examples/ags/data/gsm8k.jsonl"
score = await gsm8k_evaluation(graph, file_path, samples, path)
return score
async def _math_eval(self, graph_class, params, path, samples: int = 200):
@ -457,7 +329,7 @@ class Evaluator:
return await tqdm_asyncio.gather(*tasks, desc="Evaluating MATH problems", total=len(data))
def save_results_to_csv(results: List[Tuple[str, str, str, int]], path):
def save_results_to_csv(results: List[Tuple[str, str, str, int, str]], path):
df = pd.DataFrame(results, columns=["question", "prediction", "expected_output", "score", "cost"])
average_score = df["score"].mean()
@ -503,7 +375,7 @@ class Evaluator:
async with aiofiles.open(file_path, mode="r") as file:
async for line in file:
data.append(json.loads(line))
random_indices = self._generate_random_indices(len(data), samples)
random_indices = generate_random_indices(len(data), samples)
data = [data[i] for i in random_indices]
return data
@ -656,9 +528,6 @@ class Evaluator:
normalized = " ".join(parts).strip()
return normalized
# def exact_match_score(prediction, ground_truth):
# return int(normalize_answer(prediction) == normalize_answer(ground_truth))
def answer_to_bags(answer: str) -> Set[str]:
raw_spans = [answer]
@ -725,7 +594,7 @@ class Evaluator:
async with aiofiles.open(file_path, mode="r") as file:
async for line in file:
data.append(json.loads(line))
random_indices = self._generate_random_indices(len(data), samples)
random_indices = generate_random_indices(len(data), samples)
data = [data[i] for i in random_indices]
return data
@ -778,18 +647,6 @@ class Evaluator:
return await tqdm_asyncio.gather(*tasks, desc="Evaluating problems", total=len(data))
# def save_results_to_jsonl(results: List[Tuple[str, str, str, str, int]], path):
# avg_score = 0
# with open(path, "w") as f:
# for result in results:
# f.write(json.dumps({"question": result[0], "prediction": result[1], "expected_output": result[2], "supporting_sentences": result[3], "score": result[4]}) + "\n")
# avg_score += result[4]
# print(f"Results saved to {path}")
# avg_score /= len(results)
# return avg_score
def save_results_to_csv(results: List[Tuple[str, str, str, str, int]], path):
df = pd.DataFrame(
results, columns=["question", "prediction", "expected_output", "supporting_sentences", "score"]
@ -834,7 +691,7 @@ class Evaluator:
async with aiofiles.open(file_path, mode="r") as file:
async for line in file:
data.append(json.loads(line))
random_indices = self._generate_random_indices(len(data), samples)
random_indices = generate_random_indices(len(data), samples)
data = [data[i] for i in random_indices]
return data
@ -1056,7 +913,7 @@ class Evaluator:
with open(file_path, mode="r") as file:
data = json.load(file)
data = list(data.items())
random_indices = self._generate_random_indices(len(data), samples)
random_indices = generate_random_indices(len(data), samples)
data = [data[i] for i in random_indices]
return data

View file

@ -14,6 +14,7 @@ DatasetType = Literal["HumanEval", "MBPP", "Gsm8K", "MATH", "HotpotQa", "MMLU"]
cost_manager = CostManager()
# TODO: this class should serve as a base class; it must not be used directly like this.
class SolveGraph:
def __init__(
self,
@ -25,7 +26,7 @@ class SolveGraph:
self.dataset = dataset
self.llm = create_llm_instance(llm_config)
self.llm.cost_manager = CostManager()
self.generate = Generate()
self.generate = Generate(self.llm)
async def __call__(self, problem: str):
"""

View file

@ -322,3 +322,15 @@ MATH_ANSWER_FORMAT_PROMPT = """
### Instructions
Provide the answer as a numerical value only, without units or any additional text.
"""
PYTHON_CODE_SOLVER_PROMPT = """You are a professional Python programmer. Your task is to write Python code based on the user's request. Make sure to add appropriate explanations and your personal thought process to your code. Additionally, all code should be encapsulated in Python code blocks.
The packages you can use include: numpy, scipy, pandas, sympy, statsmodels, scikit-learn. If you attempt to import another external package and encounter an error, do not say it cannot be imported. Instead, try to write new code that avoids this issue.
Always output complete code rather than just giving suggestions or partial modifications, as your code will be executed directly. If immediate execution is required to check for possible errors, include test cases in the code.
In your response, only the code that needs to be run should be wrapped in multi-line code blocks. No other multi-line code blocks should appear. Your code needs to print the output after execution. Your code should not print error messages.
Problem description: {problem}
Please write Python code to solve this problem.
"""