Update llm.py & handle exception

This commit is contained in:
didi 2024-10-22 10:28:23 +08:00
parent 828d187681
commit 66b523953d
7 changed files with 155 additions and 134 deletions

View file

@@ -10,6 +10,7 @@ from typing import Any, Callable, Dict, List, Tuple
import aiofiles
import pandas as pd
from tqdm.asyncio import tqdm_asyncio
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type
from examples.aflow.benchmark.benchmark import BaseBenchmark
@@ -52,40 +53,41 @@ class DROPBenchmark(BaseBenchmark):
f1 = (2 * precision * recall) / (precision + recall)
return f1, prediction
@retry(
stop=stop_after_attempt(5),
wait=wait_fixed(1),
retry=retry_if_exception_type(Exception),
reraise=True
)
async def _generate_output(self, graph, input_text):
return await graph(input_text)
async def evaluate_problem(self, problem: dict, graph: Callable) -> Tuple[str, str, str, float, float]:
input_text = problem["context"]
expected_output = problem["ref_text"]
answers = expected_output.split("|")
max_retries = 5
retries = 0
try:
output, cost = await self._generate_output(graph, input_text)
f1_scores = []
while retries < max_retries:
try:
output, cost = await graph(input_text)
f1_scores = []
for answer in answers:
if answer.strip() != "":
output_parts = output.split("|")
for output_part in output_parts:
f1_score, _ = self.calculate_score(answer, output_part)
f1_scores.append(f1_score)
for answer in answers:
if answer.strip() != "":
output_parts = output.split("|")
for output_part in output_parts:
f1_score, _ = self.calculate_score(answer, output_part)
f1_scores.append(f1_score)
uni_score = max(f1_scores)
uni_score = max(f1_scores)
if uni_score < 0.3:
self.log_mismatch(input_text, expected_output, output, output)
if uni_score < 0.3:
self.log_mismatch(input_text, expected_output, output, output)
return input_text, output, expected_output, uni_score, cost
return input_text, output, expected_output, uni_score, cost
except Exception as e:
retries += 1
print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})")
if retries == max_retries:
print("Maximum retries reached. Skipping this sample.")
return input_text, str(e), expected_output, 0.0, 0.0
except Exception as e:
print(f"Maximum retries reached. Skipping this sample. Error: {e}")
return input_text, str(e), expected_output, 0.0, 0.0
def get_result_columns(self) -> List[str]:
return ["inputs", "prediction", "expected_output", "score", "cost"]

View file

@@ -14,6 +14,8 @@ from tqdm.asyncio import tqdm_asyncio
import os
import time
from datetime import datetime
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type
from examples.aflow.benchmark.benchmark import BaseBenchmark
@@ -36,31 +38,33 @@ class GSM8KBenchmark(BaseBenchmark):
if prediction is None:
return 0.0, prediction
return 1.0 if abs(expected_output - prediction) <= 1e-6 else 0.0, prediction
@retry(
stop=stop_after_attempt(5),
wait=wait_fixed(1),
retry=retry_if_exception_type(Exception),
reraise=True
)
async def _generate_output(self, graph, input_text):
return await graph(input_text)
async def evaluate_problem(self, problem: dict, graph: Callable) -> Tuple[str, str, float, float, float]:
max_retries = 5
retries = 0
while retries < max_retries:
try:
prediction, cost = await graph(problem["question"])
predicted_number = self.extract_number(prediction)
expected_output = self.extract_number(problem["answer"])
input_text = problem["question"]
expected_output = self.extract_number(problem["answer"])
score, _ = self.calculate_score(expected_output, predicted_number)
try:
output, cost = await self._generate_output(graph, input_text)
predicted_number = self.extract_number(output)
score, extracted_output = self.calculate_score(expected_output, predicted_number)
if score == 0:
self.log_mismatch(problem["question"], expected_output, prediction, predicted_number)
if score == 0:
self.log_mismatch(input_text, expected_output, output, extracted_output)
return problem["question"], prediction, expected_output, score, cost
return input_text, output, expected_output, score, cost
except Exception as e:
retries += 1
print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})")
if retries == max_retries:
print("Maximum retries reached. Skipping this sample.")
return problem["question"], str(e), self.extract_number(problem["answer"]), 0.0, 0.0
except Exception as e:
print(f"Maximum retries reached. Skipping this sample. Error: {e}")
return input_text, str(e), expected_output, 0.0, 0.0
def get_result_columns(self) -> List[str]:
return ["question", "prediction", "expected_output", "score", "cost"]

View file

