Initial release: TinyForge-Zero recipe + mined pairs + reproduction guide

Companion artifact for the paper 'How Far Can an Open Base Model
Self-Improve? Recipes, Limits, and Test-Time Synergy'.

Contents:
- recipe/{train_on_pairs,bootstrap,multi_pair_14b,curriculum_math,eval_raw,eval_plus,confirm}.py
- data/pairs_{7b_40,14b_multi_new60,math_13}.jsonl (released mined pairs)
- controls/mbpp_corrupt_control.py (the +0 negative control)
- docs/{scaling_chart,fig1_headline,fig6_boundary}.png
- REPRODUCE.md (paper claim -> exact command mapping)
This commit is contained in:
Rana Usman 2026-05-13 20:43:52 +05:00
commit 6305ff0f91
20 changed files with 2438 additions and 0 deletions

357
recipe/bootstrap.py Normal file
View file

@ -0,0 +1,357 @@
"""Self-Bootstrapping TinyForge.
Single model. No external dataset. Just a Python interpreter.
Loop:
for iter in 1..N:
1. Model generates K problems (function signature + tests + canonical solution)
2. Filter: keep only those where canonical executes & tests pass
3. Model solves each fresh (forget canonical)
4. Verify against tests → identify failures
5. Model repairs each failure (one shot, with error)
6. Verify repairs → collect (broken, fixed) pairs
7. Periodically: LoRA-train on accumulated pairs
8. Periodically: eval on held-out HumanEval-mini
If accuracy on HumanEval rises without ever seeing HumanEval problems → the recipe works.
"""
import os, sys, json, time, re, gc, subprocess, tempfile, argparse, random, math
os.environ.setdefault("HF_HOME", "/workspace/hf")
os.environ["TRANSFORMERS_VERBOSITY"] = "error"
os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1"
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import AutoModelForCausalLM, AutoTokenizer, TrainingArguments, Trainer
from datasets import load_dataset, Dataset as HFDataset
from peft import LoraConfig, get_peft_model
T0 = time.time()


def log(m):
    """Print ``m`` prefixed with the seconds elapsed since ``T0``, flushing stdout."""
    elapsed = time.time() - T0
    print(f"[{elapsed:7.1f}s] {m}", flush=True)
def extract_code(text):
    """Return the contents of the first fenced code block in ``text``.

    A ```python fence is preferred over a bare ``` fence. Everything up to the
    next closing fence (or the end of the text) is returned, stripped. Text
    without any fence is returned stripped and otherwise unchanged.
    """
    fence = "```"
    for opener in (fence + "python", fence):
        if opener in text:
            text = text.partition(opener)[2]
            break
    return text.partition(fence)[0].strip()
def run_python(code, timeout=8):
    """Run ``code`` as a script in a ``python3`` subprocess.

    Returns ``(passed, message)``: ``(True, "")`` when the process exits with
    status 0, ``(False, "timeout")`` when it exceeds ``timeout`` seconds, and
    otherwise ``(False, tail)`` where ``tail`` is the last three lines of
    stderr (or stdout if stderr is empty), capped at 300 characters.
    """
    # NOTE(review): `code` is model-generated and executed without any sandbox;
    # run this only inside a disposable container/VM.
    # Write UTF-8 explicitly: that is what the interpreter assumes for source files.
    with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False, encoding="utf-8") as f:
        f.write(code)
        path = f.name
    try:
        # errors="replace" keeps undecodable child output from raising
        # UnicodeDecodeError and aborting the caller's whole loop.
        r = subprocess.run(["python3", path], capture_output=True, timeout=timeout,
                           text=True, errors="replace", cwd="/tmp")
        if r.returncode == 0:
            return True, ""
        err = (r.stderr or r.stdout).strip().splitlines()
        return False, "\n".join(err[-3:])[:300]
    except subprocess.TimeoutExpired:
        return False, "timeout"
    finally:
        # Best-effort cleanup; only filesystem errors are ignored so that
        # KeyboardInterrupt / SystemExit still propagate.
        try:
            os.unlink(path)
        except OSError:
            pass
def gen_batch(model, tok, prompts, max_new=400, temperature=0.7, batch=8):
    """Generate one completion per prompt, processing ``batch`` prompts at a time.

    Each prompt is wrapped in a chat template with a fixed system message.
    Sampling is enabled only when ``temperature`` > 0. Returns the decoded
    continuations (prompt tokens sliced off) in the same order as ``prompts``.
    """
    sampling = temperature > 0
    completions = []
    for start in range(0, len(prompts), batch):
        texts = [
            tok.apply_chat_template(
                [{"role": "system", "content": "You are a Python coder."},
                 {"role": "user", "content": prompt}],
                tokenize=False, add_generation_prompt=True,
            )
            for prompt in prompts[start:start + batch]
        ]
        inp = tok(texts, return_tensors="pt", padding=True, truncation=True, max_length=1500).to(model.device)
        with torch.no_grad():
            out = model.generate(
                **inp,
                max_new_tokens=max_new,
                do_sample=sampling,
                temperature=temperature if sampling else 1.0,
                top_p=0.95,
                pad_token_id=tok.eos_token_id,
            )
        # Every row shares the same padded prompt length, so one offset strips the prompt.
        prompt_len = inp.input_ids.shape[1]
        completions.extend(
            tok.decode(out[row][prompt_len:], skip_special_tokens=True)
            for row in range(out.size(0))
        )
    return completions
# Prompt asking the model to invent one problem (function + three asserts) in a
# single ```python block; the reply is later split by parse_generated_problem().
# It is sent as-is (never passed through str.format), so the {placeholders}
# below reach the model literally.
PROBLEM_GEN_PROMPT = """Generate ONE simple Python coding problem with a clear function spec and 3 test assertions.
Output format (exactly one ```python block):
```python
def {function_name}({args}):
\"\"\"{one-line description of what the function does}\"\"\"
{implementation}
# tests
assert {function_name}(...) == ...
assert {function_name}(...) == ...
assert {function_name}(...) == ...
```
Make the function specific and concrete. The function should be 3-15 lines. Tests must verify the function works correctly. Output ONLY the code block."""
def _signature_stub(block_lines):
    """Return the ``def`` line plus its docstring (or a ``pass`` body), without the implementation."""
    quote = '"""'
    # Locate the first non-blank line after the def: a docstring must be there.
    idx = 1
    while idx < len(block_lines) and not block_lines[idx].strip():
        idx += 1
    if idx >= len(block_lines) or not block_lines[idx].strip().startswith(quote):
        # No docstring: emit a bare stub.
        return [block_lines[0], " pass"]
    stub = list(block_lines[:idx + 1])
    if block_lines[idx].count(quote) >= 2:
        return stub  # one-line docstring, opened and closed on the same line
    # Multi-line docstring: copy lines up to and including the closing quotes,
    # however many lines that takes, and stop before the implementation.
    for line in block_lines[idx + 1:]:
        stub.append(line)
        if quote in line:
            break
    return stub


def parse_generated_problem(raw_code):
    """Split a generated problem into signature stub, canonical solution and tests.

    ``raw_code`` is expected to hold a top-level ``def`` followed by top-level
    ``assert`` lines. Returns a dict with keys ``fn_name``, ``signature``
    (def line + docstring, or def line + ``pass``), ``canonical`` (everything
    from the def up to the first assert), ``tests`` (list of assert lines) and
    ``raw``. Returns ``None`` when there is no top-level def, fewer than two
    asserts, a canonical body shorter than 30 characters, or an unparsable
    def line.
    """
    code = raw_code.strip()
    if "def " not in code:
        return None
    lines = code.split("\n")
    func_start = next((i for i, line in enumerate(lines) if line.startswith("def ")), None)
    if func_start is None:
        return None
    name_match = re.match(r"def\s+(\w+)\s*\(", lines[func_start])
    if not name_match:
        return None

    # Collect the run of top-level asserts that follows the function.
    tests = []
    def_end = None
    for i in range(func_start + 1, len(lines)):
        line = lines[i]
        if line.startswith("def "):
            break  # a second function starts: stop scanning
        if line.startswith("assert "):
            tests.append(line)
            if def_end is None:
                def_end = i
        elif tests:
            # Once tests have started, only blank lines, comments and assert-like
            # lines may be interleaved; anything else ends the test section.
            stripped = line.strip()
            if stripped and not stripped.startswith(("#", "assert")):
                break
    if len(tests) < 2:
        return None

    block_lines = lines[func_start:def_end]
    full_solution = "\n".join(block_lines).strip()
    if len(full_solution) < 30:
        return None

    return {
        "fn_name": name_match.group(1),
        "signature": "\n".join(_signature_stub(block_lines)),
        "canonical": full_solution,
        "tests": tests,
        "raw": code,
    }
# ── Loop ────────────────────────────────────────────────────────────────
def humaneval_eval(model, tok, n=30):
    """Score the model on the first ``n`` HumanEval test problems.

    Completions are generated with temperature 0. A completion containing
    ``def `` is executed on its own; otherwise it is appended to the problem
    prompt. Returns ``(number_passed, n)``.
    """
    he = list(load_dataset("openai_humaneval", split="test"))[:n]
    prompts = [p["prompt"] + "\n# Complete the function above." for p in he]
    outs = gen_batch(model, tok, prompts, max_new=400, temperature=0.0, batch=4)
    passed = 0
    for p, raw in zip(he, outs):
        code = extract_code(raw) if "```" in raw else raw
        if "def " in code:
            program = code
        else:
            # Bare body: prepend the original prompt so the signature is present.
            program = p["prompt"] + "\n" + code
        harness = program + "\n\n" + p["test"] + f"\n\ncheck({p['entry_point']})"
        ok, _ = run_python(harness, timeout=10)
        if ok:
            passed += 1
    return passed, n
