mirror of
https://github.com/FoundationAgents/MetaGPT.git
synced 2026-06-11 15:15:18 +02:00
Add SPO base code
This commit is contained in:
parent
4954729e75
commit
da1e103372
9 changed files with 574 additions and 0 deletions
20
metagpt/ext/spo/prompts/evaluate_prompt.py
Normal file
20
metagpt/ext/spo/prompts/evaluate_prompt.py
Normal file
|
|
@ -0,0 +1,20 @@
|
|||
EVALUATE_PROMPT = """
|
||||
Based on the original requirements, evaluate the two responses, A and B, and determine which one better meets the requirements. If a reference answer is provided, strictly follow the format/content of the reference answer.
|
||||
|
||||
# Requirement
|
||||
{requirement}
|
||||
|
||||
# A
|
||||
{sample}
|
||||
|
||||
# B
|
||||
{new_sample}
|
||||
|
||||
# Golden answer
|
||||
{answers}
|
||||
|
||||
Provide your analysis and the choice you believe is better, using XML tags to encapsulate your response.
|
||||
|
||||
<analyse>Some analysis</analyse>
|
||||
<choose>A/B (the better answer in your opinion)</choose>
|
||||
"""
|
||||
32
metagpt/ext/spo/prompts/optimize_prompt.py
Normal file
32
metagpt/ext/spo/prompts/optimize_prompt.py
Normal file
|
|
@ -0,0 +1,32 @@
|
|||
PROMPT_OPTIMIZE_PROMPT = """
|
||||
You are building a prompt to address user requirement.Based on the given prompt,
|
||||
please reconstruct and optimize it. You can add, modify, or delete prompts. Please include a single modification in
|
||||
XML tags in your reply. During the optimization, you can incorporate any thinking models.
|
||||
This is a prompt that performed excellently in a previous iteration. You must make further optimizations and improvements based on this prompt. The modified prompt must differ from the provided example.
|
||||
|
||||
requirements:
|
||||
```
|
||||
{requirements}
|
||||
```
|
||||
|
||||
reference prompt:
|
||||
```
|
||||
{prompt}
|
||||
```
|
||||
|
||||
The execution result of this reference prompt is(some cases):
|
||||
```
|
||||
{answers}
|
||||
```
|
||||
|
||||
The best answer we expect(some cases):
|
||||
```
|
||||
{golden_answers}
|
||||
```
|
||||
|
||||
Provide your analysis, optimization points, and the complete optimized prompt using the following XML format:
|
||||
|
||||
<analyse>Analyze what drawbacks exist in the results produced by the reference prompt and how to improve them.</analyse>
|
||||
<modification>Summarize the key points for improvement in one sentence</modification>
|
||||
<prompt>Provide the complete optimized prompt {count}</prompt>
|
||||
"""
|
||||
85
metagpt/ext/spo/scripts/evaluator.py
Normal file
85
metagpt/ext/spo/scripts/evaluator.py
Normal file
|
|
@ -0,0 +1,85 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# @Date : 8/23/2024 10:00 AM
|
||||
# @Author : all
|
||||
# @Desc : Evaluation for different datasets
|
||||
import asyncio
|
||||
from typing import Dict, Literal, Tuple, List, Any
|
||||
|
||||
from utils import load
|
||||
from utils.llm_client import responser, extract_content
|
||||
from prompt.evaluate_prompt import EVALUATE_PROMPT
|
||||
import random
|
||||
|
||||
|
||||
class QuickExecute:
    """Run a prompt against sampled dataset questions and collect the answers.

    Attributes:
        prompt: Prompt text placed in front of every question.
        k: Sample size forwarded to ``load.load_meta_data``.
        model: Mapping read for its ``'name'`` and ``'temperature'`` keys when
            the LLM is called. It defaults to ``None``; in that case every
            call fails inside ``fetch_answer`` and the error text is returned
            as the answer.
    """

    def __init__(self, prompt: str, k: int = 3, model=None):
        self.prompt = prompt
        self.k = k
        self.model = model

    async def prompt_execute(self) -> List[Dict[str, Any]]:
        """Ask the model every sampled question concurrently.

        Returns:
            One ``{'question': ..., 'answer': ...}`` dict per sampled question,
            in the same order as the sampled questions. When a call raises,
            the ``'answer'`` value is the string form of the exception.
        """
        _, _, qa, _ = load.load_meta_data(k=self.k)

        async def fetch_answer(q: str) -> Dict[str, Any]:
            messages = [{"role": "user", "content": f"{self.prompt}\n\n{q}"}]
            try:
                answer = await responser(
                    messages,
                    model=self.model['name'],
                    temperature=self.model['temperature'],
                )
                return {'question': q, 'answer': answer.content}
            except Exception as e:
                # Best effort: one failing question must not abort the whole
                # batch, so the error text stands in for the answer.
                return {'question': q, 'answer': str(e)}

        tasks = [fetch_answer(item['question']) for item in qa]
        # asyncio.gather returns a list of results in task order.
        return await asyncio.gather(*tasks)