@@ -7,6 +7,7 @@ import string
import re
import os
from collections import Counter
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type
from examples.aflow.benchmark.benchmark import BaseBenchmark
@@ -42,6 +43,15 @@ class HotpotQABenchmark(BaseBenchmark):
f1 = (2 * precision * recall) / (precision + recall)
return f1, prediction
@retry(
stop=stop_after_attempt(5),
wait=wait_fixed(1),
retry=retry_if_exception_type(Exception),
reraise=True
)
async def _generate_output(self, graph, input_text):
return await graph(input_text)
async def evaluate_problem(self, problem: dict, graph: Callable) -> Tuple[str, str, str, str, float, float]:
input_text = problem["question"]
expected_output = problem["answer"]
@@ -49,26 +59,18 @@ class HotpotQABenchmark(BaseBenchmark):
context_str = "\n".join(" ".join(paragraph) for paragraph in paragraphs)
inputs = f"Context: {context_str}\n\nQuestion: {input_text}\n\nAnswer:"
max_retries = 5
retries = 0
try:
output, cost = await self._generate_output(graph, inputs)
score, extracted_output = self.calculate_score(expected_output, output)
while retries < max_retries:
try:
output, cost = await graph(inputs)
score, _ = self.calculate_score(expected_output, output)
if score < 0.3: # We set the threshold for collecting incorrect questions to 0.3, as F1 Score cannot be simply judged using 0-1
self.log_mismatch(input_text, expected_output, output, extracted_output)
if score < 0.3:
self.log_mismatch(input_text, expected_output, output, output)
return input_text, context_str, output, expected_output, score, cost
return input_text, context_str, output, expected_output, score, cost
except Exception as e:
retries += 1
print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})")
if retries == max_retries:
print("Maximum retries reached. Skipping this sample.")
return input_text, context_str, str(e), expected_output, 0.0, 0.0
except Exception as e:
print(f"Maximum retries reached. Skipping this sample. Error: {e}")
return input_text, context_str, str(e), expected_output, 0.0, 0.0
def get_result_columns(self) -> List[str]:
return ["question", "context", "prediction", "expected_output", "score", "cost"]

View file

@@ -5,6 +5,8 @@ import asyncio
import threading
from datetime import datetime
from typing import List, Tuple, Callable, Dict, Any, Optional
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type
import pandas as pd
@@ -93,44 +95,45 @@ class HumanEvalBenchmark(BaseBenchmark):
return result
async def evaluate_problem(self, data: dict, graph: Callable) -> Tuple[str, str, str, float, float]:
max_retries = 5
retries = 0
@retry(
stop=stop_after_attempt(5),
wait=wait_fixed(1),
retry=retry_if_exception_type(Exception),
reraise=True
)
async def _generate_output(self, graph, prompt, entry_point):
# Generate output with a timeout of 60 seconds
return await asyncio.wait_for(graph(prompt, entry_point), timeout=60)
async def evaluate_problem(self, data: dict, graph: Callable) -> Tuple[str, str, str, float, float]:
input_text = data["prompt"]
expected_output = "\nCorrect Solution:\ndef " + data["entry_point"] + "(params you should put here):" + "\n\n" + data["canonical_solution"]
while retries < max_retries:
try:
prediction, cost = await asyncio.wait_for(graph(data["prompt"], data["entry_point"]), timeout=60)
ret = self.check_solution(prediction, data["test"], data["entry_point"])
test_case_details = ret[1]
expected_output = test_case_details + "\nCorrect Solution:\ndef " + data["entry_point"] + "(params you should put here):" + "\n\n" + data["canonical_solution"]
score = 1.0 if ret[0] == self.PASS else 0.0
try:
# Generate prediction using the graph function
prediction, cost = await self._generate_output(graph, input_text, data["entry_point"])
# Check the solution
ret = self.check_solution(prediction, data["test"], data["entry_point"])
test_case_details = ret[1]
expected_output = test_case_details + expected_output
# Calculate score based on the check result
score = 1.0 if ret[0] == self.PASS else 0.0
if score == 0:
self.log_mismatch(data["prompt"], expected_output, prediction, score)
break
except asyncio.TimeoutError:
prediction = None
ret = (self.FAIL, ["Timeout"])
score = 0.0
cost = 0.0
break
# Log mismatch if the score is 0
if score == 0:
self.log_mismatch(input_text, expected_output, prediction, score)
except Exception as e:
retries += 1
print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})")
return input_text, prediction, expected_output, score, cost
if retries == max_retries:
print("Maximum retries reached. Skipping this sample.")
prediction = None
ret = (self.FAIL, [])
score = 0.0
cost = 0.0
break
except asyncio.TimeoutError:
print("Timeout error. Skipping this sample.")
return input_text, "Timeout", expected_output, 0.0, 0.0
return data["prompt"], prediction, expected_output, score, cost
except Exception as e:
print(f"Maximum retries reached. Skipping this sample. Error: {e}")
return input_text, str(e), expected_output, 0.0, 0.0
def calculate_score(self, expected_output: str, prediction: str) -> Tuple[float, str]:
# The scoring logic for HumanEval is already implemented in evaluate_problem, this is just to conform to the interface

View file