def main():
    """Run the self-bootstrapping loop: generate problems, mine repair pairs, train, evaluate.

    Flags select the model, GPU index, iteration count, problems per iteration
    and the train/eval cadence; ``--tag`` names the output directory under
    ``/workspace/bootstrap``. Writes ``pairs.jsonl``, ``iter_stats.jsonl`` and
    ``eval_log.json`` there and prints a summary at the end.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--model", default="Qwen/Qwen2.5-Coder-1.5B-Instruct")
    parser.add_argument("--gpu", type=int, default=0)
    parser.add_argument("--iterations", type=int, default=20)
    parser.add_argument("--problems_per_iter", type=int, default=16)
    parser.add_argument("--train_every", type=int, default=10)
    parser.add_argument("--eval_every", type=int, default=10)
    parser.add_argument("--tag", required=True)
    args = parser.parse_args()

    out_dir = f"/workspace/bootstrap/{args.tag}"
    os.makedirs(out_dir, exist_ok=True)
    device = torch.device(f"cuda:{args.gpu}")

    log(f"loading {args.model}")
    tok = AutoTokenizer.from_pretrained(args.model)
    if tok.pad_token is None:
        tok.pad_token = tok.eos_token
    tok.padding_side = "left"  # left padding for batched generation
    model = AutoModelForCausalLM.from_pretrained(args.model, dtype=torch.bfloat16, device_map=f"cuda:{args.gpu}")
    log(f" loaded mem={torch.cuda.memory_allocated(device)/1e9:.1f}GB")

    # Baseline score before any adapter is attached.
    log("INITIAL eval on HumanEval-mini")
    init_correct, init_total = humaneval_eval(model, tok, n=30)
    log(f" HumanEval-mini base: {init_correct}/{init_total}")

    # Attach LoRA adapters on the attention projections.
    lora_cfg = LoraConfig(r=16, lora_alpha=32, lora_dropout=0.05, bias="none",
                          target_modules=["q_proj", "k_proj", "v_proj", "o_proj"], task_type="CAUSAL_LM")
    model = get_peft_model(model, lora_cfg)
    log(f" LoRA applied; trainable={sum(p.numel() for p in model.parameters() if p.requires_grad)/1e6:.1f}M")

    def make_example(r):
        # Build one fixed-length (1024-token) training row: the prompt shows the
        # broken attempt and its error, the target is the fixed code. Prompt and
        # padding positions are masked out of the loss with -100.
        user = f"Implement: {r['signature']}\n\nTests:\n{chr(10).join(r['tests'])}\n\nMy attempt:\n```python\n{r['broken']}\n```\n\nError:\n{r['error']}\n\nFix and output the corrected code only."
        assistant = f"```python\n{r['fixed']}\n```"
        msgs_pre = [{"role": "system", "content": "You are a Python coder."},
                    {"role": "user", "content": user}]
        msgs_full = msgs_pre + [{"role": "assistant", "content": assistant}]
        pre = tok.apply_chat_template(msgs_pre, tokenize=False, add_generation_prompt=True)
        full = tok.apply_chat_template(msgs_full, tokenize=False)
        pre_ids = tok(pre, add_special_tokens=False)["input_ids"]
        full_ids = tok(full, add_special_tokens=False)["input_ids"]
        MAX = 1024
        full_ids = full_ids[:MAX]
        n_pre = min(len(pre_ids), len(full_ids))
        labels = [-100] * n_pre + list(full_ids[n_pre:])
        pad = MAX - len(full_ids)
        return {"input_ids": full_ids + [tok.pad_token_id]*pad,
                "attention_mask": [1]*len(full_ids) + [0]*pad,
                "labels": labels + [-100]*pad}

    def save_pairs():
        # Rewrite the whole pairs file so a crash loses at most one iteration.
        with open(f"{out_dir}/pairs.jsonl", "w") as fh:
            for r in accumulated_pairs:
                fh.write(json.dumps(r) + "\n")

    accumulated_pairs = []
    eval_log = [{"iter": 0, "correct": init_correct, "total": init_total}]
    iter_stats = []

    for it in range(1, args.iterations + 1):
        it_t = time.time()

        # 1. Generate K candidate problems.
        gen_prompts = [PROBLEM_GEN_PROMPT] * args.problems_per_iter
        raw_problems = gen_batch(model, tok, gen_prompts, max_new=400, temperature=0.9)

        # 2. Keep only problems whose canonical solution passes its own tests.
        valid_problems = []
        for raw in raw_problems:
            code = extract_code(raw) if "```" in raw else raw
            parsed = parse_generated_problem(code)
            if parsed is None:
                continue
            full = parsed["canonical"] + "\n\n" + "\n".join(parsed["tests"])
            ok, _ = run_python(full)
            if ok:
                valid_problems.append(parsed)
        if not valid_problems:
            log(f"iter {it}: 0 valid problems generated, skipping")
            iter_stats.append({"iter": it, "valid": 0, "fails": 0, "repairs": 0})
            continue

        # 3. Model solves each problem fresh: N=4 sampled attempts at temp=0.8.
        N_ATTEMPTS = 4
        solve_prompts = [f"Implement this function so it passes the tests below.\n\n```python\n{p['signature']}\n```\n\nTests:\n{chr(10).join(p['tests'])}\n\nOutput only the function implementation in one ```python block." for p in valid_problems]
        all_solve_prompts = solve_prompts * N_ATTEMPTS
        all_attempts = gen_batch(model, tok, all_solve_prompts, max_new=400, temperature=0.8)
        # The prompt list is the problem list repeated N times, so a stride of
        # len(valid_problems) regroups the attempts by problem.
        n_valid = len(valid_problems)
        per_problem_attempts = [all_attempts[i::n_valid] for i in range(n_valid)]

        # 4-5. Mine (broken, fixed) pairs from the model's own diverse outputs.
        failures = []
        new_pairs = 0
        for p, attempts in zip(valid_problems, per_problem_attempts):
            broken_one = None
            fixed_one = None
            broken_err = None
            for raw in attempts:
                code = extract_code(raw) if "```" in raw else raw
                full = code + "\n\n" + "\n".join(p["tests"])
                ok, err = run_python(full)
                if ok and fixed_one is None:
                    fixed_one = code
                elif not ok and broken_one is None:
                    broken_one = code
                    broken_err = err
                if broken_one and fixed_one:
                    break
            if broken_one is None:
                continue  # every attempt passed: nothing to learn from
            if fixed_one is None:
                # All attempts failed: queue for one explicit repair pass.
                failures.append({"p": p, "broken": broken_one, "error": broken_err})
                continue
            accumulated_pairs.append({
                "signature": p["signature"], "tests": p["tests"],
                "broken": broken_one, "error": broken_err, "fixed": fixed_one,
            })
            new_pairs += 1

        # Repair pass on the all-failed problems, showing the error message.
        if failures:
            repair_prompts = [f"Implement: {f['p']['signature']}\n\nTests:\n{chr(10).join(f['p']['tests'])}\n\nMy attempt:\n```python\n{f['broken']}\n```\n\nError:\n{f['error']}\n\nFix and output the corrected code only." for f in failures]
            repairs = gen_batch(model, tok, repair_prompts, max_new=400, temperature=0.8)
            for f, raw in zip(failures, repairs):
                fix = extract_code(raw) if "```" in raw else raw
                full = fix + "\n\n" + "\n".join(f["p"]["tests"])
                ok, _ = run_python(full)
                if not ok:
                    continue
                accumulated_pairs.append({
                    "signature": f["p"]["signature"], "tests": f["p"]["tests"],
                    "broken": f["broken"], "error": f["error"], "fixed": fix,
                })
                new_pairs += 1

        log(f"iter {it}: {len(valid_problems)} valid problems, {len(failures)} failures, {new_pairs} repair pairs harvested (total: {len(accumulated_pairs)}) [{time.time()-it_t:.0f}s]")
        iter_stats.append({"iter": it, "valid": len(valid_problems), "fails": len(failures), "repairs": new_pairs, "elapsed": time.time()-it_t})
        save_pairs()

        # 6. Periodic LoRA training on everything mined so far.
        if it % args.train_every == 0 and len(accumulated_pairs) >= 10:
            log(f" TRAINING on {len(accumulated_pairs)} pairs")
            tok.padding_side = "right"
            ds = HFDataset.from_list([make_example(r) for r in accumulated_pairs])
            targs = TrainingArguments(
                output_dir=f"{out_dir}/ckpt_iter{it}", num_train_epochs=2,
                per_device_train_batch_size=1, gradient_accumulation_steps=4,
                learning_rate=1e-4, bf16=True, logging_steps=20,
                save_strategy="no", report_to="none", remove_unused_columns=False, warmup_ratio=0.05,
            )
            Trainer(model=model, args=targs, train_dataset=ds, processing_class=tok).train()
            tok.padding_side = "left"

        # 7. Periodic held-out evaluation.
        if it % args.eval_every == 0:
            model.eval()
            corr, tot = humaneval_eval(model, tok, n=30)
            log(f" HumanEval-mini @ iter {it}: {corr}/{tot}")
            eval_log.append({"iter": it, "correct": corr, "total": tot})
            model.train()

    # Final evaluation.
    model.eval()
    final_correct, final_total = humaneval_eval(model, tok, n=30)
    eval_log.append({"iter": args.iterations, "correct": final_correct, "total": final_total, "final": True})

    # Persist all artefacts.
    with open(f"{out_dir}/iter_stats.jsonl", "w") as fh:
        for r in iter_stats:
            fh.write(json.dumps(r) + "\n")
    with open(f"{out_dir}/eval_log.json", "w") as fh:
        json.dump(eval_log, fh, indent=2)
    save_pairs()

    banner = "=" * 70
    print()
    print(banner)
    print(f" MODEL: {args.model}")
    print(f" ITERATIONS: {args.iterations}, problems/iter: {args.problems_per_iter}")
    print(f" TOTAL repair pairs: {len(accumulated_pairs)}")
    print(f" HUMANEVAL-MINI: base={init_correct}/{init_total} final={final_correct}/{final_total} Δ={final_correct-init_correct:+d}")
    print(f" time: {time.time()-T0:.0f}s")
    print(banner)


if __name__ == "__main__":
    main()

165
recipe/confirm.py Normal file
View file

@ -0,0 +1,165 @@
"""Confirm the peak +5 result on full HumanEval (164 problems) and try the cliff at 39 pairs."""
import os, sys, json, time, re, gc, subprocess, tempfile, argparse
os.environ.setdefault("HF_HOME", "/workspace/hf")
os.environ.setdefault("CUDA_VISIBLE_DEVICES", "0")
os.environ["TRANSFORMERS_VERBOSITY"] = "error"
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, TrainingArguments, Trainer
from datasets import load_dataset, Dataset as HFDataset
from peft import LoraConfig, get_peft_model
T0 = time.time()


def log(m):
    """Emit ``m`` on stdout, tagged with the seconds elapsed since ``T0``, and flush."""
    stamp = f"{time.time()-T0:7.1f}"
    print(f"[{stamp}s] {m}", flush=True)
def extract_code(text):
    """Pull the body of the first fenced block out of ``text``.

    Prefers a ```python fence, falls back to a bare ``` fence, and cuts at the
    next closing fence when there is one. The result is stripped; unfenced
    text comes back stripped and otherwise untouched.
    """
    start = text.find("```python")
    if start != -1:
        text = text[start + len("```python"):]
    else:
        start = text.find("```")
        if start != -1:
            text = text[start + 3:]
    end = text.find("```")
    if end != -1:
        text = text[:end]
    return text.strip()
def run_python(code, timeout=10):
    """Run ``code`` as a script in a ``python3`` subprocess.

    Returns ``True`` when the process exits with status 0, ``False`` on a
    non-zero exit or when it runs longer than ``timeout`` seconds.
    """
    # NOTE(review): `code` contains model-generated text and is executed
    # without a sandbox; run only in a disposable environment.
    with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False, encoding="utf-8") as f:
        f.write(code)
        path = f.name
    try:
        # errors="replace": undecodable child output must not raise here.
        r = subprocess.run(["python3", path], capture_output=True, timeout=timeout,
                           text=True, errors="replace", cwd="/tmp")
        return r.returncode == 0
    except subprocess.TimeoutExpired:
        return False
    finally:
        # Best-effort cleanup; ignore only filesystem errors so that
        # KeyboardInterrupt / SystemExit are not swallowed.
        try:
            os.unlink(path)
        except OSError:
            pass
def gen_batch(model, tok, prompts, max_new=400, temperature=0.0, batch=4):
    """Return one decoded completion per prompt, generated ``batch`` prompts at a time.

    Prompts are wrapped in a chat template with a fixed system message.
    Sampling is switched on only for ``temperature`` > 0. The prompt tokens
    are sliced off before decoding; output order matches ``prompts``.
    """
    system_turn = {"role": "system", "content": "You are a Python coder."}
    use_sampling = temperature > 0
    results = []
    for offset in range(0, len(prompts), batch):
        texts = []
        for prompt in prompts[offset:offset + batch]:
            conversation = [system_turn, {"role": "user", "content": prompt}]
            texts.append(tok.apply_chat_template(conversation, tokenize=False, add_generation_prompt=True))
        inp = tok(texts, return_tensors="pt", padding=True, truncation=True, max_length=1500).to(model.device)
        with torch.no_grad():
            out = model.generate(
                **inp,
                max_new_tokens=max_new,
                do_sample=use_sampling,
                temperature=temperature if use_sampling else 1.0,
                top_p=0.95,
                pad_token_id=tok.eos_token_id,
            )
        n_prompt_tokens = inp.input_ids.shape[1]
        for row in range(out.size(0)):
            results.append(tok.decode(out[row][n_prompt_tokens:], skip_special_tokens=True))
    return results
def humaneval_full(model, tok):
    """Score the model on every problem of the HumanEval test split.

    Completions are generated with temperature 0. A completion containing
    ``def `` is run standalone; otherwise it is appended to the problem
    prompt. Returns ``(number_passed, number_of_problems)``.
    """
    he = list(load_dataset("openai_humaneval", split="test"))
    log(f" full HumanEval: {len(he)} problems")
    prompts = [p["prompt"] + "\n# Complete the function above." for p in he]
    outs = gen_batch(model, tok, prompts, max_new=400, temperature=0.0, batch=4)
    n_passed = 0
    for p, raw in zip(he, outs):
        code = extract_code(raw) if "```" in raw else raw
        if "def " in code:
            program = code
        else:
            program = p["prompt"] + "\n" + code
        harness = program + "\n\n" + p["test"] + f"\n\ncheck({p['entry_point']})"
        if run_python(harness, timeout=10):
            n_passed += 1
    return n_passed, len(he)
def make_example(r, tok):
    """Turn one repair pair into a fixed-length (1024-token) training row.

    The user turn shows the signature, tests, broken attempt and error; the
    assistant turn is the fixed code. Returns ``input_ids``,
    ``attention_mask`` and ``labels``, with prompt and padding positions set
    to -100 in ``labels``.
    """
    user = f"Implement: {r['signature']}\n\nTests:\n{chr(10).join(r['tests'])}\n\nMy attempt:\n```python\n{r['broken']}\n```\n\nError:\n{r['error']}\n\nFix and output the corrected code only."
    assistant = f"```python\n{r['fixed']}\n```"
    prompt_msgs = [
        {"role": "system", "content": "You are a Python coder."},
        {"role": "user", "content": user},
    ]
    dialogue_msgs = prompt_msgs + [{"role": "assistant", "content": assistant}]
    prompt_text = tok.apply_chat_template(prompt_msgs, tokenize=False, add_generation_prompt=True)
    dialogue_text = tok.apply_chat_template(dialogue_msgs, tokenize=False)
    prompt_ids = tok(prompt_text, add_special_tokens=False)["input_ids"]
    token_ids = tok(dialogue_text, add_special_tokens=False)["input_ids"]

    MAX = 1024
    token_ids = token_ids[:MAX]
    # Mask the prompt prefix so only the assistant tokens contribute to the loss.
    n_masked = min(len(prompt_ids), len(token_ids))
    labels = [-100] * n_masked + list(token_ids[n_masked:])
    pad = MAX - len(token_ids)
    return {
        "input_ids": token_ids + [tok.pad_token_id]*pad,
        "attention_mask": [1]*len(token_ids) + [0]*pad,
        "labels": labels + [-100]*pad,
    }
def main():
    """Train a LoRA adapter on previously mined pairs and compare full-HumanEval scores.

    Evaluates the base model, trains on the first ``--n_pairs`` pairs from the
    pairs file for ``--epochs`` epochs, re-evaluates, and writes
    ``result.json`` under ``/workspace/confirm/<tag>``. ``--model`` and
    ``--pairs_path`` default to the values that used to be hard-coded.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--n_pairs", type=int, default=21, help="how many pairs from the saved set to train on")
    ap.add_argument("--epochs", type=int, default=2)
    ap.add_argument("--seed", type=int, default=42)
    ap.add_argument("--tag", required=True)
    ap.add_argument("--model", default="Qwen/Qwen2.5-7B", help="base model to load")
    ap.add_argument("--pairs_path", default="/workspace/bootstrap/bs_7b_v3/pairs.jsonl",
                    help="JSONL file of mined pairs from a prior bootstrap run")
    args = ap.parse_args()
    torch.manual_seed(args.seed)

    # Close the file deterministically and tolerate blank lines in the JSONL.
    with open(args.pairs_path) as fh:
        pairs = [json.loads(line) for line in fh if line.strip()]
    log(f"loaded {len(pairs)} pairs from prior bootstrap run")
    pairs_use = pairs[:args.n_pairs]
    log(f"using {len(pairs_use)} for this run")

    out_dir = f"/workspace/confirm/{args.tag}"
    os.makedirs(out_dir, exist_ok=True)

    log(f"loading {args.model}")
    tok = AutoTokenizer.from_pretrained(args.model)
    if tok.pad_token is None:
        tok.pad_token = tok.eos_token
    tok.padding_side = "left"  # left padding for batched generation
    model = AutoModelForCausalLM.from_pretrained(args.model, dtype=torch.bfloat16, device_map="cuda:0")

    # Eval base
    model.eval()
    log("eval BASE on full HumanEval")
    base_corr, base_total = humaneval_full(model, tok)
    log(f" BASE: {base_corr}/{base_total}")

    # Apply LoRA + train
    lora_cfg = LoraConfig(r=16, lora_alpha=32, lora_dropout=0.05, bias="none",
                          target_modules=["q_proj", "k_proj", "v_proj", "o_proj"], task_type="CAUSAL_LM")
    model = get_peft_model(model, lora_cfg)
    log("LoRA applied")
    tok.padding_side = "right"
    examples = [make_example(r, tok) for r in pairs_use]
    ds = HFDataset.from_list(examples)
    targs = TrainingArguments(
        output_dir=f"{out_dir}/ckpt", num_train_epochs=args.epochs,
        per_device_train_batch_size=1, gradient_accumulation_steps=4,
        learning_rate=1e-4, bf16=True, logging_steps=10,
        save_strategy="no", report_to="none", remove_unused_columns=False, warmup_ratio=0.05,
        seed=args.seed,
    )
    log(f"training on {len(ds)} pairs, {args.epochs} epochs")
    Trainer(model=model, args=targs, train_dataset=ds, processing_class=tok).train()
    log("training done")
    tok.padding_side = "left"

    # Eval trained
    model.eval()
    log("eval TRAINED on full HumanEval")
    tr_corr, tr_total = humaneval_full(model, tok)
    log(f" TRAINED: {tr_corr}/{tr_total}")

    result = {
        "n_pairs_used": len(pairs_use), "epochs": args.epochs, "seed": args.seed,
        "base": [base_corr, base_total], "trained": [tr_corr, tr_total],
        "delta": tr_corr - base_corr,
        "elapsed_s": time.time() - T0,
    }
    with open(f"{out_dir}/result.json", "w") as fh:
        json.dump(result, fh, indent=2)

    print()
    print("=" * 70)
    print(f" N_PAIRS: {len(pairs_use)} EPOCHS: {args.epochs} SEED: {args.seed}")
    print(f" HUMAN-EVAL FULL: base={base_corr}/{base_total} trained={tr_corr}/{tr_total} Δ={tr_corr-base_corr:+d}")
    print(f" time: {time.time()-T0:.0f}s")
    print("=" * 70)