|
||||
|
||||
|
||||
class QuickEvaluate:
    """Ask a judge model whether a new sample beats the previous one.

    Attributes:
        k: Sample size forwarded to ``load.load_meta_data``.
    """

    def __init__(self, k: int = 3):
        self.k = k

    async def prompt_evaluate(self, sample: list, new_sample: list, model: dict) -> bool:
        """Compare ``sample`` and ``new_sample`` with one judge call.

        Args:
            sample: The previous result, rendered into the prompt as text.
            new_sample: The candidate result, rendered into the prompt as text.
            model: Mapping read for its ``'name'`` and ``'temperature'`` keys.

        Returns:
            ``True`` when the judge picked the slot that holds ``new_sample``;
            ``False`` otherwise, including when the call or the extraction
            raised (the exception is printed).
        """
        _, requirement, qa, _ = load.load_meta_data(k=self.k)

        # Randomly decide which candidate is presented as "A"
        # (presumably to counter position bias in the judge -- TODO confirm).
        is_swapped = random.random() < 0.5
        if is_swapped:
            sample, new_sample = new_sample, sample

        content = EVALUATE_PROMPT.format(
            requirement=requirement,
            sample=sample,
            new_sample=new_sample,
            answers=str(qa),
        )
        messages = [{"role": "user", "content": content}]

        # After a swap the candidate sits in slot A, otherwise in slot B.
        winning_label = "A" if is_swapped else "B"

        try:
            response = await responser(messages, model=model['name'], temperature=model['temperature'])
            choose = extract_content(response.content, 'choose')
            return choose == winning_label
        except Exception as e:
            print(e)
            return False
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Manual smoke run. No `model` is passed, so QuickExecute.model is None and
    # each per-question call fails inside fetch_answer; the printed answers
    # are then the error texts. Pass a model mapping to get real answers.
    execute = QuickExecute(prompt="Answer the Question,{question}", k=3)

    # Drive the coroutine with asyncio.run. QuickExecute exposes
    # `prompt_execute`; `prompt_evaluate` only exists on QuickEvaluate, so the
    # previous call raised AttributeError.
    answers = asyncio.run(execute.prompt_execute())
    print(answers)
