Update operator.py

This commit is contained in:
didi 2024-10-21 23:11:48 +08:00
parent 2d1d7ca219
commit 23eec00b00
15 changed files with 135 additions and 291 deletions

View file

@ -20,34 +20,38 @@ import re
class Operator:
def __init__(self, name, llm: LLM):
def __init__(self, llm: LLM, name: str):
self.name = name
self.llm = llm
def __call__(self, *args, **kwargs):
raise NotImplementedError
async def _fill_node(self, op_class, prompt, mode=None, **extra_kwargs):
fill_kwargs = {"context": prompt, "llm": self.llm}
if mode:
fill_kwargs["mode"] = mode
fill_kwargs.update(extra_kwargs)
node = await ActionNode.from_pydantic(op_class).fill(**fill_kwargs)
return node.instruct_content.model_dump()
class Custom(Operator):
def __init__(self, llm: LLM, name: str = "Custom"):
super().__init__(name, llm)
super().__init__(llm, name)
async def __call__(self, input, instruction):
prompt = instruction + input
node = await ActionNode.from_pydantic(GenerateOp).fill(context=prompt, llm=self.llm, mode="single_fill")
response = node.instruct_content.model_dump()
response = await self._fill_node(GenerateOp, prompt, mode="single_fill")
return response
class AnswerGenerate(Operator):
def __init__(self, llm: LLM, name: str = "AnswerGenerate"):
super().__init__(name, llm)
super().__init__(llm, name)
async def __call__(self, input: str, mode: str = None) -> Tuple[str, str]:
prompt = ANSWER_GENERATION_PROMPT.format(input=input)
fill_kwargs = {"context": prompt, "llm": self.llm}
node = await ActionNode.from_pydantic(AnswerGenerateOp).fill(**fill_kwargs)
response = node.instruct_content.model_dump()
response = await self._fill_node(AnswerGenerateOp, prompt, mode="context_fill")
return response
class ScEnsemble(Operator):
@ -58,8 +62,8 @@ class ScEnsemble(Operator):
Link: https://arxiv.org/abs/2311.17311
"""
def __init__(self,llm: LLM , name: str = "ScEnsemble"):
super().__init__(name, llm)
def __init__(self, llm: LLM, name: str = "ScEnsemble"):
super().__init__(llm, name)
async def __call__(self, solutions: List[str]):
answer_mapping = {}
@ -69,8 +73,7 @@ class ScEnsemble(Operator):
solution_text += f"{chr(65 + index)}: \n{str(solution)}\n\n\n"
prompt = SC_ENSEMBLE_PROMPT.format(solutions=solution_text)
node = await ActionNode.from_pydantic(ScEnsembleOp).fill(context=prompt, llm=self.llm)
response = node.instruct_content.model_dump()
response = await self._fill_node(ScEnsembleOp, prompt, mode="context_fill")
answer = response.get("solution_letter", "")
answer = answer.strip().upper()

View file

@ -17,28 +17,35 @@ import asyncio
import logging
class Operator:
def __init__(self, name, llm: LLM):
def __init__(self, llm: LLM, name: str):
self.name = name
self.llm = llm
def __call__(self, *args, **kwargs):
raise NotImplementedError
async def _fill_node(self, op_class, prompt, mode=None, **extra_kwargs):
fill_kwargs = {"context": prompt, "llm": self.llm}
if mode:
fill_kwargs["mode"] = mode
fill_kwargs.update(extra_kwargs)
node = await ActionNode.from_pydantic(op_class).fill(**fill_kwargs)
return node.instruct_content.model_dump()
class Custom(Operator):
def __init__(self, llm: LLM, name: str = "Custom"):
super().__init__(name, llm)
super().__init__(llm, name)
async def __call__(self, input, instruction):
prompt = instruction + input
node = await ActionNode.from_pydantic(GenerateOp).fill(context=prompt, llm=self.llm, mode="single_fill")
response = node.instruct_content.model_dump()
response = await self._fill_node(GenerateOp, prompt, mode="single_fill")
return response
def run_code(code):
try:
# 创建一个新的全局命名空间
# Create a new global namespace
global_namespace = {}
disallowed_imports = [
@ -47,73 +54,65 @@ def run_code(code):
"pylab", "tkinter", "PyQt5", "wx", "pyglet"
]
# 检查禁止导入的库
# Check for prohibited imports
for lib in disallowed_imports:
if f"import {lib}" in code or f"from {lib}" in code:
logging.warning("检测到禁止导入的库: %s", lib)
return "Error", f"禁止导入的库: {lib} 以及绘图类功能"
logger.info("Detected prohibited import: %s", lib)
return "Error", f"Prohibited import: {lib} and graphing functionalities"
# 使用 exec 执行代码
# Use exec to execute the code
exec(code, global_namespace)
# 假设代码中定义了一个名为 'solve' 的函数
# Assume the code defines a function named 'solve'
if 'solve' in global_namespace and callable(global_namespace['solve']):
result = global_namespace['solve']()
return "Success", str(result)
else:
return "Error", "未找到 'solve' 函数"
return "Error", "Function 'solve' not found"
except Exception as e:
exc_type, exc_value, exc_traceback = sys.exc_info()
tb_str = traceback.format_exception(exc_type, exc_value, exc_traceback)
return "Error", f"执行错误: {str(e)}\n{''.join(tb_str)}"
return "Error", f"Execution error: {str(e)}\n{''.join(tb_str)}"
class Programmer(Operator):
def __init__(self, llm: LLM, name: str = "Programmer"):
super().__init__(name, llm)
super().__init__(llm, name)
async def exec_code(self, code, timeout=30):
"""
异步执行代码并在超时时返回错误
Asynchronously execute code and return an error if timeout occurs.
"""
loop = asyncio.get_running_loop()
with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
try:
# 提交 run_code 任务到进程池
# Submit run_code task to the process pool
future = loop.run_in_executor(executor, run_code, code)
# 等待任务完成或超时
# Wait for the task to complete or timeout
result = await asyncio.wait_for(future, timeout=timeout)
return result
except asyncio.TimeoutError:
# 超时,尝试关闭进程池
# Timeout, attempt to shut down the process pool
executor.shutdown(wait=False, cancel_futures=True)
return "Error", "代码执行超时"
return "Error", "Code execution timed out"
except Exception as e:
return "Error", f"未知错误: {str(e)}"
return "Error", f"Unknown error: {str(e)}"
async def code_generate(self, problem, analysis, feedback, mode):
"""
生成代码的异步方法
Asynchronous method to generate code.
"""
prompt = PYTHON_CODE_VERIFIER_PROMPT.format(
problem=problem,
analysis=analysis,
feedback=feedback
)
fill_kwargs = {
"context": prompt,
"llm": self.llm,
"function_name": "solve"
}
if mode:
fill_kwargs["mode"] = mode
node = await ActionNode.from_pydantic(CodeGenerateOp).fill(**fill_kwargs)
response = node.instruct_content.model_dump()
response = await self._fill_node(CodeGenerateOp, prompt, mode, function_name="solve")
return response
@retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
async def __call__(self, problem: str, analysis: str = "None"):
"""
调用方法生成代码并执行最多重试 3
Call method, generate code and execute, retry up to 3 times.
"""
code = None
output = None
@ -122,12 +121,12 @@ class Programmer(Operator):
code_response = await self.code_generate(problem, analysis, feedback, mode="code_fill")
code = code_response.get("code")
if not code:
return {"code": code, "output": "未生成代码"}
return {"code": code, "output": "No code generated"}
status, output = await self.exec_code(code)
if status == "Success":
return {"code": code, "output": output}
else:
print(f"{i + 1}次执行错误,错误信息:{output}")
print(f"Execution error on attempt {i + 1}, error message: {output}")
feedback = (
f"\nThe result of the error from the code you wrote in the previous round:\n"
f"Code: {code}\n\nStatus: {status}, {output}"
@ -143,19 +142,18 @@ class ScEnsemble(Operator):
Link: https://arxiv.org/abs/2311.17311
"""
def __init__(self,llm: LLM , name: str = "ScEnsemble"):
super().__init__(name, llm)
def __init__(self, llm: LLM, name: str = "ScEnsemble"):
super().__init__(llm, name)
async def __call__(self, solutions: List[str], problem: str):
async def __call__(self, solutions: List[str]):
answer_mapping = {}
solution_text = ""
for index, solution in enumerate(solutions):
answer_mapping[chr(65 + index)] = index
solution_text += f"{chr(65 + index)}: \n{str(solution)}\n\n\n"
prompt = SC_ENSEMBLE_PROMPT.format(solutions=solution_text, problem=problem)
node = await ActionNode.from_pydantic(ScEnsembleOp).fill(context=prompt, llm=self.llm)
response = node.instruct_content.model_dump()
prompt = SC_ENSEMBLE_PROMPT.format(solutions=solution_text)
response = await self._fill_node(ScEnsembleOp, prompt, mode="context_fill")
answer = response.get("solution_letter", "")
answer = answer.strip().upper()