if __name__ == "__main__":
    main()

310
recipe/curriculum_math.py Normal file
View file

@ -0,0 +1,310 @@
"""TinyForge-Zero-Math with self-difficulty curriculum.
Novel: model + interpreter only. No external problem set, no fixed difficulty.
The model's own greedy success/failure on each problem tells the curriculum
to make it harder or easier. Mine pairs only at the edge of competence.
Loop per iter:
1. Generate K problems at current difficulty pool
2. For each: solve greedily (temp=0). Verify against canonical answer.
- If correct: this problem is "easy" → ask model to amplify
- If wrong: try N=4 sampled attempts at temp=0.8
- If at-edge (some pass, some fail): MINE a pair
- If all fail: this problem is "too hard" → ask model to simplify
3. Add amplified/simplified problems back into the pool for next iter
4. Train on accumulated pairs periodically
"""
import os, sys, json, time, re, gc, argparse, random
os.environ.setdefault("HF_HOME", "/workspace/hf")
os.environ.setdefault("CUDA_VISIBLE_DEVICES", "1")
os.environ["TRANSFORMERS_VERBOSITY"] = "error"
os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1"
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, TrainingArguments, Trainer
from datasets import load_dataset, Dataset as HFDataset
from peft import LoraConfig, get_peft_model
T0 = time.time()


def log(m):
    """Write ``m`` to stdout behind an elapsed-seconds-since-``T0`` tag; flushes at once."""
    since_start = time.time() - T0
    print(f"[{since_start:7.1f}s] {m}", flush=True)
def safe_eval(expr: str):
    """Evaluate an arithmetic expression and return it as ``float``, or ``None``.

    Only digits, ``+ - * / . ( ) %`` and spaces are accepted; any other
    character, or any evaluation error (syntax error, division by zero, ...),
    yields ``None``.
    """
    allowed = "0123456789+-*/.()% "
    if not all(ch in allowed for ch in expr):
        return None
    try:
        # NOTE(review): `expr` is model-generated. The character whitelist blocks
        # names and calls, but `**` is still spellable, so an expression such as
        # 9**9**9 can burn CPU/memory. Consider an ast-based evaluator with
        # operand limits before exposing this to less trusted input.
        return float(eval(expr, {"__builtins__": {}}, {}))
    except Exception:
        # Best-effort: treat every evaluation failure as "no value", but let
        # KeyboardInterrupt / SystemExit propagate (a bare except would not).
        return None
# A number, optionally negative, with optional well-formed thousands separators
# ("1,234,567") and an optional decimal part. Uses only non-capturing groups so
# that findall() returns whole matches.
_ANSWER_NUMBER = r"-?(?:\d{1,3}(?:,\d{3})+(?!\d)|\d+)(?:\.\d+)?"
_ANSWER_HASH_RE = re.compile(r"####\s*(" + _ANSWER_NUMBER + r")")
_ANSWER_BOXED_RE = re.compile(r"\\boxed\{(" + _ANSWER_NUMBER + r")\}")
_ANSWER_ANY_RE = re.compile(_ANSWER_NUMBER)


def extract_answer(text: str):
    """Extract the final numeric answer from a model response.

    Tries, in order: a ``#### <number>`` marker, a ``\\boxed{<number>}``
    expression, then the last number appearing anywhere in ``text``.
    Thousands separators are accepted ("#### 1,234" -> 1234.0), matching how
    gold answers are parsed. Returns a ``float``, or ``None`` when the text
    contains no number.
    """
    for pattern in (_ANSWER_HASH_RE, _ANSWER_BOXED_RE):
        found = pattern.search(text)
        if found:
            return float(found.group(1).replace(",", ""))
    numbers = _ANSWER_ANY_RE.findall(text)
    if numbers:
        return float(numbers[-1].replace(",", ""))
    return None
def gen_batch(model, tok, prompts, max_new=400, temperature=0.0, batch=8):
    """Generate one completion per prompt in chunks of ``batch``.

    Every prompt is placed in a chat template behind a fixed math-tutor system
    message. Sampling is enabled only when ``temperature`` > 0. Returns the
    decoded continuations, prompt tokens removed, in prompt order.
    """
    do_sample = temperature > 0
    effective_temperature = temperature if do_sample else 1.0
    decoded = []
    for begin in range(0, len(prompts), batch):
        texts = [
            tok.apply_chat_template(
                [{"role": "system", "content": "You are a careful math tutor."},
                 {"role": "user", "content": question}],
                tokenize=False, add_generation_prompt=True,
            )
            for question in prompts[begin:begin + batch]
        ]
        inp = tok(texts, return_tensors="pt", padding=True, truncation=True, max_length=1500).to(model.device)
        with torch.no_grad():
            out = model.generate(**inp, max_new_tokens=max_new, do_sample=do_sample,
                                 temperature=effective_temperature, top_p=0.95,
                                 pad_token_id=tok.eos_token_id)
        cut = inp.input_ids.shape[1]
        decoded.extend(
            tok.decode(out[idx][cut:], skip_special_tokens=True)
            for idx in range(out.size(0))
        )
    return decoded
# Template used whenever the model must solve a problem; filled via
# .format(problem=...). The "#### <number>" line it requests is what
# extract_answer() looks for first.
SOLVE_PROMPT = "Solve this math problem step by step. End with the answer on a new line as: #### <number>\n\nProblem: {problem}"
# Seed prompt for inventing a fresh problem (sent as-is, no placeholders).
# The PROBLEM / EXPRESSION / ANSWER fields are what parse_problem() reads.
GEN_PROMPT_SEED = """Generate ONE math word problem with a numerical answer. Output exactly:
PROBLEM: <a clear word problem with concrete numbers>
EXPRESSION: <a single Python arithmetic expression that evaluates to the answer>
ANSWER: <the numerical answer>
Make problems grade-school level."""
# Template asking for a harder variant of a problem; filled via
# .format(problem=..., answer=...).
AMPLIFY_PROMPT = """Take this math problem and make it HARDER by adding ONE more step (e.g., another operation, a percentage, fractions, or an extra constraint). Keep the format:
Original problem: {problem}
Original answer: {answer}
Output exactly:
PROBLEM: <the harder problem>
EXPRESSION: <Python arithmetic expression for the new answer>
ANSWER: <the new numerical answer>"""
# Template asking for an easier variant of a problem; filled via
# .format(problem=..., answer=...).
SIMPLIFY_PROMPT = """Take this math problem and make it EASIER by removing one step or simplifying numbers. Keep the format:
Original problem: {problem}
Original answer: {answer}
Output exactly:
PROBLEM: <the easier problem>
EXPRESSION: <Python arithmetic expression for the new answer>
ANSWER: <the new numerical answer>"""
def parse_problem(text: str):
    """Parse a ``PROBLEM / EXPRESSION / ANSWER`` response into a problem dict.

    Returns ``{"problem": str, "answer": float}`` only when all three fields
    are present, the problem text has at least 10 characters, and the
    expression evaluates (via ``safe_eval``) to within 0.01 of the claimed
    answer. Returns ``None`` otherwise.
    """
    problem_match = re.search(r"PROBLEM:\s*(.+?)(?:\n|EXPRESSION:)", text, re.DOTALL)
    expression_match = re.search(r"EXPRESSION:\s*(.+?)(?:\n|ANSWER:)", text, re.DOTALL)
    answer_match = re.search(r"ANSWER:\s*(-?\d+(?:\.\d+)?)", text)
    if problem_match is None or expression_match is None or answer_match is None:
        return None

    problem = problem_match.group(1).strip()
    expression = expression_match.group(1).strip()
    try:
        claimed = float(answer_match.group(1))
    except ValueError:
        return None
    if len(problem) < 10:
        return None

    # Keep the problem only if the stated answer agrees with its own expression.
    actual = safe_eval(expression)
    if actual is None:
        return None
    if abs(actual - claimed) > 0.01:
        return None
    return {"problem": problem, "answer": claimed}