|
||||
161
metagpt/ext/spo/scripts/optimizer.py
Normal file
161
metagpt/ext/spo/scripts/optimizer.py
Normal file
|
|
@ -0,0 +1,161 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# @Date : 8/12/2024 22:00 PM
|
||||
# @Author : issac
|
||||
# @Desc : optimizer for prompt
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
from optimizer_utils.data_utils import DataUtils
|
||||
from optimizer_utils.evaluation_utils import EvaluationUtils
|
||||
from optimizer_utils.prompt_utils import PromptUtils
|
||||
from prompt.optimize_prompt import PROMPT_OPTIMIZE_PROMPT
|
||||
from utils import load
|
||||
from utils.logs import logger
|
||||
from utils.llm_client import responser, extract_content
|
||||
from utils.token_manager import get_token_tracker
|
||||
|
||||
|
||||
class Optimizer:
    """Drive the prompt-optimization loop and store per-round artifacts.

    Each optimization round asks the optimize model for a rewritten prompt,
    executes it with the execute model, lets the evaluate model compare it
    with the best previous round, and writes the prompt, answers and token
    usage below ``<optimized_path>/<name>/prompts``.
    """

    def __init__(
        self,
        optimized_path: str = None,
        initial_round: int = 1,
        max_rounds: int = 10,
        name: str = "test",
        template: str = "meta.yaml",
        execute_model=None,
        optimize_model=None,
        evaluate_model=None,
        iteration: bool = True,
    ) -> None:
        """Store the configuration and build the helper objects.

        Args:
            optimized_path: Base output directory; joined with ``name`` to form
                ``root_path``.
            initial_round: Round number the optimizer starts from.
            max_rounds: Number of optimization rounds run by ``optimize``.
            name: Dataset name, used as the sub-directory of ``optimized_path``.
            template: Settings file name handed to ``load.set_file_name``.
            execute_model: Model mapping passed on when executing a prompt.
            optimize_model: Model mapping read for ``'name'`` and
                ``'temperature'`` when requesting a rewritten prompt.
            evaluate_model: Model mapping passed on when comparing samples.
            iteration: ``True`` runs the optimization loop; any other value
                makes ``optimize`` run a single test pass instead.
        """
        self.dataset = name
        self.root_path = f"{optimized_path}/{self.dataset}"
        self.top_scores = []
        self.round = initial_round
        self.max_rounds = max_rounds
        self.execute_model = execute_model
        self.optimize_model = optimize_model
        self.evaluate_model = evaluate_model
        self.iteration = iteration
        self.template = template

        self.prompt_utils = PromptUtils(self.root_path)
        self.data_utils = DataUtils(self.root_path)
        self.evaluation_utils = EvaluationUtils(self.root_path)
        self.token_tracker = get_token_tracker()

    def optimize(self):
        """Run ``max_rounds`` optimization rounds, or one test pass.

        Every coroutine is driven by ``asyncio.run``, which creates a fresh
        event loop and closes it afterwards; the previous code created a new
        loop per round and never closed it.
        """
        if self.iteration is True:
            for _ in range(self.max_rounds):
                # NOTE: the value logged as "Score" is whatever
                # _optimize_prompt returns, i.e. the extracted prompt text.
                score = asyncio.run(self._optimize_prompt())
                self.round += 1
                logger.info(f"Score for round {self.round}: {score}")

                time.sleep(5)
        else:
            # _test_prompt returns None, so "None" is what gets logged here.
            score = asyncio.run(self._test_prompt())
            logger.info(f"Score for round {self.round}: {score}")

    async def _optimize_prompt(self):
        """Produce, execute and evaluate the prompt for round ``self.round + 1``.

        When ``self.round`` is 1 the initial prompt from the settings file is
        first executed and recorded as round 1.

        Returns:
            The prompt text extracted from the optimize model's reply
            (whatever ``extract_content`` returned for the ``prompt`` tag).

        Raises:
            RuntimeError: If no recorded round is marked as succeeded, so
                there is no reference prompt to improve on.
        """
        prompt_path = f"{self.root_path}/prompts"
        load.set_file_name(self.template)

        data = self.data_utils.load_results(prompt_path)

        if self.round == 1:
            # Bootstrap: run the initial prompt and record it as round 1.
            directory = self.prompt_utils.create_round_directory(prompt_path, self.round)

            prompt, _, _, _ = load.load_meta_data()
            self.prompt = prompt
            self.prompt_utils.write_prompt(directory, prompt=self.prompt)
            new_sample = await self.evaluation_utils.execute_prompt(
                self, directory, data, model=self.execute_model, initial=True
            )
            _, answers = await self.evaluation_utils.evaluate_prompt(
                self, None, new_sample, model=self.evaluate_model,
                path=prompt_path, data=data, initial=True
            )
            self.prompt_utils.write_answers(directory, answers=answers)

        _, requirements, qa, count = load.load_meta_data(3)

        directory = self.prompt_utils.create_round_directory(prompt_path, self.round + 1)

        sample = self.data_utils.get_best_round()
        if sample is None:
            # get_best_round returns None when no stored round succeeded;
            # fail with a clear message instead of a TypeError on subscript.
            raise RuntimeError(
                f"No successful round found under {prompt_path}; cannot pick a reference prompt."
            )

        logger.info(f"choose {sample['round']}")

        golden_answer = self.data_utils.list_to_markdown(qa)
        best_answer = self.data_utils.list_to_markdown(sample["answers"])

        optimize_prompt = PROMPT_OPTIMIZE_PROMPT.format(
            prompt=sample["prompt"],
            answers=best_answer,
            requirements=requirements,
            golden_answers=golden_answer,
            count=count,
        )

        response = await responser(
            messages=[{"role": "user", "content": optimize_prompt}],
            model=self.optimize_model['name'],
            temperature=self.optimize_model['temperature'],
        )

        # The one-sentence change summary was extracted but never used
        # before; it is now logged so each round records what was changed.
        modification = extract_content(response.content, "modification")
        logger.info(modification)

        prompt = extract_content(response.content, "prompt")
        # Fall back to an empty prompt when the tag could not be extracted.
        self.prompt = prompt if prompt else ""

        logger.info(directory)

        self.prompt_utils.write_prompt(directory, prompt=self.prompt)

        new_sample = await self.evaluation_utils.execute_prompt(
            self, directory, data, model=self.execute_model, initial=False
        )

        success, answers = await self.evaluation_utils.evaluate_prompt(
            self, sample, new_sample, model=self.evaluate_model,
            path=prompt_path, data=data, initial=False
        )

        self.prompt_utils.write_answers(directory, answers=answers)

        logger.info(prompt)
        logger.info(success)

        logger.info(f"now is {self.round + 1}")

        self.token_tracker.print_usage_report()
        usage = self.token_tracker.get_total_usage()

        self.data_utils.save_cost(directory, usage)

        return prompt

    async def _test_prompt(self):
        """Execute the stored prompt of the current round and save its answers.

        The prompt is read from the round directory by
        ``evaluation_utils.execute_prompt``; the answers are written to
        ``test_answers.txt`` in that directory.

        Returns:
            None.
        """
        load.set_file_name(self.template)

        prompt_path = f"{self.root_path}/prompts"
        data = self.data_utils.load_results(prompt_path)

        directory = self.prompt_utils.create_round_directory(prompt_path, self.round)

        new_sample = await self.evaluation_utils.execute_prompt(
            self, directory, data, model=self.execute_model, initial=False, k=100
        )
        self.prompt_utils.write_answers(directory, answers=new_sample["answers"], name="test_answers.txt")

        logger.info(new_sample)

        logger.info(self.round)

        return None