View file

@ -1,32 +0,0 @@
# -*- coding: utf-8 -*-
# @Date : 6/27/2024 22:07 PM
# @Author : didi
# @Desc : Basic Graph Class
from typing import Literal
import examples.aflow.scripts.optimized.HotpotQA.workflows.template.operator as operator
import examples.aflow.scripts.optimized.HotpotQA.workflows.round_1.prompt as prompt_custom
from metagpt.provider.llm_provider_registry import create_llm_instance
from metagpt.utils.cost_manager import CostManager
DatasetType = Literal["HumanEval", "MBPP", "GSM8K", "MATH", "HotpotQA", "DROP"]
class Workflow:
def __init__(
self,
name: str,
llm_config,
dataset: DatasetType,
) -> None:
self.name = name
self.dataset = dataset
self.llm = create_llm_instance(llm_config)
self.llm.cost_manager = CostManager()
self.custom = operator.Custom(self.llm)
async def __call__(self, problem: str):
"""
Implementation of the workflow
"""
solution = await self.custom(input=problem, instruction="")
return solution['response'], self.llm.cost_manager.total_cost

View file

@ -1,6 +0,0 @@
# XXX_PROMPT = """
#
# Solve it.
#
# """

View file

@ -1,15 +0,0 @@
SC_ENSEMBLE_PROMPT = """
Several answers have been generated to a same question. They are as follows:
{solutions}
Identify the concise answer that appears most frequently across them. This consistency in answers is crucial for determining the most reliable solution.
In the "thought" field, provide a detailed explanation of your thought process. In the "solution_letter" field, output only the single letter ID (A, B, C, etc.) corresponding to the most consistent solution. Do not include any additional text or explanation in the "solution_letter" field.
"""
ANSWER_GENERATION_PROMPT = """
Think step by step and solve the problem.
1. In the "thought" field, explain your thinking process in detail.
2. In the "answer" field, provide the final answer concisely and clearly. The answer should be a direct response to the question, without including explanations or reasoning.
Your task: {input}
"""