@@ -6,6 +6,8 @@ from sympy.parsing.sympy_parser import parse_expr
from math import isclose
import multiprocessing
from typing import Any, Callable, Tuple, List
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type
from examples.aflow.benchmark.benchmark import BaseBenchmark
@@ -95,22 +97,34 @@ class MATHBenchmark(BaseBenchmark):
pass
return False
@retry(
stop=stop_after_attempt(5),
wait=wait_fixed(1),
retry=retry_if_exception_type(Exception),
reraise=True
)
async def _generate_output(self, graph, input_text):
return await graph(input_text)
async def evaluate_problem(self, problem: dict, graph: Callable) -> Tuple[str, str, str, int, float]:
input_text = problem["problem"]
expected_output = problem["solution"]
max_retries = 2
retries = 0
prediction = await graph(input_text)
cost = prediction[1]
output = prediction[0]
try:
output, cost = await self._generate_output(graph, input_text)
uni_score, extracted_output = self.calculate_score(expected_output, output)
uni_score, extracted_output = self.calculate_score(expected_output, output)
if uni_score == 0:
self.log_mismatch(input_text, expected_output, output, extracted_output)
if uni_score == 0:
self.log_mismatch(input_text, expected_output, output, extracted_output)
return input_text, output, expected_output, uni_score, cost
except Exception as e:
print(f"Maximum retries reached. Skipping this sample. Error: {e}")
return input_text, str(e), expected_output, 0.0, 0.0
return input_text, output, expected_output, uni_score, cost
def get_result_columns(self) -> List[str]:
return ["question", "prediction", "expected_output", "score", "cost"]

View file

@@ -5,6 +5,7 @@ import asyncio
import threading
from datetime import datetime
from typing import List, Tuple, Callable, Any, Optional, Dict
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type
from metagpt.actions.code_sanitize import sanitize
from examples.aflow.benchmark.benchmark import BaseBenchmark
@@ -83,37 +84,40 @@ class MBPPBenchmark(BaseBenchmark):
return result
async def evaluate_problem(self, data: dict, graph: Callable) -> Tuple[str, str, str, float, float]:
max_retries = 5
retries = 0
@retry(
stop=stop_after_attempt(5),
wait=wait_fixed(1),
retry=retry_if_exception_type(Exception),
reraise=True
)
async def _generate_output(self, graph, prompt, entry_point):
return await graph(prompt, entry_point)
async def evaluate_problem(self, data: dict, graph: Callable) -> Tuple[str, str, str, float, float]:
input_text = data["prompt"]
expected_output = "\nCorrect Solution:\ndef " + data["code"]
while retries < max_retries:
try:
prediction, cost = await graph(data["prompt"], data["entry_point"])
ret = self.check_solution(prediction, data["test"], data["entry_point"])
test_case_details = ret[1]
expected_output = test_case_details + "\nCorrect Solution:" + data["code"]
score = 1.0 if ret[0] == self.PASS else 0.0
try:
# Generate prediction using the graph function
prediction, cost = await self._generate_output(graph, input_text, data["entry_point"])
# Check the solution
ret = self.check_solution(prediction, data["test"], data["entry_point"])
test_case_details = ret[1]
expected_output = test_case_details + "\nCorrect Solution:" + data["code"]
# Calculate score based on the check result
score = 1.0 if ret[0] == self.PASS else 0.0
if score == 0:
self.log_mismatch(data["prompt"], expected_output, prediction, score)
break
# Log mismatch if the score is 0
if score == 0:
self.log_mismatch(input_text, expected_output, prediction, score)
except Exception as e:
retries += 1
print(f"Error generating prediction: {e}. Retrying... ({retries}/{max_retries})")
return input_text, prediction, expected_output, score, cost
if retries == max_retries:
print("Maximum retries reached. Skipping this sample.")
prediction = None
ret = (self.FAIL, [])
score = 0.0
cost = 0.0
break
return data["prompt"], prediction, expected_output, score, cost
except Exception as e:
print(f"Maximum retries reached. Skipping this sample. Error: {e}")
return input_text, str(e), expected_output, 0.0, 0.0
def calculate_score(self, expected_output: str, prediction: str) -> Tuple[float, str]:
# The scoring logic for MBPP is already implemented in evaluate_problem, this is just to conform to the interface

View file

@@ -10,12 +10,6 @@ from typing import Optional
from metagpt.configs.llm_config import LLMConfig
from metagpt.context import Context
from metagpt.provider.base_llm import BaseLLM
from metagpt.utils.cost_manager import CostManager
global cost_manager
if not globals().get("cost_manager"):
cost_manager = CostManager()
def LLM(llm_config: Optional[LLMConfig] = None, context: Context = None) -> BaseLLM:
@@ -23,6 +17,4 @@ def LLM(llm_config: Optional[LLMConfig] = None, context: Context = None) -> Base
ctx = context or Context()
if llm_config is not None:
return ctx.llm_with_cost_manager_from_llm_config(llm_config)
llm = ctx.llm()
llm.cost_manager = cost_manager
return llm
return ctx.llm()