def parse_gold(answer_field: str):
    """Return the number following ``####`` in ``answer_field`` as a float.

    Comma separators inside the number are removed before conversion.
    Returns ``None`` when no ``#### <number>`` marker is found.
    """
    found = re.search(r"####\s*(-?\d+(?:,\d+)*(?:\.\d+)?)", answer_field)
    if not found:
        return None
    digits = found.group(1).replace(",", "")
    return float(digits)
def gsm8k_eval(model, tok, n=50):
    """Score the model on the first ``n`` problems of the GSM8K test split.

    Answers are generated with temperature 0 and count as correct when the
    extracted number is within 0.01 of the gold value. Problems whose gold
    answer cannot be parsed never count as correct but stay in the
    denominator. Returns ``(number_correct, number_of_problems)``.
    """
    ds = list(load_dataset("openai/gsm8k", "main", split="test"))[:n]
    log(f" eval on GSM8K-test ({len(ds)} problems)")
    prompts = [SOLVE_PROMPT.format(problem=item["question"]) for item in ds]
    outs = gen_batch(model, tok, prompts, max_new=400, temperature=0.0, batch=8)
    n_right = 0
    for item, raw in zip(ds, outs):
        gold = parse_gold(item["answer"])
        if gold is None:
            continue
        pred = extract_answer(raw)
        if pred is None:
            continue
        if abs(pred - gold) < 0.01:
            n_right += 1
    return n_right, len(ds)
def make_train_example(problem: str, solution: str, tok):
    """Build one fixed-length (1024-token) training row for a solved problem.

    The user turn is ``SOLVE_PROMPT`` filled with ``problem``; the assistant
    turn is ``solution``. Returns ``input_ids``, ``attention_mask`` and
    ``labels``, where prompt and padding positions carry -100 in ``labels``.
    """
    prompt_msgs = [
        {"role": "system", "content": "You are a careful math tutor."},
        {"role": "user", "content": SOLVE_PROMPT.format(problem=problem)},
    ]
    dialogue_msgs = prompt_msgs + [{"role": "assistant", "content": solution}]
    prompt_text = tok.apply_chat_template(prompt_msgs, tokenize=False, add_generation_prompt=True)
    dialogue_text = tok.apply_chat_template(dialogue_msgs, tokenize=False)
    prompt_ids = tok(prompt_text, add_special_tokens=False)["input_ids"]
    token_ids = tok(dialogue_text, add_special_tokens=False)["input_ids"]

    MAX = 1024
    token_ids = token_ids[:MAX]
    # Only the assistant tokens are supervised; the prompt prefix is masked.
    n_masked = min(len(prompt_ids), len(token_ids))
    labels = [-100] * n_masked + list(token_ids[n_masked:])
    pad = MAX - len(token_ids)
    return {
        "input_ids": token_ids + [tok.pad_token_id]*pad,
        "attention_mask": [1]*len(token_ids) + [0]*pad,
        "labels": labels + [-100]*pad,
    }
def main():
    """Run the self-difficulty curriculum loop and report the GSM8K delta.

    Each iteration draws problems from a pool (seeding it from the model when
    empty or on the first iteration), solves them with temperature 0, mines a
    training pair when a problem fails at temperature 0 but a sampled attempt
    succeeds, and asks the model for harder/easier variants for the pool.
    Training runs every ``--train_every`` iterations. Results go to
    ``/workspace/curriculum/<tag>``.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--model", default="Qwen/Qwen2.5-3B")
    ap.add_argument("--iterations", type=int, default=8)
    ap.add_argument("--problems_per_iter", type=int, default=8)
    ap.add_argument("--train_every", type=int, default=4)
    ap.add_argument("--n_eval", type=int, default=50)
    ap.add_argument("--seed", type=int, default=42)
    ap.add_argument("--tag", required=True)
    args = ap.parse_args()

    random.seed(args.seed)
    torch.manual_seed(args.seed)
    out_dir = f"/workspace/curriculum/{args.tag}"
    os.makedirs(out_dir, exist_ok=True)

    log(f"loading {args.model}")
    tok = AutoTokenizer.from_pretrained(args.model)
    if tok.pad_token is None:
        tok.pad_token = tok.eos_token
    tok.padding_side = "left"  # left padding for batched generation
    model = AutoModelForCausalLM.from_pretrained(args.model, dtype=torch.bfloat16, device_map="cuda:0")
    log(f" loaded mem={torch.cuda.memory_allocated('cuda:0')/1e9:.1f}GB")

    model.eval()
    log("INITIAL eval on GSM8K-test")
    base_correct, base_total = gsm8k_eval(model, tok, n=args.n_eval)
    log(f" GSM8K-test base: {base_correct}/{base_total}")

    lora_cfg = LoraConfig(r=16, lora_alpha=32, lora_dropout=0.05, bias="none",
                          target_modules=["q_proj", "k_proj", "v_proj", "o_proj"], task_type="CAUSAL_LM")
    model = get_peft_model(model, lora_cfg)

    accumulated_pairs = []  # at-edge (problem, correct_solution)
    problem_pool = []       # problems waiting to be attempted
    per_iter = args.problems_per_iter

    for it in range(1, args.iterations + 1):
        it_t = time.time()

        # 1. Seed the pool on the first iteration or whenever it has run dry.
        if not problem_pool or it == 1:
            gen_prompts = [GEN_PROMPT_SEED] * per_iter
            seed_outs = gen_batch(model, tok, gen_prompts, max_new=300, temperature=0.9)
            seeded = [parsed for parsed in map(parse_problem, seed_outs) if parsed]
            problem_pool.extend(seeded)
            log(f"iter {it}: seeded {len(seeded)} fresh problems (pool={len(problem_pool)})")

        # 2. Draw (and consume) up to K problems at random.
        random.shuffle(problem_pool)
        attempt_problems = problem_pool[:per_iter]
        problem_pool = problem_pool[per_iter:]
        if not attempt_problems:
            log(f"iter {it}: empty pool, regenerating")
            continue

        # 3. Temperature-0 solve to gauge difficulty.
        greedy_prompts = [SOLVE_PROMPT.format(problem=p["problem"]) for p in attempt_problems]
        greedy_outs = gen_batch(model, tok, greedy_prompts, max_new=300, temperature=0.0)
        greedy_correct = []
        for p, answer_text in zip(attempt_problems, greedy_outs):
            pred = extract_answer(answer_text)
            greedy_correct.append(pred is not None and abs(pred - p["answer"]) < 0.01)
        n_easy = sum(greedy_correct)
        log(f"iter {it}: {n_easy}/{len(attempt_problems)} solved greedily")

        new_pairs = 0
        amplify_targets = []
        simplify_targets = []
        for p, easy in zip(attempt_problems, greedy_correct):
            if easy:
                # too easy → amplify next round
                amplify_targets.append(p)
                continue
            # Failed at temperature 0: sample 4 attempts to see if it is at the edge.
            solve_prompts = [SOLVE_PROMPT.format(problem=p["problem"])] * 4
            atts = gen_batch(model, tok, solve_prompts, max_new=300, temperature=0.8)
            ok_atts = []
            for attempt in atts:
                pred = extract_answer(attempt)
                if pred is not None and abs(pred - p["answer"]) < 0.01:
                    ok_atts.append(attempt.strip())
            if not ok_atts:
                # too hard → simplify
                simplify_targets.append(p)
                continue
            # at-edge → mine pair (first correct sampled solution)
            accumulated_pairs.append({"problem": p["problem"], "solution": ok_atts[0],
                                      "answer": p["answer"]})
            new_pairs += 1
        log(f"iter {it}: +{new_pairs} pairs (total: {len(accumulated_pairs)}). "
            f"amplify={len(amplify_targets)}, simplify={len(simplify_targets)}")

        # 4. Ask for harder / easier variants and put the parsable ones in the pool.
        if amplify_targets:
            amp_prompts = [AMPLIFY_PROMPT.format(problem=p["problem"], answer=p["answer"])
                           for p in amplify_targets[:per_iter]]
            amp_outs = gen_batch(model, tok, amp_prompts, max_new=300, temperature=0.7)
            for variant_text in amp_outs:
                variant = parse_problem(variant_text)
                if variant:
                    problem_pool.append(variant)
        if simplify_targets:
            sim_prompts = [SIMPLIFY_PROMPT.format(problem=p["problem"], answer=p["answer"])
                           for p in simplify_targets[:per_iter // 2]]
            sim_outs = gen_batch(model, tok, sim_prompts, max_new=300, temperature=0.7)
            for variant_text in sim_outs:
                variant = parse_problem(variant_text)
                if variant:
                    problem_pool.append(variant)

        with open(f"{out_dir}/pairs.jsonl", "w") as fh:
            for r in accumulated_pairs:
                fh.write(json.dumps(r) + "\n")
        log(f"iter {it} done [{time.time()-it_t:.0f}s]; pool size now {len(problem_pool)}")

        # 5. Train every N iterations once enough pairs exist, then re-evaluate.
        if it % args.train_every == 0 and len(accumulated_pairs) >= 5:
            log(f" TRAINING on {len(accumulated_pairs)} pairs")
            tok.padding_side = "right"
            ds = HFDataset.from_list([make_train_example(r["problem"], r["solution"], tok) for r in accumulated_pairs])
            targs = TrainingArguments(
                output_dir=f"{out_dir}/ckpt", num_train_epochs=2,
                per_device_train_batch_size=1, gradient_accumulation_steps=4,
                learning_rate=1e-4, bf16=True, logging_steps=10,
                save_strategy="no", report_to="none", remove_unused_columns=False, warmup_ratio=0.05,
            )
            Trainer(model=model, args=targs, train_dataset=ds, processing_class=tok).train()
            tok.padding_side = "left"
            model.eval()
            corr, tot = gsm8k_eval(model, tok, n=args.n_eval)
            log(f" GSM8K-test @ iter {it}: {corr}/{tot}")
            model.train()

    # Final eval
    model.eval()
    final_correct, final_total = gsm8k_eval(model, tok, n=args.n_eval)
    result = {
        "model": args.model, "iterations": args.iterations,
        "n_pairs": len(accumulated_pairs),
        "base": [base_correct, base_total],
        "trained": [final_correct, final_total],
        "delta": final_correct - base_correct,
        "elapsed_s": time.time() - T0,
    }
    with open(f"{out_dir}/result.json", "w") as fh:
        json.dump(result, fh, indent=2)

    banner = "=" * 70
    print()
    print(banner)
    print(f" CURRICULUM TINYFORGE-ZERO-MATH — {args.model}")
    print(f" Self-mined pairs: {len(accumulated_pairs)}")
    print(f" GSM8K-test: base={base_correct}/{base_total} trained={final_correct}/{final_total} Δ={final_correct-base_correct:+d}")
    print(f" Time: {time.time()-T0:.0f}s")
    print(banner)


if __name__ == "__main__":
    main()

115
recipe/eval_plus.py Normal file
View file

@ -0,0 +1,115 @@
"""Eval our best 14B adapter on HumanEval+ (contamination-resistant hidden tests)."""
import os, json, time, re, subprocess, tempfile, argparse
os.environ.setdefault("HF_HOME", "/workspace/hf")
os.environ["TRANSFORMERS_VERBOSITY"] = "error"
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from datasets import load_dataset
from peft import PeftModel
T0 = time.time()
def log(m):
    """Print `m` to stdout, prefixed with the seconds elapsed since `T0`."""
    elapsed = time.time() - T0
    print(f"[{elapsed:7.1f}s] {m}", flush=True)
def extract_code(text):
    """Return the contents of the first fenced code block in `text`.

    A ```python fence is preferred over a bare ``` fence.  Everything after
    the opening fence up to the next ``` (or the end of the text when the
    block is never closed) is returned with surrounding whitespace removed.
    Text without any fence is returned stripped.
    """
    for opening in ("```python", "```"):
        _, found, remainder = text.partition(opening)
        if found:
            text = remainder
            break
    body, _, _ = text.partition("```")
    return body.strip()