View file

@ -1,14 +0,0 @@
{
"Custom": {
"description": "Generates anything based on customized input and instruction.",
"interface": "custom(input: str, instruction: str) -> dict with key 'response' of type str"
},
"ScEnsemble": {
"description": "Uses self-consistency to select the solution that appears most frequently in the solution list, improve the selection to enhance the choice of the best solution.",
"interface": "sc_ensemble(solutions: List[str]) -> dict with key 'response' of type str"
},
"AnswerGenerate": {
"description": "Generate step by step based on the input. The step by step thought process is in the field of 'thought', and the final answer is in the field of 'answer'.",
"interface": "answer_generate(input: str) -> dict with key 'thought' of type str, 'answer' of type str"
}
}

View file

@ -1,78 +0,0 @@
# -*- coding: utf-8 -*-
# @Date : 6/27/2024 17:36 PM
# @Author : didi
# @Desc : operator demo of ags
import ast
import random
import sys
import traceback
from collections import Counter
from typing import Dict, List, Tuple
from tenacity import retry, stop_after_attempt, wait_fixed
from examples.aflow.scripts.optimized.HotpotQA.workflows.template.operator_an import *
from examples.aflow.scripts.optimized.HotpotQA.workflows.template.op_prompt import *
from metagpt.actions.action_node import ActionNode
from metagpt.llm import LLM
from metagpt.logs import logger
import re
class Operator:
def __init__(self, name, llm: LLM):
self.name = name
self.llm = llm
def __call__(self, *args, **kwargs):
raise NotImplementedError
class Custom(Operator):
def __init__(self, llm: LLM, name: str = "Custom"):
super().__init__(name, llm)
async def __call__(self, input, instruction):
prompt = instruction + input
node = await ActionNode.from_pydantic(GenerateOp).fill(context=prompt, llm=self.llm, mode="single_fill")
response = node.instruct_content.model_dump()
return response
class AnswerGenerate(Operator):
def __init__(self, llm: LLM, name: str = "AnswerGenerate"):
super().__init__(name, llm)
async def __call__(self, input: str, mode: str = None) -> Tuple[str, str]:
prompt = ANSWER_GENERATION_PROMPT.format(input=input)
fill_kwargs = {"context": prompt, "llm": self.llm}
node = await ActionNode.from_pydantic(AnswerGenerateOp).fill(**fill_kwargs)
response = node.instruct_content.model_dump()
return response
class ScEnsemble(Operator):
"""
Paper: Self-Consistency Improves Chain of Thought Reasoning in Language Models
Link: https://arxiv.org/abs/2203.11171
Paper: Universal Self-Consistency for Large Language Model Generation
Link: https://arxiv.org/abs/2311.17311
"""
def __init__(self,llm: LLM , name: str = "ScEnsemble"):
super().__init__(name, llm)
async def __call__(self, solutions: List[str]):
answer_mapping = {}
solution_text = ""
for index, solution in enumerate(solutions):
answer_mapping[chr(65 + index)] = index
solution_text += f"{chr(65 + index)}: \n{str(solution)}\n\n\n"
prompt = SC_ENSEMBLE_PROMPT.format(solutions=solution_text)
node = await ActionNode.from_pydantic(ScEnsembleOp).fill(context=prompt, llm=self.llm)
response = node.instruct_content.model_dump()
answer = response.get("solution_letter", "")
answer = answer.strip().upper()
return {"response": solutions[answer_mapping[answer]]}