|
||||
96
metagpt/ext/spo/scripts/utils/data_utils.py
Normal file
96
metagpt/ext/spo/scripts/utils/data_utils.py
Normal file
|
|
@ -0,0 +1,96 @@
|
|||
import datetime
|
||||
import json
|
||||
import os
|
||||
from typing import Union, List, Dict
|
||||
|
||||
import pandas as pd
|
||||
|
||||
|
||||
class DataUtils:
    """Read and write the JSON bookkeeping files of an optimization run.

    Attributes:
        root_path: Run directory; round results live in
            ``<root_path>/prompts/results.json``.
        top_scores: Round summaries cached by the last ``_load_scores`` call,
            newest round first.
    """

    def __init__(self, root_path: str):
        self.root_path = root_path
        self.top_scores = []

    def load_results(self, path: str) -> list:
        """Return the parsed ``results.json`` found in ``path``.

        Returns an empty list when the file does not exist or does not
        contain valid JSON.
        """
        result_path = self.get_results_file_path(path)
        if not os.path.exists(result_path):
            return []
        with open(result_path, "r", encoding="utf-8") as json_file:
            try:
                return json.load(json_file)
            except json.JSONDecodeError:
                return []

    def get_best_round(self):
        """Return the newest round summary marked as succeeded, else ``None``.

        Reloads the summaries from disk on every call.
        """
        for entry in self._load_scores():
            if entry["succeed"]:
                return entry
        return None

    def get_results_file_path(self, prompt_path: str) -> str:
        """Return the path of ``results.json`` inside ``prompt_path``."""
        return os.path.join(prompt_path, "results.json")

    def create_result_data(self, round: int, answers: list[dict], prompt: str, succeed: bool, tokens: int) -> dict:
        """Build one result record stamped with the current local time."""
        now = datetime.datetime.now()
        return {"round": round, "answers": answers, "prompt": prompt, "succeed": succeed, "tokens": tokens, "time": now}

    def save_results(self, json_file_path: str, data: Union[List, Dict]):
        """Write ``data`` to ``json_file_path`` as indented JSON.

        Values json cannot encode (e.g. the ``datetime`` in a result record)
        are stored via ``str``.
        """
        with open(json_file_path, "w", encoding="utf-8") as json_file:
            json.dump(data, json_file, default=str, indent=4)

    def save_cost(self, directory: str, data: Union[List, Dict]):
        """Write ``data`` to ``cost.json`` inside ``directory``."""
        json_file = os.path.join(directory, 'cost.json')
        with open(json_file, "w", encoding="utf-8") as file:
            json.dump(data, file, default=str, indent=4)

    def _load_scores(self):
        """Load round summaries from ``<root_path>/prompts/results.json``.

        Stores them in ``self.top_scores`` sorted by round number, highest
        first, and returns that list.

        Raises:
            FileNotFoundError: If the results file does not exist.
        """
        rounds_dir = os.path.join(self.root_path, "prompts")
        result_file = os.path.join(rounds_dir, "results.json")
        self.top_scores = []

        with open(result_file, "r", encoding="utf-8") as file:
            data = json.load(file)

        # Plain iteration keeps native Python values; no DataFrame is needed
        # just to walk the records.
        self.top_scores = [
            {
                "round": row["round"],
                "succeed": row["succeed"],
                "prompt": row["prompt"],
                "answers": row["answers"],
            }
            for row in data
        ]
        self.top_scores.sort(key=lambda x: x["round"], reverse=True)

        return self.top_scores

    def list_to_markdown(self, questions_list):
        """Render question/answer dicts as one fenced text block.

        Args:
            questions_list (list): Dicts with ``'question'`` and ``'answer'`` keys.

        Returns:
            str: The fenced block; pairs are numbered from 1 and separated
            by ``---`` lines.
        """
        total = len(questions_list)
        parts = ["```\n"]

        for i, qa_pair in enumerate(questions_list, 1):
            parts.append(f"Question {i}\n\n{qa_pair['question']}\n\n")
            parts.append(f"Answer {i}\n\n{qa_pair['answer']}\n\n")

            # Separator between pairs, but not after the last one.
            if i < total:
                parts.append("---\n\n")

        parts.append("\n```")

        return "".join(parts)