def run_python(code, timeout=15):
    """Run `code` in a fresh `python3` subprocess and report whether it succeeded.

    The source is written to a temporary ``.py`` file, executed with ``/tmp``
    as the working directory, and the file is removed afterwards.

    Args:
        code: Python source text to execute.
        timeout: seconds to wait before the run is treated as a failure.

    Returns:
        True when the process exits with status 0 within `timeout`;
        False on a non-zero exit status or a timeout.
    """
    # Python reads source files as UTF-8, so write UTF-8 explicitly rather
    # than whatever the locale's default encoding happens to be.
    tmp = tempfile.NamedTemporaryFile("w", suffix=".py", delete=False, encoding="utf-8")
    path = tmp.name
    try:
        with tmp:
            tmp.write(code)
        # errors="replace": undecodable bytes printed by the child must not
        # raise here; only the exit status matters.
        r = subprocess.run(["python3", path], capture_output=True, timeout=timeout,
                           text=True, errors="replace", cwd="/tmp")
        return r.returncode == 0
    except subprocess.TimeoutExpired:
        return False
    finally:
        # Best-effort cleanup; a file that is already gone is not an error.
        try:
            os.unlink(path)
        except OSError:
            pass
def gen_batch(model, tok, prompts, max_new=400, batch=4):
    """Greedy-decode one completion per prompt, `batch` prompts at a time.

    Each prompt is wrapped in a system + user chat via the tokenizer's chat
    template.  Only the tokens after the (padded) prompt width are decoded,
    so this assumes the tokenizer pads on the left, as `main` configures.

    Returns:
        A list of decoded completions in the same order as `prompts`.
    """
    system_text = "You are a Python coder. Output one ```python block only."
    completions = []
    for start in range(0, len(prompts), batch):
        rendered = []
        for user_text in prompts[start:start + batch]:
            conversation = [{"role": "system", "content": system_text},
                            {"role": "user", "content": user_text}]
            rendered.append(tok.apply_chat_template(conversation, tokenize=False,
                                                    add_generation_prompt=True))
        enc = tok(rendered, return_tensors="pt", padding=True, truncation=True,
                  max_length=1500).to(model.device)
        with torch.no_grad():
            generated = model.generate(**enc, max_new_tokens=max_new, do_sample=False,
                                       pad_token_id=tok.eos_token_id)
        prompt_width = enc.input_ids.shape[1]
        for row in range(generated.size(0)):
            completions.append(tok.decode(generated[row][prompt_width:],
                                          skip_special_tokens=True))
    return completions
def main():
    """Score a model, optionally with a LoRA adapter, on the HumanEval+ dataset.

    Command-line flags:
        --model:   Hugging Face id of the base model.
        --adapter: path to a PEFT adapter directory.  When the path does not
                   exist the base model is evaluated on its own.
        --tag:     run name; results are written to
                   /workspace/eval_plus/<tag>/result.json.

    Every generated solution is executed against the row's `test` field
    ("public") and, when the row carries `plus_input` and `plus_test`,
    against `plus_test` ("plus").  Rows without those fields reuse the public
    outcome for the plus score; the number of such rows is logged and stored
    as `n_plus_fallback` so an identical public/plus score is not mistaken
    for an independent measurement.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--model", default="Qwen/Qwen2.5-14B")
    ap.add_argument("--adapter", default="/workspace/multi_pair/multi_v1/adapter")
    ap.add_argument("--tag", required=True)
    args = ap.parse_args()
    out_dir = f"/workspace/eval_plus/{args.tag}"
    os.makedirs(out_dir, exist_ok=True)
    log(f"loading {args.model}")
    tok = AutoTokenizer.from_pretrained(args.model)
    if tok.pad_token is None:
        tok.pad_token = tok.eos_token
    # Left padding keeps every prompt flush against its generated tokens.
    tok.padding_side = "left"
    model = AutoModelForCausalLM.from_pretrained(args.model, torch_dtype=torch.bfloat16, device_map="cuda:0")
    # Remember whether the adapter was really applied: a missing path falls
    # back to the base model and the result file must say so.
    adapter_loaded = bool(args.adapter) and os.path.exists(args.adapter)
    if adapter_loaded:
        log(f" loading adapter from {args.adapter}")
        model = PeftModel.from_pretrained(model, args.adapter)
    else:
        log(" no adapter — base only")
    model.eval()
    # Load HumanEval+ via evalplus dataset
    log("loading HumanEvalPlus dataset")
    ds = list(load_dataset("evalplus/humanevalplus", split="test"))
    log(f" {len(ds)} problems")
    # Eval
    log("eval...")
    prompts = [p["prompt"] + "\n# Complete the function above." for p in ds]
    outs = gen_batch(model, tok, prompts, max_new=400, batch=4)
    base_pass, plus_pass, plus_fallbacks = 0, 0, 0
    for i, (p, raw) in enumerate(zip(ds, outs)):
        code = extract_code(raw) if "```" in raw else raw
        # A completion without its own `def` is a bare body: prepend the prompt.
        full = code if "def " in code else (p["prompt"] + "\n" + code)
        check_call = f"\n\ncheck({p['entry_point']})"
        # Public tests
        b = run_python(full + "\n\n" + p["test"] + check_call, timeout=15)
        # Plus tests (hidden harder)
        if p.get("plus_input", None) is not None and "plus_test" in p:
            pp = run_python(full + "\n\n" + p["plus_test"] + check_call, timeout=15)
        else:
            pp = b  # fallback: no separate plus tests on this row
            plus_fallbacks += 1
        if b:
            base_pass += 1
        if pp:
            plus_pass += 1
        if (i+1) % 20 == 0:
            log(f" {i+1}/{len(ds)}: base={base_pass}, plus={plus_pass}")
    if plus_fallbacks:
        log(f" WARNING: {plus_fallbacks}/{len(ds)} problems had no separate plus tests; "
            f"their plus result reuses the public result")
    result = {"model": args.model, "adapter": args.adapter,
              "adapter_loaded": adapter_loaded,
              "base_pass": base_pass, "plus_pass": plus_pass, "n": len(ds),
              "n_plus_fallback": plus_fallbacks,
              "elapsed_s": time.time() - T0}
    with open(f"{out_dir}/result.json", "w") as fh:
        json.dump(result, fh, indent=2)
    print()
    print("=" * 70)
    print(f" HumanEval+ public: {base_pass}/{len(ds)} plus(hidden): {plus_pass}/{len(ds)}")
    print(f" Time: {time.time()-T0:.0f}s")
    print("=" * 70)
if __name__ == "__main__":
main()

216
recipe/eval_raw.py Normal file
View file

@ -0,0 +1,216 @@
"""vLLM dual eval using RAW completion format (no chat template) for base models.
Recipe for non-instruct base models uses simple completion-style prompting
that matches how base models were pretrained.
"""
import os, json, time, re, subprocess, tempfile, argparse, gc
os.environ.setdefault("HF_HOME", "/workspace/hf")
os.environ.setdefault("HF_HUB_ENABLE_HF_TRANSFER", "1")
os.environ["TRANSFORMERS_VERBOSITY"] = "error"
import torch
from datasets import load_dataset
T0 = time.time()
def log(m):
    """Print `m` to stdout, prefixed with the seconds elapsed since `T0`."""
    elapsed = time.time() - T0
    print(f"[{elapsed:7.1f}s] {m}", flush=True)
def extract_code(text):
    """Return the contents of the first fenced code block in `text`.

    A ```python fence is preferred over a bare ``` fence.  Everything after
    the opening fence up to the next ``` (or the end of the text when the
    block is never closed) is returned with surrounding whitespace removed.
    Text without any fence is returned stripped.
    """
    for opening in ("```python", "```"):
        _, found, remainder = text.partition(opening)
        if found:
            text = remainder
            break
    body, _, _ = text.partition("```")
    return body.strip()
def run_python(code, timeout=10):
    """Run `code` in a fresh `python3` subprocess and report whether it succeeded.

    The source is written to a temporary ``.py`` file, executed with ``/tmp``
    as the working directory, and the file is removed afterwards.

    Args:
        code: Python source text to execute.
        timeout: seconds to wait before the run is treated as a failure.

    Returns:
        True when the process exits with status 0 within `timeout`;
        False on a non-zero exit status or a timeout.
    """
    # Python reads source files as UTF-8, so write UTF-8 explicitly rather
    # than whatever the locale's default encoding happens to be.
    tmp = tempfile.NamedTemporaryFile("w", suffix=".py", delete=False, encoding="utf-8")
    path = tmp.name
    try:
        with tmp:
            tmp.write(code)
        # errors="replace": undecodable bytes printed by the child must not
        # raise here; only the exit status matters.
        r = subprocess.run(["python3", path], capture_output=True, timeout=timeout,
                           text=True, errors="replace", cwd="/tmp")
        return r.returncode == 0
    except subprocess.TimeoutExpired:
        return False
    finally:
        # Best-effort cleanup; a file that is already gone is not an error.
        try:
            os.unlink(path)
        except OSError:
            pass
def make_he_prompt(p):
    """Return the raw-completion prompt for a HumanEval row: its `prompt` field, unchanged."""
    prompt_text = p["prompt"]
    return prompt_text
def make_mbpp_prompt(p):
    """Build the raw-completion prompt for an MBPP row.

    The task text and every entry of `test_list` are emitted as `#` comment
    lines, followed by a blank line after which the model writes the function.
    """
    header = f"# Task: {p['prompt']}\n# Tests:\n# "
    commented_tests = "\n# ".join(p["test_list"])
    return header + commented_tests + "\n\n"
def vllm_generate(llm, prompts, max_new=400, temperature=0.0, stops=None):
    """Generate one completion per prompt with a vLLM engine.

    Args:
        llm: object exposing vLLM's `generate(prompts, sampling_params, ...)`.
        prompts: list of prompt strings.
        max_new: maximum number of tokens to generate per prompt.
        temperature: sampling temperature; `top_p` is 0.95 when it is positive
            and 1.0 otherwise.
        stops: stop strings; a falsy value selects the default list below.

    Returns:
        The text of the first output of every request, in prompt order.
    """
    from vllm import SamplingParams
    if temperature > 0:
        nucleus = 0.95
    else:
        nucleus = 1.0
    stop_strings = stops if stops else ["\nclass ", "\nif __name__", "\nprint(", "\n#"]
    sampling = SamplingParams(temperature=temperature, top_p=nucleus,
                              max_tokens=max_new, stop=stop_strings)
    results = llm.generate(prompts, sampling, use_tqdm=False)
    return [request.outputs[0].text for request in results]
def vllm_generate_lora(llm, prompts, lora_req, max_new=400, temperature=0.0, stops=None):
    """Generate one completion per prompt with a vLLM engine and a LoRA request.

    Same sampling setup as `vllm_generate`; `lora_req` is forwarded to
    `llm.generate` as `lora_request`.

    Returns:
        The text of the first output of every request, in prompt order.
    """
    from vllm import SamplingParams
    if temperature > 0:
        nucleus = 0.95
    else:
        nucleus = 1.0
    stop_strings = stops if stops else ["\nclass ", "\nif __name__", "\nprint(", "\n#"]
    sampling = SamplingParams(temperature=temperature, top_p=nucleus,
                              max_tokens=max_new, stop=stop_strings)
    results = llm.generate(prompts, sampling, lora_request=lora_req, use_tqdm=False)
    return [request.outputs[0].text for request in results]
def eval_humaneval(outs_func, label):
    """Run the HumanEval test split through `outs_func` and count passing problems.

    Args:
        outs_func: callable `(prompts, max_new=...)` returning one raw
            completion per prompt.
        label: tag used only in log lines.

    Returns:
        `(number_passed, number_of_problems)`.
    """
    problems = list(load_dataset("openai_humaneval", split="test"))
    log(f" HumanEval [{label}] ({len(problems)})")
    prompts = [make_he_prompt(problem) for problem in problems]
    started = time.time()
    completions = outs_func(prompts, max_new=400)
    log(f" gen done in {time.time()-started:.1f}s")
    passed = 0
    for problem, completion in zip(problems, completions):
        # The completion continues the prompt, so the program is prompt + completion.
        program = problem["prompt"] + completion
        harness = program + "\n\n" + problem["test"] + f"\n\ncheck({problem['entry_point']})"
        if run_python(harness, timeout=10):
            passed += 1
    return passed, len(problems)