View file

@ -1,18 +0,0 @@
# -*- coding: utf-8 -*-
# @Date : 6/27/2024 19:46 PM
# @Author : didi
# @Desc : action nodes for operator
from pydantic import BaseModel, Field
class GenerateOp(BaseModel):
response: str = Field(default="", description="Your solution for this problem")
class ScEnsembleOp(BaseModel):
thought: str = Field(default="", description="The thought of the most consistent solution.")
solution_letter: str = Field(default="", description="The letter of most consistent solution.")
class AnswerGenerateOp(BaseModel):
thought: str = Field(default="", description="The step by step thinking process")
answer: str = Field(default="", description="The final answer to the question")

View file

@ -19,34 +19,41 @@ import re
class Operator:
def __init__(self, name, llm: LLM):
def __init__(self, llm: LLM, name: str):
self.name = name
self.llm = llm
def __call__(self, *args, **kwargs):
raise NotImplementedError
async def _fill_node(self, op_class, prompt, mode=None, **extra_kwargs):
fill_kwargs = {"context": prompt, "llm": self.llm}
if mode:
fill_kwargs["mode"] = mode
fill_kwargs.update(extra_kwargs)
node = await ActionNode.from_pydantic(op_class).fill(**fill_kwargs)
return node.instruct_content.model_dump()
class Custom(Operator):
def __init__(self, llm: LLM, name: str = "Custom"):
super().__init__(name, llm)
super().__init__(llm, name)
async def __call__(self, input, instruction):
prompt = instruction + input
node = await ActionNode.from_pydantic(GenerateOp).fill(context=prompt, llm=self.llm, mode="single_fill")
response = node.instruct_content.model_dump()
response = await self._fill_node(GenerateOp, prompt, mode="single_fill")
return response
class CustomCodeGenerate(Operator):
def __init__(self, llm: LLM, name: str = "CustomCodeGenerate"):
super().__init__(name, llm)
super().__init__(llm, name)
async def __call__(self, problem, entry_point, instruction):
prompt = instruction + problem
node = await ActionNode.from_pydantic(GenerateOp).fill(context=prompt, llm=self.llm, function_name=entry_point, mode="code_fill")
response = node.instruct_content.model_dump()
response = await self._fill_node(GenerateOp, prompt, mode="code_fill", function_name=entry_point)
return response
class ScEnsemble(Operator):
"""
Paper: Self-Consistency Improves Chain of Thought Reasoning in Language Models
@ -55,19 +62,18 @@ class ScEnsemble(Operator):
Link: https://arxiv.org/abs/2311.17311
"""
def __init__(self,llm: LLM , name: str = "ScEnsemble"):
super().__init__(name, llm)
def __init__(self, llm: LLM, name: str = "ScEnsemble"):
super().__init__(llm, name)
async def __call__(self, solutions: List[str], problem: str):
async def __call__(self, solutions: List[str]):
answer_mapping = {}
solution_text = ""
for index, solution in enumerate(solutions):
answer_mapping[chr(65 + index)] = index
solution_text += f"{chr(65 + index)}: \n{str(solution)}\n\n\n"
prompt = SC_ENSEMBLE_PROMPT.format(solutions=solution_text, question=problem)
node = await ActionNode.from_pydantic(ScEnsembleOp).fill(context=prompt, llm=self.llm)
response = node.instruct_content.model_dump()
prompt = SC_ENSEMBLE_PROMPT.format(solutions=solution_text)
response = await self._fill_node(ScEnsembleOp, prompt, mode="context_fill")
answer = response.get("solution_letter", "")
answer = answer.strip().upper()
@ -75,8 +81,8 @@ class ScEnsemble(Operator):
return {"response": solutions[answer_mapping[answer]]}
class Test(Operator):
def __init__(self, llm, name: str = "Test"):
super().__init__(name, llm)
def __init__(self, llm: LLM, name: str = "Test"):
super().__init__(llm, name)
def exec_code(self, solution, entry_point):
@ -131,8 +137,7 @@ class Test(Operator):
exec_pass=f"executed unsuccessfully, error: \n {result}",
test_fail="executed unsucessfully",
)
node = await ActionNode.from_pydantic(ReflectionTestOp).fill(context=prompt, llm=self.llm, mode="code_fill")
response = node.instruct_content.model_dump()
response = await self._fill_node(ReflectionTestOp, prompt, mode="code_fill")
solution = response["reflection_and_solution"]
else:
prompt = REFLECTION_ON_PUBLIC_TEST_PROMPT.format(
@ -141,8 +146,7 @@ class Test(Operator):
exec_pass="executed successfully",
test_fail=result,
)
node = await ActionNode.from_pydantic(ReflectionTestOp).fill(context=prompt, llm=self.llm, mode="code_fill")
response = node.instruct_content.model_dump()
response = await self._fill_node(ReflectionTestOp, prompt, mode="code_fill")
solution = response["reflection_and_solution"]
result = self.exec_code(solution, entry_point)

