diff --git a/metagpt/ext/spo/prompts/evaluate_prompt.py b/metagpt/ext/spo/prompts/evaluate_prompt.py
new file mode 100644
index 000000000..80a9b093b
--- /dev/null
+++ b/metagpt/ext/spo/prompts/evaluate_prompt.py
@@ -0,0 +1,20 @@
+EVALUATE_PROMPT = """
+Based on the original requirements, evaluate the two responses, A and B, and determine which one better meets the requirements. If a reference answer is provided, strictly follow the format/content of the reference answer.
+
+# Requirement
+{requirement}
+
+# A
+{sample}
+
+# B
+{new_sample}
+
+# Golden answer
+{answers}
+
+Provide your analysis and the choice you believe is better, using XML tags to encapsulate your response.
+
+Some analysis
+A/B (the better answer in your opinion)
+"""
diff --git a/metagpt/ext/spo/prompts/optimize_prompt.py b/metagpt/ext/spo/prompts/optimize_prompt.py
new file mode 100644
index 000000000..09e20acbc
--- /dev/null
+++ b/metagpt/ext/spo/prompts/optimize_prompt.py
@@ -0,0 +1,32 @@
+PROMPT_OPTIMIZE_PROMPT = """
+You are building a prompt to address user requirement.Based on the given prompt,
+please reconstruct and optimize it. You can add, modify, or delete prompts. Please include a single modification in
+XML tags in your reply. During the optimization, you can incorporate any thinking models.
+This is a prompt that performed excellently in a previous iteration. You must make further optimizations and improvements based on this prompt. The modified prompt must differ from the provided example.
+
+requirements:
+```
+{requirements}
+```
+
+reference prompt:
+```
+{prompt}
+```
+
+The execution result of this reference prompt is(some cases):
+```
+{answers}
+```
+
+The best answer we expect(some cases):
+```
+{golden_answers}
+```
+
+Provide your analysis, optimization points, and the complete optimized prompt using the following XML format:
+
+Analyze what drawbacks exist in the results produced by the reference prompt and how to improve them.
+Summarize the key points for improvement in one sentence
+Provide the complete optimized prompt {count}
+"""
diff --git a/metagpt/ext/spo/scripts/evaluator.py b/metagpt/ext/spo/scripts/evaluator.py
new file mode 100644
index 000000000..c6f63c04b
--- /dev/null
+++ b/metagpt/ext/spo/scripts/evaluator.py
@@ -0,0 +1,85 @@
+# -*- coding: utf-8 -*-
+# @Date : 8/23/2024 10:00 AM
+# @Author : all
+# @Desc : Evaluation for different datasets
+import asyncio
+from typing import Dict, Literal, Tuple, List, Any
+
+from utils import load
+from utils.llm_client import responser, extract_content
+from prompt.evaluate_prompt import EVALUATE_PROMPT
+import random
+
+
+class QuickExecute:
+ """
+ 完成不同数据集的评估。
+ """
+
+ def __init__(self, prompt: str, k: int = 3, model=None):
+
+ self.prompt = prompt
+ self.k = k
+ self.model = model
+
+ async def prompt_execute(self) -> tuple[Any]:
+ _, _, qa, _ = load.load_meta_data(k=self.k)
+ answers = []
+
+ async def fetch_answer(q: str) -> Dict[str, Any]:
+ messages = [{"role": "user", "content": f"{self.prompt}\n\n{q}"}]
+ try:
+ answer = await responser(messages, model=self.model['name'], temperature=self.model['temperature'])
+ return {'question': q, 'answer': answer.content}
+ except Exception as e:
+ return {'question': q, 'answer': str(e)}
+
+ tasks = [fetch_answer(item['question']) for item in qa]
+ answers = await asyncio.gather(*tasks)
+
+ return answers
+
+
+class QuickEvaluate:
+ """
+ Complete the evaluation for different datasets here.
+ """
+
+ def __init__(self, k: int = 3):
+ self.k = k
+
+ async def prompt_evaluate(self, sample: list, new_sample: list, model: dict) -> bool:
+ _, requirement, qa, _ = load.load_meta_data(k=self.k)
+
+ if random.random() < 0.5:
+ sample, new_sample = new_sample, sample
+ is_swapped = True
+ else:
+ is_swapped = False
+
+ messages = [{"role": "user", "content": EVALUATE_PROMPT.format(
+ requirement=requirement,
+ sample=sample,
+ new_sample=new_sample,
+ answers=str(qa))}]
+
+ try:
+ response = await responser(messages, model=model['name'], temperature=model['temperature'])
+ choose = extract_content(response.content, 'choose')
+
+ if is_swapped:
+ return choose == "A"
+ return choose == "B"
+
+ except Exception as e:
+ print(e)
+ return False
+
+
+
+if __name__ == "__main__":
+ execute = QuickExecute(prompt="Answer the Question,{question}", k=3)
+
+ # 使用asyncio.run来运行异步方法
+ answers = asyncio.run(execute.prompt_evaluate())
+ print(answers)
diff --git a/metagpt/ext/spo/scripts/optimizer.py b/metagpt/ext/spo/scripts/optimizer.py
new file mode 100644
index 000000000..1363cbd23
--- /dev/null
+++ b/metagpt/ext/spo/scripts/optimizer.py
@@ -0,0 +1,161 @@
+# -*- coding: utf-8 -*-
+# @Date : 8/12/2024 22:00 PM
+# @Author : issac
+# @Desc : optimizer for prompt
+
+import asyncio
+import time
+from optimizer_utils.data_utils import DataUtils
+from optimizer_utils.evaluation_utils import EvaluationUtils
+from optimizer_utils.prompt_utils import PromptUtils
+from prompt.optimize_prompt import PROMPT_OPTIMIZE_PROMPT
+from utils import load
+from utils.logs import logger
+from utils.llm_client import responser, extract_content
+from utils.token_manager import get_token_tracker
+
+
+class Optimizer:
+ def __init__(
+ self,
+ optimized_path: str = None,
+ initial_round: int = 1,
+ max_rounds: int = 10,
+ name: str = "test",
+ template: str = "meta.yaml",
+ execute_model=None,
+ optimize_model=None,
+ evaluate_model=None,
+ iteration: bool = True,
+ ) -> None:
+
+ self.dataset = name
+ self.root_path = f"{optimized_path}/{self.dataset}"
+ self.top_scores = []
+ self.round = initial_round
+ self.max_rounds = max_rounds
+ self.execute_model = execute_model
+ self.optimize_model = optimize_model
+ self.evaluate_model = evaluate_model
+ self.iteration = iteration
+ self.template = template
+
+ self.prompt_utils = PromptUtils(self.root_path)
+ self.data_utils = DataUtils(self.root_path)
+ self.evaluation_utils = EvaluationUtils(self.root_path)
+ self.token_tracker = get_token_tracker()
+
+ def optimize(self):
+ if self.iteration is True:
+
+ for opt_round in range(self.max_rounds):
+ loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(loop)
+ score = loop.run_until_complete(self._optimize_prompt())
+ self.round += 1
+ logger.info(f"Score for round {self.round}: {score}")
+
+ time.sleep(5)
+
+ else:
+ loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(loop)
+ score = loop.run_until_complete(self._test_prompt())
+ logger.info(f"Score for round {self.round}: {score}")
+
+ async def _optimize_prompt(self):
+
+ prompt_path = f"{self.root_path}/prompts"
+ load.set_file_name(self.template)
+
+ data = self.data_utils.load_results(prompt_path)
+
+ if self.round == 1:
+ directory = self.prompt_utils.create_round_directory(prompt_path, self.round)
+ # Load prompt using prompt_utils
+
+ prompt, _, _, _ = load.load_meta_data()
+ self.prompt = prompt
+ self.prompt_utils.write_prompt(directory, prompt=self.prompt)
+ new_sample = await self.evaluation_utils.execute_prompt(self, directory, data, model=self.execute_model,
+ initial=True)
+ _, answers = await self.evaluation_utils.evaluate_prompt(self, None, new_sample, model=self.evaluate_model,
+ path=prompt_path, data=data, initial=True)
+ self.prompt_utils.write_answers(directory, answers=answers)
+
+
+ _, requirements, qa, count = load.load_meta_data(3)
+
+ directory = self.prompt_utils.create_round_directory(prompt_path, self.round + 1)
+
+ top_round = self.data_utils.get_best_round()
+
+ sample = top_round
+
+ logger.info(f"choose {sample['round']}")
+
+ prompt = sample['prompt']
+
+ golden_answer = self.data_utils.list_to_markdown(qa)
+ best_answer = self.data_utils.list_to_markdown(sample["answers"])
+
+ optimize_prompt = PROMPT_OPTIMIZE_PROMPT.format(
+ prompt=sample["prompt"], answers=best_answer,
+ requirements=requirements,
+ golden_answers=golden_answer,
+ count=count)
+
+ response = await responser(messages=[{"role": "user", "content": optimize_prompt}],
+ model=self.optimize_model['name'], temperature=self.optimize_model['temperature'])
+
+ modification = extract_content(response.content, "modification")
+ prompt = extract_content(response.content, "prompt")
+ if prompt:
+ self.prompt = prompt
+ else:
+ self.prompt = ""
+
+ logger.info(directory)
+
+ self.prompt_utils.write_prompt(directory, prompt=self.prompt)
+
+ new_sample = await self.evaluation_utils.execute_prompt(self, directory, data, model=self.execute_model,
+ initial=False)
+
+ success, answers = await self.evaluation_utils.evaluate_prompt(self, sample, new_sample,
+ model=self.evaluate_model, path=prompt_path,
+ data=data, initial=False)
+
+ self.prompt_utils.write_answers(directory, answers=answers)
+
+ logger.info(prompt)
+ logger.info(success)
+
+ logger.info(f"now is {self.round + 1}")
+
+ self.token_tracker.print_usage_report()
+ usage = self.token_tracker.get_total_usage()
+
+ self.data_utils.save_cost(directory, usage)
+
+ return prompt
+
+ async def _test_prompt(self):
+
+ load.set_file_name(self.template)
+
+ prompt_path = f"{self.root_path}/prompts"
+ data = self.data_utils.load_results(prompt_path)
+
+ directory = self.prompt_utils.create_round_directory(prompt_path, self.round)
+ # Load prompt using prompt_utils
+
+ new_sample = await self.evaluation_utils.execute_prompt(self, directory, data, model=self.execute_model,
+ initial=False, k=100)
+ self.prompt_utils.write_answers(directory, answers=new_sample["answers"], name="test_answers.txt")
+
+ logger.info(new_sample)
+
+ logger.info(self.round)
+
+ return None
diff --git a/metagpt/ext/spo/scripts/utils/data_utils.py b/metagpt/ext/spo/scripts/utils/data_utils.py
new file mode 100644
index 000000000..26fb515d7
--- /dev/null
+++ b/metagpt/ext/spo/scripts/utils/data_utils.py
@@ -0,0 +1,96 @@
+import datetime
+import json
+import os
+from typing import Union, List, Dict
+
+import pandas as pd
+
+
+class DataUtils:
+ def __init__(self, root_path: str):
+ self.root_path = root_path
+ self.top_scores = []
+
+ def load_results(self, path: str) -> list:
+ result_path = os.path.join(path, "results.json")
+ if os.path.exists(result_path):
+ with open(result_path, "r") as json_file:
+ try:
+ return json.load(json_file)
+ except json.JSONDecodeError:
+ return []
+ return []
+
+ def get_best_round(self):
+
+ top_rounds = self._load_scores()
+
+ for entry in self.top_scores:
+ if entry["succeed"]:
+ return entry
+
+ return None
+
+ def get_results_file_path(self, prompt_path: str) -> str:
+ return os.path.join(prompt_path, "results.json")
+
+ def create_result_data(self, round: int, answers: list[dict], prompt: str, succeed: bool, tokens: int) -> dict:
+ now = datetime.datetime.now()
+ return {"round": round, "answers": answers, "prompt": prompt, "succeed": succeed, "tokens": tokens, "time": now}
+
+ def save_results(self, json_file_path: str, data: Union[List, Dict]):
+ with open(json_file_path, "w") as json_file:
+ json.dump(data, json_file, default=str, indent=4)
+
+ def save_cost(self, directory: str, data: Union[List, Dict]):
+ json_file = os.path.join(directory, 'cost.json')
+ with open(json_file, "w", encoding="utf-8") as file:
+ json.dump(data, file, default=str, indent=4)
+
+ def _load_scores(self):
+
+ rounds_dir = os.path.join(self.root_path, "prompts")
+
+ result_file = os.path.join(rounds_dir, "results.json")
+ self.top_scores = []
+
+ with open(result_file, "r", encoding="utf-8") as file:
+ data = json.load(file)
+ df = pd.DataFrame(data)
+
+ for index, row in df.iterrows():
+ self.top_scores.append(
+ {"round": row["round"], "succeed": row["succeed"], "prompt": row["prompt"], "answers": row['answers']})
+
+ self.top_scores.sort(key=lambda x: x["round"], reverse=True)
+
+ return self.top_scores
+
+ def list_to_markdown(self, questions_list):
+ """
+ Convert a list of question-answer dictionaries to a formatted Markdown string.
+
+ Args:
+ questions_list (list): List of dictionaries containing 'question' and 'answer' keys
+
+ Returns:
+ str: Formatted Markdown string
+ """
+ markdown_text = "```\n"
+
+ for i, qa_pair in enumerate(questions_list, 1):
+ # Add question section
+ markdown_text += f"Question {i}\n\n"
+ markdown_text += f"{qa_pair['question']}\n\n"
+
+ # Add answer section
+ markdown_text += f"Answer {i}\n\n"
+ markdown_text += f"{qa_pair['answer']}\n\n"
+
+ # Add separator between QA pairs except for the last one
+ if i < len(questions_list):
+ markdown_text += "---\n\n"
+
+ markdown_text += "\n```"
+
+ return markdown_text
diff --git a/metagpt/ext/spo/scripts/utils/evaluation_utils.py b/metagpt/ext/spo/scripts/utils/evaluation_utils.py
new file mode 100644
index 000000000..42c4395c6
--- /dev/null
+++ b/metagpt/ext/spo/scripts/utils/evaluation_utils.py
@@ -0,0 +1,63 @@
+import asyncio
+
+from script.evaluator import QuickEvaluate, QuickExecute
+from utils.logs import logger
+import tiktoken
+
+
+def count_tokens(sample):
+ if sample is None:
+ return 0
+ else:
+ encoding = tiktoken.get_encoding("cl100k_base")
+ return len(encoding.encode(str(sample['answers'])))
+
+class EvaluationUtils:
+ def __init__(self, root_path: str):
+ self.root_path = root_path
+
+ async def execute_prompt(self, optimizer, prompt_path, data, model, initial=False, k=3):
+
+ optimizer.prompt = optimizer.prompt_utils.load_prompt(optimizer.round, prompt_path)
+ evaluator = QuickExecute(prompt=optimizer.prompt, k=k, model=model)
+
+ answers = await evaluator.prompt_execute()
+
+ cur_round = optimizer.round + 1 if not initial else optimizer.round
+
+ new_data = {"round": cur_round, "answers": answers, "prompt": optimizer.prompt}
+
+ return new_data
+
+ async def evaluate_prompt(self, optimizer, sample, new_sample, path, data, model, initial=False):
+
+ evaluator = QuickEvaluate(k=3)
+ original_token = count_tokens(sample)
+ new_token = count_tokens(new_sample)
+
+ if initial is True:
+ succeed = True
+ else:
+ evaluation_results = []
+ for _ in range(4):
+ result = await evaluator.prompt_evaluate(sample=sample, new_sample=new_sample, model=model)
+ evaluation_results.append(result)
+
+ logger.info(evaluation_results)
+
+ true_count = evaluation_results.count(True)
+ false_count = evaluation_results.count(False)
+ succeed = true_count > false_count
+
+ new_data = optimizer.data_utils.create_result_data(new_sample['round'], new_sample['answers'],
+ new_sample['prompt'], succeed, new_token)
+
+ data.append(new_data)
+
+ result_path = optimizer.data_utils.get_results_file_path(path)
+
+ optimizer.data_utils.save_results(result_path, data)
+
+ answers = new_sample['answers']
+
+ return succeed, answers
diff --git a/metagpt/ext/spo/scripts/utils/load.py b/metagpt/ext/spo/scripts/utils/load.py
new file mode 100644
index 000000000..22bc10e80
--- /dev/null
+++ b/metagpt/ext/spo/scripts/utils/load.py
@@ -0,0 +1,51 @@
+import yaml
+import random
+import os
+
+FILE_NAME = 'meta.yaml' # 默认值
+
+
+def load_llm():
+ # 读取上一级目录中的 YAML 配置文件
+ config_path = os.path.join(os.path.dirname(__file__), '..', 'config.yaml')
+ with open(config_path, 'r') as file:
+ config = yaml.safe_load(file)
+
+ return config
+
+
+def set_file_name(name):
+ global FILE_NAME
+ FILE_NAME = name
+
+
+def load_meta_data(k=5):
+
+ k = 5
+ # 读取 YAML 文件
+ config_path = os.path.join(os.path.dirname(__file__), '../settings', FILE_NAME)
+ with open(config_path, 'r', encoding='utf-8') as file:
+ data = yaml.safe_load(file)
+
+ qa = []
+
+ # 提取问题和答案
+ for item in data['faq']:
+ question = item['question']
+ answer = item['answer']
+ qa.append({'question': question, 'answer': answer})
+
+ prompt = data['prompt']
+ requirements = data['requirements']
+ count = data['count']
+
+ if isinstance(count, int):
+ count = f", within {count} words"
+ else:
+ count = ""
+
+ # 随机选择三组问答
+ random_qa = random.sample(qa, min(k, len(qa))) # 确保不超过列表长度
+
+ return prompt, requirements, random_qa, count
+
diff --git a/metagpt/ext/spo/scripts/utils/prompt_utils.py b/metagpt/ext/spo/scripts/utils/prompt_utils.py
new file mode 100644
index 000000000..806423572
--- /dev/null
+++ b/metagpt/ext/spo/scripts/utils/prompt_utils.py
@@ -0,0 +1,43 @@
+import json
+import os
+import re
+import time
+import traceback
+from typing import List
+from utils.logs import logger
+
+
+class PromptUtils:
+ def __init__(self, root_path: str):
+ self.root_path = root_path
+
+ def create_round_directory(self, prompt_path: str, round_number: int) -> str:
+ directory = os.path.join(prompt_path, f"round_{round_number}")
+ os.makedirs(directory, exist_ok=True)
+ return directory
+
+ def load_prompt(self, round_number: int, prompts_path: str):
+ prompt_file_name = f"{prompts_path}/prompt.txt"
+
+ try:
+ with open(prompt_file_name, 'r', encoding='utf-8') as file:
+ return file.read()
+ except FileNotFoundError as e:
+ logger.info(f"Error loading prompt for round {round_number}: {e}")
+ raise
+
+ def write_answers(self, directory: str, answers: dict, name: str = "answers.txt"):
+
+ with open(os.path.join(directory, name), "w", encoding="utf-8") as file:
+ for item in answers:
+ file.write(f"Question:\n{item['question']}\n")
+ file.write(f"Answer:\n{item['answer']}\n")
+ file.write("\n")
+
+ def write_prompt(self, directory: str, prompt: str):
+
+ with open(os.path.join(directory, "prompt.txt"), "w", encoding="utf-8") as file:
+ file.write(prompt)
+ with open(os.path.join(directory, "__init__.py"), "w", encoding="utf-8") as file:
+ file.write("")
+
diff --git a/metagpt/ext/spo/settings/Poem.yaml b/metagpt/ext/spo/settings/Poem.yaml
new file mode 100644
index 000000000..74aa1565f
--- /dev/null
+++ b/metagpt/ext/spo/settings/Poem.yaml
@@ -0,0 +1,23 @@
+prompt: |
+ Create poetry in the requested style and format.
+
+requirements: |
+ None
+
+count: None
+
+faq:
+ - question: |
+ Write a modern sonnet about climate change
+ answer: |
+ None
+
+ - question: |
+ Create a haiku series about New York City
+ answer: |
+ None
+
+ - question: |
+ Write a free verse poem about social media
+ answer: |
+ None