def eval_mbpp(outs_func, label, n=200):
    """Run the first `n` rows of the sanitized MBPP test split through `outs_func`.

    Args:
        outs_func: callable `(prompts, max_new=...)` returning one raw
            completion per prompt.
        label: tag used only in log lines.
        n: maximum number of problems to evaluate.

    Returns:
        `(number_passed, number_of_problems_evaluated)`.
    """
    problems = list(load_dataset("mbpp", "sanitized", split="test"))[:n]
    log(f" MBPP [{label}] ({len(problems)})")
    prompts = [make_mbpp_prompt(problem) for problem in problems]
    started = time.time()
    completions = outs_func(prompts, max_new=400)
    log(f" gen done in {time.time()-started:.1f}s")
    passed = 0
    for problem, completion in zip(problems, completions):
        code = completion
        if "```" in code:
            # Give a completion that only closes a fence an opening fence,
            # so that extract_code keeps the text before the closing one.
            fenced = code if "```python" in code else "```python" + code
            code = extract_code(fenced)
        harness = code + "\n\n" + "\n".join(problem["test_list"])
        if run_python(harness, timeout=10):
            passed += 1
    return passed, len(problems)
def make_train_example(r, tok):
    """Turn one (broken, fixed) pair into a fixed-length causal-LM training row.

    The prompt is a comment-style description of the task, tests, broken
    attempt and error; the target is the fixed code.  The token sequence is
    cut to 1024 tokens and right-padded to exactly that length.  Prompt and
    padding positions carry label -100.

    Args:
        r: dict with optional keys `signature`, `tests`, `broken`, `error`, `fixed`.
        tok: tokenizer called as `tok(text, add_special_tokens=False)` and
            providing `pad_token_id`.

    Returns:
        dict with `input_ids`, `attention_mask` and `labels`, each 1024 long.
    """
    MAX = 1024
    commented_tests = "\n# ".join(r.get("tests", []))
    prompt = (
        f"# Task: implement {r.get('signature', '')}\n"
        f"# Tests:\n# {commented_tests}\n"
        f"# My broken attempt:\n{r.get('broken', '')}\n"
        f"# Error: {r.get('error', '')}\n"
        f"# Corrected:\n"
    )
    sequence = tok(prompt + r.get("fixed", ""), add_special_tokens=False)["input_ids"]
    prompt_ids = tok(prompt, add_special_tokens=False)["input_ids"]
    sequence = sequence[:MAX]
    # Number of leading positions that belong to the prompt (capped by truncation).
    masked = min(len(prompt_ids), len(sequence))
    labels = [-100] * masked + list(sequence[masked:])
    n_pad = MAX - len(sequence)
    return {
        "input_ids": sequence + [tok.pad_token_id] * n_pad,
        "attention_mask": [1] * len(sequence) + [0] * n_pad,
        "labels": labels + [-100] * n_pad,
    }
def main():
    """Evaluate a base model on HumanEval and MBPP, LoRA-train it, and re-evaluate.

    Command-line flags:
        --model:      Hugging Face model id (required).
        --pairs:      JSONL file of (broken, fixed) training pairs.
        --n_pairs:    number of leading pairs to train on.
        --mbpp_n:     maximum number of MBPP problems to evaluate.
        --tag:        run name; outputs go to /workspace/dual_eval_raw/<tag>/.
        --skip_train: only run the base evaluations.

    Flow: base evals with vLLM, tear vLLM down, train a rank-16 LoRA on the
    pairs, save the adapter, reload vLLM with LoRA enabled and repeat the
    evals.  Problem counts in the logs and in result.json are the totals the
    eval functions report, not assumed constants.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--model", required=True)
    ap.add_argument("--pairs", default="/workspace/saved_pairs/pairs_40.jsonl")
    ap.add_argument("--n_pairs", type=int, default=40)
    ap.add_argument("--mbpp_n", type=int, default=200)
    ap.add_argument("--tag", required=True)
    ap.add_argument("--skip_train", action="store_true")
    args = ap.parse_args()
    out_dir = f"/workspace/dual_eval_raw/{args.tag}"
    os.makedirs(out_dir, exist_ok=True)
    # Fail fast: read the training pairs before any GPU work so a bad path
    # does not surface only after the base evaluations have finished.
    pairs = None
    if not args.skip_train:
        with open(args.pairs) as fh:
            pairs = [json.loads(l) for l in fh if l.strip()][:args.n_pairs]
    from vllm import LLM
    from transformers import AutoTokenizer
    log(f"loading {args.model} into vLLM")
    tok = AutoTokenizer.from_pretrained(args.model)
    if tok.pad_token is None:
        tok.pad_token = tok.eos_token
    llm = LLM(model=args.model, dtype="bfloat16", gpu_memory_utilization=0.85, max_model_len=2048)
    log(f" loaded")
    log("=== BASE evals ===")
    base_he, n_he = eval_humaneval(lambda P, max_new=400: vllm_generate(llm, P, max_new=max_new), "BASE")
    base_mbpp, n_mbpp = eval_mbpp(lambda P, max_new=400: vllm_generate(llm, P, max_new=max_new), "BASE", n=args.mbpp_n)
    log(f" BASE: HumanEval={base_he}/{n_he} MBPP={base_mbpp}/{n_mbpp}")
    if args.skip_train:
        result = {"model": args.model, "base_humaneval": base_he, "base_mbpp": base_mbpp,
                  "n_he": n_he, "n_mbpp": n_mbpp, "elapsed_s": time.time()-T0}
        with open(f"{out_dir}/result.json", "w") as fh:
            json.dump(result, fh, indent=2)
        return
    # Tear down vLLM, train LoRA
    log("=== TRAINING ===")
    del llm; gc.collect(); torch.cuda.empty_cache()
    from transformers import AutoModelForCausalLM, TrainingArguments, Trainer
    from datasets import Dataset as HFDataset
    from peft import LoraConfig, get_peft_model
    model = AutoModelForCausalLM.from_pretrained(args.model, torch_dtype=torch.bfloat16, device_map="cuda:0")
    lora_cfg = LoraConfig(r=16, lora_alpha=32, lora_dropout=0.05, bias="none",
                          target_modules=["q_proj", "k_proj", "v_proj", "o_proj"], task_type="CAUSAL_LM")
    model = get_peft_model(model, lora_cfg)
    ds = HFDataset.from_list([make_train_example(r, tok) for r in pairs])
    targs = TrainingArguments(
        output_dir=f"{out_dir}/ckpt", num_train_epochs=2,
        per_device_train_batch_size=1, gradient_accumulation_steps=4,
        learning_rate=1e-4, bf16=True, logging_steps=10,
        save_strategy="no", report_to="none", remove_unused_columns=False, warmup_ratio=0.05,
    )
    # `processing_class` is the keyword the other recipe scripts pass;
    # `tokenizer=` is its deprecated predecessor.
    Trainer(model=model, args=targs, train_dataset=ds, processing_class=tok).train()
    log("training done")
    adapter_dir = f"{out_dir}/adapter"
    model.save_pretrained(adapter_dir)
    del model; gc.collect(); torch.cuda.empty_cache()
    from vllm import LLM
    from vllm.lora.request import LoRARequest
    llm = LLM(model=args.model, dtype="bfloat16", gpu_memory_utilization=0.85, max_model_len=2048,
              enable_lora=True, max_lora_rank=16)
    lora_req = LoRARequest("tf_adapter", 1, adapter_dir)
    log("=== TRAINED evals (vLLM + LoRA) ===")
    tr_he, _ = eval_humaneval(lambda P, max_new=400: vllm_generate_lora(llm, P, lora_req, max_new=max_new), "TRAINED")
    tr_mbpp, _ = eval_mbpp(lambda P, max_new=400: vllm_generate_lora(llm, P, lora_req, max_new=max_new), "TRAINED", n=args.mbpp_n)
    result = {
        "model": args.model, "n_pairs": len(pairs),
        "humaneval": {"base": base_he, "trained": tr_he, "delta": tr_he-base_he, "n": n_he},
        "mbpp": {"base": base_mbpp, "trained": tr_mbpp, "delta": tr_mbpp-base_mbpp, "n": n_mbpp},
        "elapsed_s": time.time() - T0,
    }
    with open(f"{out_dir}/result.json", "w") as fh:
        json.dump(result, fh, indent=2)
    print()
    print("=" * 70)
    print(f" {args.model} — RAW completion format")
    print(f" HumanEval: base={base_he}/{n_he} trained={tr_he}/{n_he} Δ={tr_he-base_he:+d}")
    print(f" MBPP: base={base_mbpp}/{n_mbpp} trained={tr_mbpp}/{n_mbpp} Δ={tr_mbpp-base_mbpp:+d}")
    print(f" Time: {time.time()-T0:.0f}s")
    print("=" * 70)
if __name__ == "__main__":
main()

328
recipe/multi_pair_14b.py Normal file
View file

@ -0,0 +1,328 @@
"""Aggressive multi-pair mining on Qwen2.5-14B-Base.
Differences from warmup recipe:
- Harder problem-generation prompt (edge cases, multi-step, tricky boundaries)
- 200 problems generated (vs 80)
- 8 sampled attempts per problem at temp 0.8 (vs 4)
- Mine ALL (broken, fixed) pairs per problem, not just 1
- Deduplicate near-identical broken code (Jaccard < 0.85)
- Larger LoRA: rank 32 attn-only
- Train fresh from base on combined (warmup_40 + new) pairs
"""
import os, sys, json, time, re, gc, subprocess, tempfile, argparse, random, hashlib
os.environ.setdefault("HF_HOME", "/workspace/hf")
os.environ.setdefault("HF_HUB_ENABLE_HF_TRANSFER", "1")
os.environ["TRANSFORMERS_VERBOSITY"] = "error"
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, TrainingArguments, Trainer
from datasets import load_dataset, Dataset as HFDataset
from peft import LoraConfig, get_peft_model
T0 = time.time()
def log(m):
    """Print `m` to stdout, prefixed with the seconds elapsed since `T0`."""
    elapsed = time.time() - T0
    print(f"[{elapsed:7.1f}s] {m}", flush=True)
def extract_code(text):
    """Return the contents of the first fenced code block in `text`.

    A ```python fence is preferred over a bare ``` fence.  Everything after
    the opening fence up to the next ``` (or the end of the text when the
    block is never closed) is returned with surrounding whitespace removed.
    Text without any fence is returned stripped.
    """
    for opening in ("```python", "```"):
        _, found, remainder = text.partition(opening)
        if found:
            text = remainder
            break
    body, _, _ = text.partition("```")
    return body.strip()
def run_python(code, timeout=10):
    """Run `code` in a fresh `python3` subprocess and report the outcome.

    The source is written to a temporary ``.py`` file, executed with ``/tmp``
    as the working directory, and the file is removed afterwards.

    Args:
        code: Python source text to execute.
        timeout: seconds to wait before giving up.

    Returns:
        `(True, "")` when the process exits with status 0;
        `(False, tail)` on a non-zero exit, where `tail` is the last three
        lines of stderr (or stdout when stderr is empty) cut to 300 characters;
        `(False, "timeout")` when the run exceeds `timeout`.
    """
    # Python reads source files as UTF-8, so write UTF-8 explicitly rather
    # than whatever the locale's default encoding happens to be.
    tmp = tempfile.NamedTemporaryFile("w", suffix=".py", delete=False, encoding="utf-8")
    path = tmp.name
    try:
        with tmp:
            tmp.write(code)
        # errors="replace": undecodable bytes in the child's output must not
        # raise while the error message is being collected.
        r = subprocess.run(["python3", path], capture_output=True, timeout=timeout,
                           text=True, errors="replace", cwd="/tmp")
        if r.returncode == 0:
            return True, ""
        err = (r.stderr or r.stdout).strip().splitlines()
        return False, "\n".join(err[-3:])[:300]
    except subprocess.TimeoutExpired:
        return False, "timeout"
    finally:
        # Best-effort cleanup; a file that is already gone is not an error.
        try:
            os.unlink(path)
        except OSError:
            pass