|
||||
63
metagpt/ext/spo/scripts/utils/evaluation_utils.py
Normal file
63
metagpt/ext/spo/scripts/utils/evaluation_utils.py
Normal file
|
|
@ -0,0 +1,63 @@
|
|||
import asyncio
|
||||
|
||||
from script.evaluator import QuickEvaluate, QuickExecute
|
||||
from utils.logs import logger
|
||||
import tiktoken
|
||||
|
||||
|
||||
def count_tokens(sample):
    """Return the token count of ``sample['answers']`` rendered as text.

    The text is ``str(sample['answers'])`` encoded with tiktoken's
    ``cl100k_base`` encoding. A ``None`` sample counts as 0 tokens.
    """
    if sample is not None:
        encoder = tiktoken.get_encoding("cl100k_base")
        answers_text = str(sample['answers'])
        return len(encoder.encode(answers_text))
    return 0
|
||||
|
||||
class EvaluationUtils:
    """Glue between the optimizer and the execute/evaluate helpers.

    Attributes:
        root_path: Run directory handed in by the optimizer.
    """

    def __init__(self, root_path: str):
        self.root_path = root_path

    async def execute_prompt(self, optimizer, prompt_path, data, model, initial=False, k=3):
        """Run the prompt stored in ``prompt_path`` and package the answers.

        The prompt is loaded through ``optimizer.prompt_utils.load_prompt``
        and also stored on ``optimizer.prompt``.

        Args:
            optimizer: Optimizer instance; its ``round`` and ``prompt_utils``
                are read and its ``prompt`` attribute is overwritten.
            prompt_path: Directory that contains ``prompt.txt``.
            data: Unused here; kept for interface compatibility.
            model: Model mapping passed to ``QuickExecute``.
            initial: When true the result is labelled with the current round,
                otherwise with the next round.
            k: Sample size passed to ``QuickExecute``.

        Returns:
            dict with keys ``'round'``, ``'answers'`` and ``'prompt'``.
        """
        optimizer.prompt = optimizer.prompt_utils.load_prompt(optimizer.round, prompt_path)
        evaluator = QuickExecute(prompt=optimizer.prompt, k=k, model=model)

        answers = await evaluator.prompt_execute()

        cur_round = optimizer.round if initial else optimizer.round + 1

        return {"round": cur_round, "answers": answers, "prompt": optimizer.prompt}

    async def evaluate_prompt(self, optimizer, sample, new_sample, path, data, model, initial=False):
        """Judge ``new_sample`` against ``sample`` and persist the outcome.

        For the initial round the sample is accepted without judging.
        Otherwise the judge is asked four times, one call after another, and
        the new sample succeeds only on a strict majority (a 2-2 tie fails).

        Args:
            optimizer: Optimizer instance; its ``data_utils`` is used to build
                and save the result record.
            sample: Previous best sample, or ``None`` for the initial round.
            new_sample: dict with ``'round'``, ``'answers'`` and ``'prompt'``.
            path: Directory whose ``results.json`` is rewritten.
            data: List of earlier result records; the new record is appended
                to it in place before saving.
            model: Model mapping passed to the judge.
            initial: Skip judging and mark the sample as succeeded.

        Returns:
            Tuple ``(succeed, answers)`` where ``answers`` is
            ``new_sample['answers']``.
        """
        evaluator = QuickEvaluate(k=3)
        # Only the new sample's token count is stored; the old sample's count
        # was computed before but never used, so it is no longer tokenized.
        new_token = count_tokens(new_sample)

        if initial is True:
            succeed = True
        else:
            evaluation_results = []
            for _ in range(4):
                result = await evaluator.prompt_evaluate(sample=sample, new_sample=new_sample, model=model)
                evaluation_results.append(result)

            logger.info(evaluation_results)

            true_count = evaluation_results.count(True)
            false_count = evaluation_results.count(False)
            succeed = true_count > false_count

        new_data = optimizer.data_utils.create_result_data(
            new_sample['round'], new_sample['answers'], new_sample['prompt'], succeed, new_token
        )

        data.append(new_data)

        result_path = optimizer.data_utils.get_results_file_path(path)

        optimizer.data_utils.save_results(result_path, data)

        answers = new_sample['answers']

        return succeed, answers