View file

@ -17,28 +17,35 @@ import asyncio
import logging
class Operator:
def __init__(self, name, llm: LLM):
def __init__(self, llm: LLM, name: str):
self.name = name
self.llm = llm
def __call__(self, *args, **kwargs):
raise NotImplementedError
async def _fill_node(self, op_class, prompt, mode=None, **extra_kwargs):
fill_kwargs = {"context": prompt, "llm": self.llm}
if mode:
fill_kwargs["mode"] = mode
fill_kwargs.update(extra_kwargs)
node = await ActionNode.from_pydantic(op_class).fill(**fill_kwargs)
return node.instruct_content.model_dump()
class Custom(Operator):
def __init__(self, llm: LLM, name: str = "Custom"):
super().__init__(name, llm)
super().__init__(llm, name)
async def __call__(self, input, instruction):
prompt = instruction + input
node = await ActionNode.from_pydantic(GenerateOp).fill(context=prompt, llm=self.llm, mode="single_fill")
response = node.instruct_content.model_dump()
response = await self._fill_node(GenerateOp, prompt, mode="single_fill")
return response
def run_code(code):
try:
# 创建一个新的全局命名空间
# Create a new global namespace
global_namespace = {}
disallowed_imports = [
@ -47,73 +54,65 @@ def run_code(code):
"pylab", "tkinter", "PyQt5", "wx", "pyglet"
]
# 检查禁止导入的库
# Check for prohibited imports
for lib in disallowed_imports:
if f"import {lib}" in code or f"from {lib}" in code:
logging.warning("检测到禁止导入的库: %s", lib)
return "Error", f"禁止导入的库: {lib} 以及绘图类功能"
logger.info("Detected prohibited import: %s", lib)
return "Error", f"Prohibited import: {lib} and graphing functionalities"
# 使用 exec 执行代码
# Use exec to execute the code
exec(code, global_namespace)
# 假设代码中定义了一个名为 'solve' 的函数
# Assume the code defines a function named 'solve'
if 'solve' in global_namespace and callable(global_namespace['solve']):
result = global_namespace['solve']()
return "Success", str(result)
else:
return "Error", "未找到 'solve' 函数"
return "Error", "Function 'solve' not found"
except Exception as e:
exc_type, exc_value, exc_traceback = sys.exc_info()
tb_str = traceback.format_exception(exc_type, exc_value, exc_traceback)
return "Error", f"执行错误: {str(e)}\n{''.join(tb_str)}"
return "Error", f"Execution error: {str(e)}\n{''.join(tb_str)}"
class Programmer(Operator):
def __init__(self, llm: LLM, name: str = "Programmer"):
super().__init__(name, llm)
super().__init__(llm, name)
async def exec_code(self, code, timeout=30):
"""
异步执行代码并在超时时返回错误
Asynchronously execute code and return an error if timeout occurs.
"""
loop = asyncio.get_running_loop()
with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
try:
# 提交 run_code 任务到进程池
# Submit run_code task to the process pool
future = loop.run_in_executor(executor, run_code, code)
# 等待任务完成或超时
# Wait for the task to complete or timeout
result = await asyncio.wait_for(future, timeout=timeout)
return result
except asyncio.TimeoutError:
# 超时,尝试关闭进程池
# Timeout, attempt to shut down the process pool
executor.shutdown(wait=False, cancel_futures=True)
return "Error", "代码执行超时"
return "Error", "Code execution timed out"
except Exception as e:
return "Error", f"未知错误: {str(e)}"
return "Error", f"Unknown error: {str(e)}"
async def code_generate(self, problem, analysis, feedback, mode):
"""
生成代码的异步方法
Asynchronous method to generate code.
"""
prompt = PYTHON_CODE_VERIFIER_PROMPT.format(
problem=problem,
analysis=analysis,
feedback=feedback
)
fill_kwargs = {
"context": prompt,
"llm": self.llm,
"function_name": "solve"
}
if mode:
fill_kwargs["mode"] = mode
node = await ActionNode.from_pydantic(CodeGenerateOp).fill(**fill_kwargs)
response = node.instruct_content.model_dump()
response = await self._fill_node(CodeGenerateOp, prompt, mode, function_name="solve")
return response
@retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
async def __call__(self, problem: str, analysis: str = "None"):
"""
调用方法生成代码并执行最多重试 3
Call method, generate code and execute, retry up to 3 times.
"""
code = None
output = None
@ -122,12 +121,12 @@ class Programmer(Operator):
code_response = await self.code_generate(problem, analysis, feedback, mode="code_fill")
code = code_response.get("code")
if not code:
return {"code": code, "output": "未生成代码"}
return {"code": code, "output": "No code generated"}
status, output = await self.exec_code(code)
if status == "Success":
return {"code": code, "output": output}
else:
print(f"{i + 1}次执行错误,错误信息:{output}")
print(f"Execution error on attempt {i + 1}, error message: {output}")
feedback = (
f"\nThe result of the error from the code you wrote in the previous round:\n"
f"Code: {code}\n\nStatus: {status}, {output}"
@ -143,19 +142,18 @@ class ScEnsemble(Operator):
Link: https://arxiv.org/abs/2311.17311
"""
def __init__(self,llm: LLM , name: str = "ScEnsemble"):
super().__init__(name, llm)
def __init__(self, llm: LLM, name: str = "ScEnsemble"):
super().__init__(llm, name)
async def __call__(self, solutions: List[str], problem: str):
async def __call__(self, solutions: List[str]):
answer_mapping = {}
solution_text = ""
for index, solution in enumerate(solutions):
answer_mapping[chr(65 + index)] = index
solution_text += f"{chr(65 + index)}: \n{str(solution)}\n\n\n"
prompt = SC_ENSEMBLE_PROMPT.format(solutions=solution_text, problem=problem)
node = await ActionNode.from_pydantic(ScEnsembleOp).fill(context=prompt, llm=self.llm)
response = node.instruct_content.model_dump()
prompt = SC_ENSEMBLE_PROMPT.format(solutions=solution_text)
response = await self._fill_node(ScEnsembleOp, prompt, mode="context_fill")
answer = response.get("solution_letter", "")
answer = answer.strip().upper()

View file

@ -19,34 +19,41 @@ import re
class Operator:
def __init__(self, name, llm: LLM):
def __init__(self, llm: LLM, name: str):
self.name = name
self.llm = llm
def __call__(self, *args, **kwargs):
raise NotImplementedError
async def _fill_node(self, op_class, prompt, mode=None, **extra_kwargs):
fill_kwargs = {"context": prompt, "llm": self.llm}
if mode:
fill_kwargs["mode"] = mode
fill_kwargs.update(extra_kwargs)
node = await ActionNode.from_pydantic(op_class).fill(**fill_kwargs)
return node.instruct_content.model_dump()
class Custom(Operator):
def __init__(self, llm: LLM, name: str = "Custom"):
super().__init__(name, llm)
super().__init__(llm, name)
async def __call__(self, input, instruction):
prompt = instruction + input
node = await ActionNode.from_pydantic(GenerateOp).fill(context=prompt, llm=self.llm, mode="single_fill")
response = node.instruct_content.model_dump()
response = await self._fill_node(GenerateOp, prompt, mode="single_fill")
return response
class CustomCodeGenerate(Operator):
def __init__(self, llm: LLM, name: str = "CustomCodeGenerate"):
super().__init__(name, llm)
super().__init__(llm, name)
async def __call__(self, problem, entry_point, instruction):
prompt = instruction + problem
node = await ActionNode.from_pydantic(GenerateOp).fill(context=prompt, llm=self.llm, function_name=entry_point, mode="code_fill")
response = node.instruct_content.model_dump()
response = await self._fill_node(GenerateOp, prompt, mode="code_fill", function_name=entry_point)
return response
class ScEnsemble(Operator):
"""
Paper: Self-Consistency Improves Chain of Thought Reasoning in Language Models
@ -55,19 +62,18 @@ class ScEnsemble(Operator):
Link: https://arxiv.org/abs/2311.17311
"""
def __init__(self,llm: LLM , name: str = "ScEnsemble"):
super().__init__(name, llm)
def __init__(self, llm: LLM, name: str = "ScEnsemble"):
super().__init__(llm, name)
async def __call__(self, solutions: List[str], problem: str):
async def __call__(self, solutions: List[str]):
answer_mapping = {}
solution_text = ""
for index, solution in enumerate(solutions):
answer_mapping[chr(65 + index)] = index
solution_text += f"{chr(65 + index)}: \n{str(solution)}\n\n\n"
prompt = SC_ENSEMBLE_PROMPT.format(solutions=solution_text, question=problem)
node = await ActionNode.from_pydantic(ScEnsembleOp).fill(context=prompt, llm=self.llm)
response = node.instruct_content.model_dump()
prompt = SC_ENSEMBLE_PROMPT.format(solutions=solution_text)
response = await self._fill_node(ScEnsembleOp, prompt, mode="context_fill")
answer = response.get("solution_letter", "")
answer = answer.strip().upper()
@ -75,12 +81,12 @@ class ScEnsemble(Operator):
return {"response": solutions[answer_mapping[answer]]}
class Test(Operator):
def __init__(self, llm, name: str = "Test"):
super().__init__(name, llm)
def __init__(self, llm: LLM, name: str = "Test"):
super().__init__(llm, name)
def exec_code(self, solution, entry_point):
test_cases = extract_test_cases_from_jsonl(entry_point, "MBPP")
test_cases = extract_test_cases_from_jsonl(entry_point)
fail_cases = []
for test_case in test_cases:
@ -131,8 +137,7 @@ class Test(Operator):
exec_pass=f"executed unsuccessfully, error: \n {result}",
test_fail="executed unsucessfully",
)
node = await ActionNode.from_pydantic(ReflectionTestOp).fill(context=prompt, llm=self.llm, mode="code_fill")
response = node.instruct_content.model_dump()
response = await self._fill_node(ReflectionTestOp, prompt, mode="code_fill")
solution = response["reflection_and_solution"]
else:
prompt = REFLECTION_ON_PUBLIC_TEST_PROMPT.format(
@ -141,8 +146,7 @@ class Test(Operator):
exec_pass="executed successfully",
test_fail=result,
)
node = await ActionNode.from_pydantic(ReflectionTestOp).fill(context=prompt, llm=self.llm, mode="code_fill")
response = node.instruct_content.model_dump()
response = await self._fill_node(ReflectionTestOp, prompt, mode="code_fill")
solution = response["reflection_and_solution"]
result = self.exec_code(solution, entry_point)