def gen_batch(model, tok, prompts, max_new=400, temperature=0.0, batch=4):
    """Generate one completion per prompt, `batch` prompts at a time.

    Each prompt is wrapped in a system + user chat via the tokenizer's chat
    template.  Sampling (top_p 0.95) is used when `temperature` is positive,
    greedy decoding otherwise.  Only the tokens after the (padded) prompt
    width are decoded, so this assumes the tokenizer pads on the left, as
    `main` configures for generation.

    Returns:
        A list of decoded completions in the same order as `prompts`.
    """
    system_text = "You are an expert Python coder. Output one ```python block only."
    sampling = temperature > 0
    completions = []
    for start in range(0, len(prompts), batch):
        rendered = []
        for user_text in prompts[start:start + batch]:
            conversation = [{"role": "system", "content": system_text},
                            {"role": "user", "content": user_text}]
            rendered.append(tok.apply_chat_template(conversation, tokenize=False,
                                                    add_generation_prompt=True))
        enc = tok(rendered, return_tensors="pt", padding=True, truncation=True,
                  max_length=1500).to(model.device)
        with torch.no_grad():
            generated = model.generate(**enc, max_new_tokens=max_new, do_sample=sampling,
                                       temperature=temperature if sampling else 1.0, top_p=0.95,
                                       pad_token_id=tok.eos_token_id)
        prompt_width = enc.input_ids.shape[1]
        for row in range(generated.size(0)):
            completions.append(tok.decode(generated[row][prompt_width:],
                                          skip_special_tokens=True))
    return completions
def humaneval_full(model, tok):
    """Greedy-decode every HumanEval test problem and count the ones that pass.

    Returns:
        `(number_passed, number_of_problems)`.
    """
    problems = list(load_dataset("openai_humaneval", split="test"))
    log(f" HumanEval ({len(problems)} problems)")
    prompts = [problem["prompt"] + "\n# Complete the function above." for problem in problems]
    completions = gen_batch(model, tok, prompts, max_new=400, temperature=0.0, batch=4)
    correct = 0
    for done, (problem, raw) in enumerate(zip(problems, completions), start=1):
        code = extract_code(raw) if "```" in raw else raw
        # A completion without its own `def` is a bare body: prepend the prompt.
        if "def " in code:
            program = code
        else:
            program = problem["prompt"] + "\n" + code
        harness = program + "\n\n" + problem["test"] + f"\n\ncheck({problem['entry_point']})"
        ok, _ = run_python(harness, timeout=10)
        if ok:
            correct += 1
        if done % 30 == 0:
            log(f" eval {done}/{len(problems)}: {correct} correct")
    return correct, len(problems)
# Prompt asking the model to invent one hard problem: a function with a
# docstring and implementation followed by three top-level `assert` tests,
# all inside a single ```python block.  `mine_aggressive` sends this text
# verbatim (it is not passed through `str.format`), so the `{...}` parts are
# placeholders for the model to fill in, not format fields.
HARD_GEN_PROMPT = """Generate ONE challenging Python coding problem that requires:
- non-trivial algorithm (sorting variants, hash maps, two-pointer, dynamic logic, recursive backtracking, parsing, etc.)
- handles edge cases (empty input, negatives, duplicates, boundaries, or unusual inputs)
- 3 test assertions covering normal + edge cases
Output exactly:
```python
def {function_name}({args}):
\"\"\"{problem description}\"\"\"
{implementation}
# tests
assert {function_name}(...) == ...
assert {function_name}(...) == ...
assert {function_name}(...) == ...
```
Output ONLY the code block. Make the problem genuinely tricky."""
def _signature_lines(body):
    """Return the header of a function: its `def` line through the end of its docstring.

    `body` is the list of source lines from the `def` line up to (not
    including) the first test.  When none of the first five lines contains a
    triple-quote, the header is the `def` line plus a stub `pass`.
    """
    opener = next((k for k, line in enumerate(body[:5]) if '"""' in line), None)
    if opener is None:
        return [body[0], " pass"]
    # One-line docstring: opening and closing quotes share a line.
    if body[opener].count('"""') >= 2:
        return body[:opener + 1]
    for k in range(opener + 1, len(body)):
        if '"""' in body[k]:
            return body[:k + 1]
    # Docstring never closes: nothing sensible to cut, keep every line.
    return list(body)


def parse_problem(raw):
    """Parse a model-generated problem into name, signature, tests and solution.

    The first top-level `def` starts the problem.  Top-level `assert` lines
    after it, up to the next top-level `def`, are the tests; the lines
    between the `def` and the first test form the canonical solution.

    The signature is the function header plus its docstring only.  It must
    not contain the implementation, because it is shown to the model when it
    is asked to solve the problem.

    Args:
        raw: model output, with or without a ``` fence.

    Returns:
        dict with `fn_name`, `signature`, `tests` and `canonical`, or None
        when there is no function, fewer than two tests, a canonical solution
        shorter than 30 characters, or an unparseable `def` line.
    """
    code = extract_code(raw) if "```" in raw else raw.strip()
    if "def " not in code:
        return None
    lines = code.split("\n")
    func_start = next((i for i, line in enumerate(lines) if line.startswith("def ")), None)
    if func_start is None:
        return None
    tests = []
    def_end = None
    for i in range(func_start + 1, len(lines)):
        line = lines[i]
        if line.startswith("def "):
            break  # a second top-level function ends this problem
        if line.startswith("assert "):
            tests.append(line)
            if def_end is None:
                def_end = i  # the solution ends where the tests begin
    if len(tests) < 2:
        return None
    body = lines[func_start:def_end]
    full_solution = "\n".join(body).strip()
    if len(full_solution) < 30:
        return None
    m = re.match(r"def\s+(\w+)\s*\(", lines[func_start])
    if not m:
        return None
    return {"fn_name": m.group(1), "signature": "\n".join(_signature_lines(body)),
            "tests": tests, "canonical": full_solution}
def code_signature(code):
    """Return an MD5 hex digest of `code` that ignores whitespace layout and letter case.

    Runs of whitespace are collapsed to single spaces, the ends are trimmed
    and the text is lower-cased before hashing.  Used for exact-duplicate
    detection.
    """
    collapsed = re.sub(r"\s+", " ", code)
    canonical = collapsed.strip().lower()
    digest = hashlib.md5(canonical.encode())
    return digest.hexdigest()
def jaccard_similar(a, b, threshold=0.85):
    """Report whether two texts share at least `threshold` of their distinct words.

    Each text is lower-cased and reduced to its set of `\\w+` tokens; the
    score is |intersection| / |union|.  Returns False when either text has no
    tokens.
    """
    words_a = set(re.findall(r"\w+", a.lower()))
    words_b = set(re.findall(r"\w+", b.lower()))
    if not (words_a and words_b):
        return False
    shared = len(words_a & words_b)
    total = len(words_a | words_b)
    return shared / total >= threshold
def mine_aggressive(model, tok, n_problems=200, max_pairs_per_problem=4, n_attempts=8,
                    batch_gen=4, jaccard_threshold=0.85):
    """Generate problems with the model and mine (broken, fixed) pairs from its own attempts.

    Step 1 samples `n_problems` problems from `HARD_GEN_PROMPT` and keeps
    those whose canonical solution passes its own tests.  Step 2 samples
    `n_attempts` solutions per kept problem, splits them into passing and
    failing, and pairs failing attempts with the first passing attempt.

    A failing attempt is skipped when its normalized text was already used
    (`code_signature`) or when it is a near duplicate of one of the 50 most
    recently kept failing attempts (`jaccard_similar`).

    Args:
        model, tok: model and tokenizer forwarded to `gen_batch`.
        n_problems: number of problems to request.
        max_pairs_per_problem: cap on pairs mined from one problem.
        n_attempts: solutions sampled per problem.
        batch_gen: generation batch size.
        jaccard_threshold: similarity at or above which a failing attempt
            counts as a near duplicate.  Pass None to disable the
            near-duplicate filter and keep only the exact-hash check.

    Returns:
        List of dicts with keys `signature`, `tests`, `broken`, `error`, `fixed`.
    """
    log(f"AGGRESSIVE MINING — {n_problems} problems, {n_attempts} attempts each, up to {max_pairs_per_problem} pairs/problem")
    # Step 1: generate problems in batches
    log(" generating problems...")
    all_problems = []
    for batch_start in range(0, n_problems, batch_gen):
        chunk_size = min(batch_gen, n_problems - batch_start)
        raws = gen_batch(model, tok, [HARD_GEN_PROMPT]*chunk_size, max_new=500, temperature=0.95, batch=batch_gen)
        for r in raws:
            p = parse_problem(r)
            if p is None:
                continue
            # Keep a problem only when its own solution passes its own tests.
            ok, _ = run_python(p["canonical"] + "\n\n" + "\n".join(p["tests"]))
            if ok:
                all_problems.append(p)
        if batch_start % (batch_gen*5) == 0:
            log(f" generated {batch_start+chunk_size}/{n_problems}, valid so far: {len(all_problems)}")
    log(f"{len(all_problems)} valid problems")
    # Step 2: for each problem, sample n_attempts solutions at temp 0.8, classify pass/fail
    log(" solving each problem with multiple attempts...")
    all_pairs = []
    seen_broken_sigs = set()
    kept_broken = []  # source of every failing attempt already paired
    for pi, p in enumerate(all_problems):
        solve_prompt = (f"Implement: {p['signature']}\n\nTests:\n{chr(10).join(p['tests'])}\n\n"
                        f"Output only the function implementation in one ```python block.")
        attempts = gen_batch(model, tok, [solve_prompt]*n_attempts, max_new=500, temperature=0.8, batch=batch_gen)
        passes, fails = [], []
        for raw in attempts:
            code = extract_code(raw) if "```" in raw else raw
            ok, err = run_python(code + "\n\n" + "\n".join(p["tests"]))
            if ok:
                passes.append(code)
            else:
                fails.append((code, err))
        # Mine pairs: failing attempts paired with one passing attempt,
        # capped per problem.  Without a passing attempt there is nothing to pair.
        problem_pairs = 0
        if passes:
            fixed = passes[0]  # one fixed per broken to keep diversity
            for broken, broken_err in fails:
                if problem_pairs >= max_pairs_per_problem:
                    break
                sig = code_signature(broken)
                if sig in seen_broken_sigs:
                    continue
                # Hashes cannot be compared for similarity, so near-duplicate
                # detection runs on the recently kept source strings.
                if jaccard_threshold is not None and any(
                        jaccard_similar(broken, prev, jaccard_threshold)
                        for prev in kept_broken[-50:]):
                    continue
                all_pairs.append({
                    "signature": p["signature"], "tests": p["tests"],
                    "broken": broken, "error": broken_err, "fixed": fixed,
                })
                seen_broken_sigs.add(sig)
                kept_broken.append(broken)
                problem_pairs += 1
        if (pi+1) % 10 == 0:
            log(f" solved {pi+1}/{len(all_problems)}, pairs mined: {len(all_pairs)}")
    log(f" AGGRESSIVE MINING DONE — {len(all_pairs)} pairs from {len(all_problems)} problems")
    return all_pairs
def make_example(r, tok):
    """Turn one (broken, fixed) pair into a fixed-length chat-format training row.

    The user turn shows the signature, tests, broken attempt and error; the
    assistant turn is the fixed code in a ```python block.  The rendered chat
    is cut to 1024 tokens and right-padded to exactly that length.  Positions
    belonging to the prompt (system + user + generation prefix) and to the
    padding carry label -100.

    Args:
        r: dict with `signature`, `tests`, `broken`, `fixed` and optional `error`.
        tok: tokenizer providing `apply_chat_template`, `pad_token_id` and
            `tok(text, add_special_tokens=False)`.

    Returns:
        dict with `input_ids`, `attention_mask` and `labels`, each 1024 long.
    """
    MAX = 1024
    joined_tests = chr(10).join(r['tests'])
    user_turn = (f"Implement: {r['signature']}\n\n"
                 f"Tests:\n{joined_tests}\n\n"
                 f"My attempt:\n```python\n{r['broken']}\n```\n\n"
                 f"Error:\n{r.get('error','')}\n\n"
                 f"Fix and output the corrected code only.")
    prompt_msgs = [
        {"role": "system", "content": "You are an expert Python coder. Output one ```python block only."},
        {"role": "user", "content": user_turn},
    ]
    answer_msg = {"role": "assistant", "content": f"```python\n{r['fixed']}\n```"}
    prompt_text = tok.apply_chat_template(prompt_msgs, tokenize=False, add_generation_prompt=True)
    chat_text = tok.apply_chat_template(prompt_msgs + [answer_msg], tokenize=False)
    prompt_ids = tok(prompt_text, add_special_tokens=False)["input_ids"]
    sequence = tok(chat_text, add_special_tokens=False)["input_ids"][:MAX]
    # Number of leading positions that belong to the prompt (capped by truncation).
    masked = min(len(prompt_ids), len(sequence))
    labels = [-100] * masked + list(sequence[masked:])
    n_pad = MAX - len(sequence)
    return {
        "input_ids": sequence + [tok.pad_token_id] * n_pad,
        "attention_mask": [1] * len(sequence) + [0] * n_pad,
        "labels": labels + [-100] * n_pad,
    }