|
||||
51
metagpt/ext/spo/scripts/utils/load.py
Normal file
51
metagpt/ext/spo/scripts/utils/load.py
Normal file
|
|
@ -0,0 +1,51 @@
|
|||
import yaml
|
||||
import random
|
||||
import os
|
||||
|
||||
FILE_NAME = 'meta.yaml'  # Default settings file name; replaced by set_file_name().
|
||||
|
||||
|
||||
def load_llm():
    """Load ``config.yaml`` from the parent directory of this module.

    Returns:
        The parsed YAML document as returned by ``yaml.safe_load``.

    Raises:
        FileNotFoundError: If the configuration file does not exist.
    """
    config_path = os.path.join(os.path.dirname(__file__), '..', 'config.yaml')
    # Explicit UTF-8, matching load_meta_data, so decoding does not depend on
    # the platform's default encoding.
    with open(config_path, 'r', encoding='utf-8') as file:
        config = yaml.safe_load(file)

    return config
|
||||
|
||||
|
||||
def set_file_name(name):
    """Select the settings file that ``load_meta_data`` reads.

    Args:
        name: File name stored in the module-level ``FILE_NAME``.
    """
    global FILE_NAME
    FILE_NAME = name
|
||||
|
||||
|
||||
def load_meta_data(k=5):
    """Read the active settings file and sample question/answer pairs.

    The file is ``FILE_NAME`` inside the ``../settings`` directory relative
    to this module.

    Args:
        k: Maximum number of question/answer pairs to sample. The body used
            to reset this to 5 unconditionally, so the argument was ignored;
            it is now honoured.

    Returns:
        Tuple ``(prompt, requirements, random_qa, count)``:
        ``prompt`` and ``requirements`` are taken verbatim from the file,
        ``random_qa`` is a random sample of at most ``k`` dicts with
        ``'question'`` and ``'answer'`` keys, and ``count`` is
        ``", within <n> words"`` when the file's ``count`` is an integer,
        otherwise an empty string.
    """
    config_path = os.path.join(os.path.dirname(__file__), '../settings', FILE_NAME)
    with open(config_path, 'r', encoding='utf-8') as file:
        data = yaml.safe_load(file)

    # Keep only the question and answer of every FAQ entry.
    qa = [{'question': item['question'], 'answer': item['answer']} for item in data['faq']]

    prompt = data['prompt']
    requirements = data['requirements']
    count = data['count']

    count = f", within {count} words" if isinstance(count, int) else ""

    # Never ask random.sample for more items than are available.
    random_qa = random.sample(qa, min(k, len(qa)))

    return prompt, requirements, random_qa, count