def main():
    """Mine repair pairs from the base model, train a LoRA on them, and evaluate on HumanEval.

    Stages: base HumanEval eval, aggressive pair mining, merge with the
    warmup pairs, LoRA training (attention projections), adapter save,
    trained HumanEval eval.  Outputs (`pairs_new.jsonl`, `adapter/`,
    `result.json`) go to /workspace/multi_pair/<tag>/.

    The warmup pairs are read before any GPU work so a bad path fails
    immediately, and the adapter is saved right after training so a crash in
    the final evaluation cannot lose it.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--model", default="Qwen/Qwen2.5-14B")
    ap.add_argument("--warmup_pairs_path", default="/workspace/saved_pairs/pairs_40.jsonl")
    ap.add_argument("--n_warmup_pairs", type=int, default=40)
    ap.add_argument("--n_problems", type=int, default=200)
    ap.add_argument("--n_attempts", type=int, default=8)
    ap.add_argument("--max_pairs_per_problem", type=int, default=4)
    ap.add_argument("--lora_rank", type=int, default=32)
    ap.add_argument("--epochs", type=int, default=2)
    ap.add_argument("--lr", type=float, default=1e-4)
    ap.add_argument("--tag", required=True)
    args = ap.parse_args()
    out_dir = f"/workspace/multi_pair/{args.tag}"
    os.makedirs(out_dir, exist_ok=True)
    # Fail fast: a missing or malformed warmup file should surface here, not
    # after the mining stage has already run.
    with open(args.warmup_pairs_path) as fh:
        warmup_pairs = [json.loads(l) for l in fh if l.strip()][:args.n_warmup_pairs]
    log(f"loading {args.model}")
    tok = AutoTokenizer.from_pretrained(args.model)
    if tok.pad_token is None:
        tok.pad_token = tok.eos_token
    tok.padding_side = "left"
    model = AutoModelForCausalLM.from_pretrained(args.model, torch_dtype=torch.bfloat16, device_map="cuda:0")
    log(f" loaded mem={torch.cuda.memory_allocated('cuda:0')/1e9:.1f}GB")
    # Base eval
    model.eval()
    log("=== BASE eval ===")
    base_corr, base_total = humaneval_full(model, tok)
    log(f" BASE: {base_corr}/{base_total}")
    # Stage 1: aggressive mining from BASE model (not from warmup — we want fresh diversity)
    log("=== AGGRESSIVE MINING (from base model) ===")
    new_pairs = mine_aggressive(model, tok,
                                n_problems=args.n_problems,
                                max_pairs_per_problem=args.max_pairs_per_problem,
                                n_attempts=args.n_attempts)
    with open(f"{out_dir}/pairs_new.jsonl", "w") as fh:
        for p in new_pairs:
            fh.write(json.dumps(p) + "\n")
    log(f" saved {len(new_pairs)} new pairs")
    # Combine with warmup pairs
    combined = warmup_pairs + new_pairs
    log(f" combined: {len(warmup_pairs)} warmup + {len(new_pairs)} new = {len(combined)} total")
    if len(combined) < 20:
        log("FATAL: too few pairs"); return
    # Stage 2: train fresh LoRA on combined
    log(f"=== TRAINING — fresh LoRA rank={args.lora_rank}, lr={args.lr}, e={args.epochs} ===")
    lora_cfg = LoraConfig(r=args.lora_rank, lora_alpha=args.lora_rank*2, lora_dropout=0.05, bias="none",
                          target_modules=["q_proj", "k_proj", "v_proj", "o_proj"], task_type="CAUSAL_LM")
    model = get_peft_model(model, lora_cfg)
    model.print_trainable_parameters()
    tok.padding_side = "right"
    ds = HFDataset.from_list([make_example(r, tok) for r in combined])
    targs = TrainingArguments(
        output_dir=f"{out_dir}/ckpt", num_train_epochs=args.epochs,
        per_device_train_batch_size=1, gradient_accumulation_steps=4,
        learning_rate=args.lr, bf16=True, logging_steps=20,
        save_strategy="no", report_to="none", remove_unused_columns=False, warmup_ratio=0.05,
    )
    Trainer(model=model, args=targs, train_dataset=ds, processing_class=tok).train()
    log(" training done")
    # Save before evaluating so an eval failure cannot lose the adapter.
    model.save_pretrained(f"{out_dir}/adapter")
    tok.padding_side = "left"
    # Stage 3: eval
    model.eval()
    log("=== TRAINED eval ===")
    tr_corr, tr_total = humaneval_full(model, tok)
    log(f" TRAINED: {tr_corr}/{tr_total} Δ={tr_corr-base_corr:+d}")
    result = {
        "model": args.model, "method": "aggressive multi-pair mining",
        "base": [base_corr, base_total], "trained": [tr_corr, tr_total],
        "delta": tr_corr - base_corr,
        "n_warmup_pairs": len(warmup_pairs), "n_new_pairs": len(new_pairs),
        "n_total_pairs": len(combined),
        "n_problems_generated": args.n_problems, "n_attempts_per_problem": args.n_attempts,
        "max_pairs_per_problem": args.max_pairs_per_problem,
        "lora_rank": args.lora_rank, "lr": args.lr, "epochs": args.epochs,
        "elapsed_s": time.time() - T0,
    }
    with open(f"{out_dir}/result.json", "w") as fh:
        json.dump(result, fh, indent=2)
    print()
    print("=" * 70)
    print(f" MULTI-PAIR on {args.model}")
    print(f" HumanEval: base={base_corr}/{base_total} trained={tr_corr}/{tr_total} Δ={tr_corr-base_corr:+d}")
    print(f" Total pairs: {len(combined)} ({len(warmup_pairs)} warmup + {len(new_pairs)} new)")
    print(f" Time: {time.time()-T0:.0f}s")
    print("=" * 70)
if __name__ == "__main__":
main()

146
recipe/train_on_pairs.py Normal file
View file

@ -0,0 +1,146 @@
"""Train a LoRA adapter on a released `pairs.jsonl` file and evaluate.
This is the clean replication entrypoint: skip the mining stage entirely
and just consume the (broken, fixed) pairs we already mined and released
in data/. Use this to reproduce the headline numbers without burning
GPU on the search step.
Schema of pairs.jsonl (one JSON object per line):
{
"signature": "def foo(x): ...", # function header + docstring
"tests": ["assert foo(1) == 2", ...],
"broken": "def foo(x): ... # buggy",
"error": "AssertionError ...",
"fixed": "def foo(x): ... # correct"
}
Example:
python recipe/train_on_pairs.py \\
--model Qwen/Qwen2.5-7B \\
--pairs data/pairs_7b_40.jsonl \\
--out adapter_7b_seed13 \\
--seed 13
Then evaluate the resulting adapter with (eval_raw.py has no --adapter flag):
python recipe/eval_plus.py --model Qwen/Qwen2.5-7B \\
--adapter adapter_7b_seed13 --tag adapter_7b_seed13
"""
import argparse, json, os, random, time
from pathlib import Path
import torch
from datasets import Dataset
from peft import LoraConfig, get_peft_model
from transformers import (AutoModelForCausalLM, AutoTokenizer,
Trainer, TrainingArguments)
T0 = time.time()
def log(m):
    """Print `m` to stdout, prefixed with the seconds elapsed since `T0`."""
    elapsed = time.time() - T0
    print(f"[{elapsed:7.1f}s] {m}", flush=True)
# Training text for one repair pair.  `main` fills it with `str.format`
# using the fields `tests`, `broken`, `error` and `fixed`; the whole rendered
# text, answer included, is what the model is trained on.
REPAIR_PROMPT = """### Task
Fix the bug in the Python function so it passes all the provided tests.
### Tests
{tests}
### Buggy code
```python
{broken}
```
### Error
{error}
### Fixed code
```python
{fixed}
```
"""
def main():
    """Train a LoRA adapter on a pairs.jsonl file and save it.

    Command-line flags:
        --model:       Hugging Face model id (required).
        --pairs:       path to the pairs JSONL file (required).
        --out:         directory that receives the adapter and tokenizer (required).
        --epochs, --lr, --lora-rank, --seed, --batch-size, --grad-accum,
        --max-length:  training hyper-parameters.

    Every pair is rendered with `REPAIR_PROMPT`, tokenized, and padded or
    truncated to `--max-length`.  Padding positions get label -100 so they
    are excluded from the loss; all real tokens, prompt included, are
    trained on.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--model", required=True,
                    help="HF model id, e.g. Qwen/Qwen2.5-7B")
    ap.add_argument("--pairs", required=True,
                    help="Path to a pairs.jsonl file (one JSON object per line)")
    ap.add_argument("--out", required=True,
                    help="Output directory for the trained LoRA adapter")
    ap.add_argument("--epochs", type=int, default=2)
    ap.add_argument("--lr", type=float, default=1e-4)
    ap.add_argument("--lora-rank", type=int, default=16)
    ap.add_argument("--seed", type=int, default=13)
    ap.add_argument("--batch-size", type=int, default=1)
    ap.add_argument("--grad-accum", type=int, default=8)
    ap.add_argument("--max-length", type=int, default=2048)
    args = ap.parse_args()
    random.seed(args.seed)
    torch.manual_seed(args.seed)
    log(f"Loading pairs from {args.pairs}")
    with open(args.pairs) as fh:
        # Skip blank lines so a trailing newline does not break json.loads.
        pairs = [json.loads(l) for l in fh if l.strip()]
    log(f" {len(pairs)} pairs")
    log(f"Loading tokenizer + base model {args.model}")
    tok = AutoTokenizer.from_pretrained(args.model, use_fast=True)
    if tok.pad_token is None:
        tok.pad_token = tok.eos_token
    model = AutoModelForCausalLM.from_pretrained(
        args.model, torch_dtype=torch.bfloat16, device_map="auto",
    )
    log(f"Attaching LoRA (rank {args.lora_rank}, q/k/v/o projections)")
    lora = LoraConfig(
        r=args.lora_rank, lora_alpha=args.lora_rank * 2,
        target_modules=["q_proj", "k_proj", "v_proj", "o_proj"],
        lora_dropout=0.05, bias="none", task_type="CAUSAL_LM",
    )
    model = get_peft_model(model, lora)
    model.print_trainable_parameters()

    def format_pair(p):
        """Render one pair and tokenize it into a fixed-length training row."""
        text = REPAIR_PROMPT.format(
            tests="\n".join(p["tests"]), broken=p["broken"],
            error=p.get("error", ""), fixed=p["fixed"],
        )
        enc = tok(text, truncation=True, max_length=args.max_length,
                  padding="max_length")
        input_ids = list(enc["input_ids"])
        attention_mask = list(enc["attention_mask"])
        # Mask by attention, not by token id: the pad token may be the EOS
        # token, and genuine tokens must stay in the loss.
        labels = [tid if keep else -100
                  for tid, keep in zip(input_ids, attention_mask)]
        return {"input_ids": input_ids, "attention_mask": attention_mask,
                "labels": labels}

    ds = Dataset.from_list([format_pair(p) for p in pairs])
    log("Training")
    targs = TrainingArguments(
        output_dir=args.out + "_ckpt",
        per_device_train_batch_size=args.batch_size,
        gradient_accumulation_steps=args.grad_accum,
        num_train_epochs=args.epochs,
        learning_rate=args.lr,
        lr_scheduler_type="cosine",
        warmup_ratio=0.03,
        logging_steps=5,
        save_strategy="no",
        bf16=True,
        report_to="none",
        seed=args.seed,
    )
    Trainer(model=model, args=targs, train_dataset=ds).train()
    log(f"Saving adapter to {args.out}")
    Path(args.out).mkdir(parents=True, exist_ok=True)
    model.save_pretrained(args.out)
    tok.save_pretrained(args.out)
    # eval_raw.py accepts neither --adapter nor --bench; eval_plus.py is the
    # script that loads an existing adapter.
    log("Done. Evaluate with: python recipe/eval_plus.py --model "
        f"{args.model} --adapter {args.out} --tag {Path(args.out).name}")
if __name__ == "__main__":
main()