|
||||
|
||||
43
metagpt/ext/spo/scripts/utils/prompt_utils.py
Normal file
43
metagpt/ext/spo/scripts/utils/prompt_utils.py
Normal file
|
|
@ -0,0 +1,43 @@
|
|||
import json
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
import traceback
|
||||
from typing import List
|
||||
from utils.logs import logger
|
||||
|
||||
|
||||
class PromptUtils:
    """File helpers for the prompt and answer files of each round.

    Attributes:
        root_path: Run directory handed in by the optimizer.
    """

    def __init__(self, root_path: str):
        self.root_path = root_path

    def create_round_directory(self, prompt_path: str, round_number: int) -> str:
        """Create ``<prompt_path>/round_<round_number>`` if needed and return it."""
        round_dir = os.path.join(prompt_path, f"round_{round_number}")
        os.makedirs(round_dir, exist_ok=True)
        return round_dir

    def load_prompt(self, round_number: int, prompts_path: str):
        """Return the text of ``prompt.txt`` inside ``prompts_path``.

        Args:
            round_number: Only used in the log message on failure.
            prompts_path: Directory that contains ``prompt.txt``.

        Raises:
            FileNotFoundError: Logged and re-raised when the file is missing.
        """
        prompt_file_name = f"{prompts_path}/prompt.txt"

        try:
            with open(prompt_file_name, 'r', encoding='utf-8') as handle:
                content = handle.read()
        except FileNotFoundError as e:
            logger.info(f"Error loading prompt for round {round_number}: {e}")
            raise
        return content

    def write_answers(self, directory: str, answers: dict, name: str = "answers.txt"):
        """Write question/answer pairs to ``<directory>/<name>``.

        ``answers`` is iterated, and every item is read for its ``'question'``
        and ``'answer'`` keys.
        """
        target = os.path.join(directory, name)
        with open(target, "w", encoding="utf-8") as out:
            for entry in answers:
                out.write(f"Question:\n{entry['question']}\n")
                out.write(f"Answer:\n{entry['answer']}\n")
                out.write("\n")

    def write_prompt(self, directory: str, prompt: str):
        """Write ``prompt`` to ``prompt.txt`` and create an empty ``__init__.py``."""
        prompt_file = os.path.join(directory, "prompt.txt")
        with open(prompt_file, "w", encoding="utf-8") as out:
            out.write(prompt)

        init_file = os.path.join(directory, "__init__.py")
        with open(init_file, "w", encoding="utf-8") as out:
            out.write("")
|
||||
|
||||
23
metagpt/ext/spo/settings/Poem.yaml
Normal file
23
metagpt/ext/spo/settings/Poem.yaml
Normal file
|
|
@ -0,0 +1,23 @@
|
|||
prompt: |
|
||||
Create poetry in the requested style and format.
|
||||
|
||||
requirements: |
|
||||
None
|
||||
|
||||
count: None
|
||||
|
||||
faq:
|
||||
- question: |
|
||||
Write a modern sonnet about climate change
|
||||
answer: |
|
||||
None
|
||||
|
||||
- question: |
|
||||
Create a haiku series about New York City
|
||||
answer: |
|
||||
None
|
||||
|
||||
- question: |
|
||||
Write a free verse poem about social media
|
||||
answer: |
|
||||
None
|
||||
Loading…
Add table
Add a link
Reference in a new issue