Release v2.1.23 Receipt Lock hardening

Hardens Sanhedrin Receipt Lock for model-agnostic use, adds fail-open telemetry and receipt docs, fixes smart_ingest batch safety, wires opt-in CUDA Qwen3 device selection, and refreshes dashboard/release assets.

Fixes #58

Fixes #60
This commit is contained in:
Sam Valladares 2026-05-27 18:22:53 -05:00
parent a8550410b0
commit 5edb163157
161 changed files with 1775 additions and 262 deletions

View file

@ -23,6 +23,7 @@ import re
import sys
import unicodedata
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import asdict, dataclass, field, replace
from pathlib import Path
@ -50,7 +51,7 @@ VESTIGE_BASE_URL = (
SANHEDRIN_ENDPOINT = (
os.environ.get("VESTIGE_SANHEDRIN_ENDPOINT")
or os.environ.get("MLX_ENDPOINT")
or "http://127.0.0.1:8080/v1/chat/completions"
or ""
)
VESTIGE_ENDPOINT = (
os.environ.get("VESTIGE_DEEP_REFERENCE_ENDPOINT")
@ -62,17 +63,27 @@ VESTIGE_HEALTH = (
MODEL = (
os.environ.get("VESTIGE_SANHEDRIN_MODEL")
or os.environ.get("VESTIGE_SANDWICH_MODEL")
or "mlx-community/Qwen3.6-35B-A3B-4bit"
or ""
)
SANHEDRIN_TIMEOUT = env_int("VESTIGE_SANHEDRIN_TIMEOUT", env_int("MLX_TIMEOUT", 45))
VESTIGE_TIMEOUT = env_int("VESTIGE_TIMEOUT", 5)
THINK_RE = re.compile(r"<think>.*?</think>", re.DOTALL | re.IGNORECASE)
SANHEDRIN_BACKEND = (os.environ.get("VESTIGE_SANHEDRIN_BACKEND") or "").strip().lower()
THINK_RE = re.compile(
r"<(?:think|thinking|reasoning)>.*?</(?:think|thinking|reasoning)>",
re.DOTALL | re.IGNORECASE,
)
def post_json(url: str, body: dict, timeout: int):
if not url:
return None
data = json.dumps(body).encode("utf-8")
headers = {"Content-Type": "application/json"}
api_key = os.environ.get("VESTIGE_SANHEDRIN_API_KEY")
if api_key and same_endpoint_origin(url, SANHEDRIN_ENDPOINT):
headers["Authorization"] = f"Bearer {api_key}"
req = urllib.request.Request(
url, data=data, headers={"Content-Type": "application/json"}
url, data=data, headers=headers
)
try:
with urllib.request.urlopen(req, timeout=timeout) as r:
@ -81,6 +92,53 @@ def post_json(url: str, body: dict, timeout: int):
return None
def same_endpoint_origin(url: str, endpoint: str) -> bool:
try:
target = urllib.parse.urlsplit(url)
expected = urllib.parse.urlsplit(endpoint)
except ValueError:
return False
return (
target.scheme == expected.scheme
and target.netloc == expected.netloc
and target.path == expected.path
)
def use_backend_extensions() -> bool:
if SANHEDRIN_BACKEND in {"mlx", "vllm"}:
return True
if SANHEDRIN_BACKEND in {"openai", "ollama", "llama.cpp", "llamacpp", "litellm"}:
return False
return MODEL.startswith("mlx-community/")
def sanhedrin_model_configured() -> bool:
return bool(SANHEDRIN_ENDPOINT and MODEL)
def sanhedrin_body(
messages: list[dict[str, str]],
max_tokens: int,
stop: list[str] | None = None,
) -> dict[str, Any]:
body: dict[str, Any] = {
"model": MODEL,
"messages": messages,
"max_tokens": max_tokens,
"temperature": 0.0,
"top_p": 1.0,
"stream": False,
}
if stop:
body["stop"] = stop
if use_backend_extensions():
body["top_k"] = 1
body["seed"] = 42
body["chat_template_kwargs"] = {"enable_thinking": False}
return body
TRUST_FLOOR = 0.55 # filter out low-trust memories that drive false-positive vetoes
CLAIM_MODE_ENV = "VESTIGE_SANHEDRIN_CLAIM_MODE"
@ -305,11 +363,15 @@ def fetch_evidence(draft: str) -> tuple[str, int]:
with urllib.request.urlopen(VESTIGE_HEALTH, timeout=VESTIGE_TIMEOUT) as r:
r.read()
except Exception:
if sanhedrin_core is not None:
sanhedrin_core.record_fail_open("vestige_health_unavailable", VESTIGE_HEALTH)
return "", 0
query = draft[:1500]
resp = post_json(VESTIGE_ENDPOINT, {"query": query, "depth": 12}, VESTIGE_TIMEOUT)
if not isinstance(resp, dict):
if sanhedrin_core is not None:
sanhedrin_core.record_fail_open("deep_reference_unavailable", VESTIGE_ENDPOINT)
return "", 0
parts = []
@ -361,15 +423,21 @@ def fetch_evidence(draft: str) -> tuple[str, int]:
def split_candidate_claims(draft: str) -> list[str]:
"""Return sentence-ish draft fragments that can be classified as claims."""
without_fences = re.sub(r"```.*?```", " ", draft, flags=re.DOTALL)
without_fences = re.sub(r"```.*?```", " ", draft[:16_384], flags=re.DOTALL)
without_fences = re.sub(r"`[^`\n]+`", " ", without_fences)
without_fences = re.sub(r'(^|[\s([{])"[^"\n]+"(?=([\s.,;:!?)}\]]|$))', r"\1 ", without_fences)
fragments: list[str] = []
for line in without_fences.splitlines():
if line.lstrip().startswith(">"):
continue
line = re.sub(r"^\s*[-*+]\s+", "", line).strip()
line = re.sub(r"^\s*\d+[.)]\s+", "", line).strip()
if not line:
continue
parts = re.split(r"(?<=[.!?])\s+(?=[A-Z0-9`\"'])", line)
fragments.extend(part.strip(" \t-") for part in parts if part.strip(" \t-"))
if len(fragments) >= 512:
return fragments[:512]
if not fragments:
compact = " ".join(without_fences.split())
fragments = [
@ -377,13 +445,22 @@ def split_candidate_claims(draft: str) -> list[str]:
for part in re.split(r"(?<=[.!?])\s+(?=[A-Z0-9`\"'])", compact)
if part.strip()
]
return fragments
return fragments[:512]
def normalize_asserted_fragment(text: str) -> str | None:
text = " ".join(text.split()).strip()
if not text:
return None
if re.search(r"\b(need|still need|let me|will|should)\s+to\s+verify\b", text, re.I):
return None
if re.fullmatch(
r"(the user|user|sam|you)\s+(said|told|asked|wrote|noted|mentioned)\s*"
r"(earlier|before|previously)?\.?",
text,
re.I,
):
return None
text = TRAILING_MODAL_COMMENT_RE.sub("", text).strip(" ,;:-")
if HYPOTHETICAL_PREFIX_RE.search(text) or SUBJECT_MODAL_PREFIX_RE.search(text):
return None
@ -473,9 +550,10 @@ def extract_check_worthy_claims(
sam_critical=is_sam_critical_claim(text, claim_class),
)
)
if len(claims) >= max_claims:
break
return claims
return sorted(
claims,
key=lambda claim: (SEVERITY_ORDER.get(claim.claim_class, 99), claim.source_index),
)[:max_claims]
def normalize_evidence_item(raw: Any, source: str = "vestige") -> EvidenceItem | None:
@ -823,29 +901,29 @@ def validate_verdict(verdict: str) -> str:
def judge(draft: str, evidence: str) -> str:
if not sanhedrin_model_configured():
if sanhedrin_core is not None:
sanhedrin_core.record_fail_open(
"model_not_configured",
"Set VESTIGE_SANHEDRIN_ENDPOINT and VESTIGE_SANHEDRIN_MODEL, or choose a preset.",
)
return ""
user_msg = (
f"VESTIGE EVIDENCE (recommended + top trust-scored memories):\n"
f"{evidence if evidence else '(no relevant evidence retrieved)'}\n\n"
f"---\nDRAFT TO JUDGE:\n{draft}"
)
body = {
"model": MODEL,
"messages": [
body = sanhedrin_body(
[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": user_msg},
],
"max_tokens": 2500,
"temperature": 0.0,
"top_p": 1.0,
"top_k": 1,
"seed": 42,
"stream": False,
"chat_template_kwargs": {"enable_thinking": False},
"stop": [
2500,
[
"\n\nWait,", "\n\nActually,", "\n\nLet me", "\n\nHmm,",
"\n\nOn second thought", "\n\nOh wait",
],
}
)
resp = post_json(SANHEDRIN_ENDPOINT, body, SANHEDRIN_TIMEOUT)
if not isinstance(resp, dict):
return ""
@ -970,6 +1048,8 @@ def validate_structured_verdict(
return nei_verdict(claim, "Durable evidence exists; absence veto does not apply.", evidence)
if status == "REFUTED" and not relevant_durable_high_trust(claim, evidence):
return nei_verdict(claim, "Durable evidence required for refuted verdict.", evidence)
if status == "SUPPORTED" and high_trust(evidence) and not durable_high_trust(evidence):
return nei_verdict(claim, "Durable evidence required for supported verdict.", evidence)
evidence_ids_raw = data.get("evidence_ids") or []
evidence_ids = [
str(eid) for eid in evidence_ids_raw[:5]
@ -999,6 +1079,13 @@ def validate_structured_verdict(
def judge_claim_with_model(claim: Claim, evidence: list[EvidenceItem]) -> ClaimVerdict:
if not sanhedrin_model_configured():
if sanhedrin_core is not None:
sanhedrin_core.record_fail_open(
"model_not_configured",
"Set VESTIGE_SANHEDRIN_ENDPOINT and VESTIGE_SANHEDRIN_MODEL, or choose a preset.",
)
return nei_verdict(claim, "Sanhedrin model not configured; fail-open for this claim.", evidence)
user_msg = (
f"CLAIM CLASS: {claim.claim_class}\n"
f"SAM-CRITICAL: {'yes' if claim.sam_critical else 'no'}\n"
@ -1008,27 +1095,24 @@ def judge_claim_with_model(claim: Claim, evidence: list[EvidenceItem]) -> ClaimV
f"CLAIM:\n{claim.text}\n\n"
f"EVIDENCE:\n{format_claim_evidence(evidence, claim)}"
)
body = {
"model": MODEL,
"messages": [
body = sanhedrin_body(
[
{"role": "system", "content": CLAIM_SYSTEM_PROMPT},
{"role": "user", "content": user_msg},
],
"max_tokens": 700,
"temperature": 0.0,
"top_p": 1.0,
"top_k": 1,
"seed": 42,
"stream": False,
"chat_template_kwargs": {"enable_thinking": False},
}
700,
)
resp = post_json(SANHEDRIN_ENDPOINT, body, SANHEDRIN_TIMEOUT)
if not isinstance(resp, dict):
if sanhedrin_core is not None:
sanhedrin_core.record_fail_open("model_unavailable", f"endpoint={SANHEDRIN_ENDPOINT}")
return nei_verdict(claim, "Sanhedrin model unavailable; fail-open for this claim.", evidence)
try:
msg = resp["choices"][0]["message"]
raw = msg.get("content") or msg.get("reasoning") or ""
except (KeyError, IndexError, TypeError):
if sanhedrin_core is not None:
sanhedrin_core.record_fail_open("malformed_model_response", f"endpoint={SANHEDRIN_ENDPOINT}")
return nei_verdict(claim, "Malformed Sanhedrin model response.", evidence)
data = parse_json_object(raw)
if data is not None:
@ -1036,10 +1120,19 @@ def judge_claim_with_model(claim: Claim, evidence: list[EvidenceItem]) -> ClaimV
legacy = verdict_from_legacy_line(claim, raw, evidence)
if legacy is not None:
return legacy
if sanhedrin_core is not None:
sanhedrin_core.record_fail_open("unstructured_model_response", raw[:500])
return nei_verdict(claim, "Sanhedrin model did not return structured JSON.", evidence)
def judge_claim(claim: Claim, evidence: list[EvidenceItem]) -> ClaimVerdict:
if not sanhedrin_model_configured():
if sanhedrin_core is not None:
sanhedrin_core.record_fail_open(
"model_not_configured",
"Set VESTIGE_SANHEDRIN_ENDPOINT and VESTIGE_SANHEDRIN_MODEL, or choose a preset.",
)
return nei_verdict(claim, "Sanhedrin model not configured; fail-open for this claim.", evidence)
durable_count = len(relevant_durable_high_trust(claim, evidence))
high_count = len(high_trust(evidence))
if claim.sam_critical and claim.claim_class in CRITICAL_ABSENCE_CLASSES and durable_count == 0:
@ -1206,6 +1299,8 @@ def claim_mode_result(draft: str) -> dict[str, Any]:
for claim in claims:
evidence, ok = fetch_claim_evidence(claim)
if not ok:
if sanhedrin_core is not None:
sanhedrin_core.record_fail_open("retrieval_unavailable", claim.text)
verdicts.append(
nei_verdict(
claim,
@ -1273,6 +1368,8 @@ def main() -> None:
if not verdict:
# Fail-open: server unreachable, malformed response, etc.
if sanhedrin_core is not None:
sanhedrin_core.record_fail_open("legacy_model_unavailable", f"endpoint={SANHEDRIN_ENDPOINT}")
save_legacy_receipt(manifest, "yes", evidence)
print("yes")
return

View file

@ -0,0 +1,103 @@
{
"schema": "vestige.sanhedrin.presets.v2",
"defaultPreset": null,
"description": "Model-agnostic Sanhedrin backend recipes. Presets are suggestions only; users may set any OpenAI-compatible endpoint and model name.",
"presets": {
"custom-openai-compatible": {
"label": "Custom OpenAI-compatible endpoint",
"tier": "custom",
"bestFor": "Any model/server the user already trusts",
"requiresUserModel": true,
"endpointPlaceholder": "http://127.0.0.1:8000/v1/chat/completions",
"modelPlaceholder": "your-model-name",
"env": {
"VESTIGE_SANHEDRIN_ENDPOINT": "",
"VESTIGE_SANHEDRIN_MODEL": "",
"VESTIGE_SANHEDRIN_BACKEND": "openai-compatible",
"VESTIGE_SANHEDRIN_TIMEOUT": "45"
}
},
"small-laptop-ollama": {
"label": "Small laptop Ollama",
"tier": "small-local",
"bestFor": "8-16 GB RAM laptops that need a lightweight offline verifier",
"setup": "Install Ollama, then pull any small instruct model you trust, for example: ollama pull llama3.2:3b or ollama pull qwen2.5:7b",
"tradeoffs": ["fast and accessible", "weaker contradiction judgment than larger models"],
"env": {
"VESTIGE_SANHEDRIN_ENDPOINT": "http://127.0.0.1:11434/v1/chat/completions",
"VESTIGE_SANHEDRIN_MODEL": "your-ollama-model",
"VESTIGE_SANHEDRIN_BACKEND": "ollama",
"VESTIGE_SANHEDRIN_TIMEOUT": "60"
}
},
"balanced-local-ollama": {
"label": "Balanced local Ollama",
"tier": "balanced-local",
"bestFor": "16-32 GB RAM machines using 7B-14B local models",
"setup": "Install Ollama and pull a balanced verifier model such as qwen3:14b, llama3.1:8b, or another OpenAI-compatible local model.",
"tradeoffs": ["good first local choice", "model quality depends on the exact model selected"],
"env": {
"VESTIGE_SANHEDRIN_ENDPOINT": "http://127.0.0.1:11434/v1/chat/completions",
"VESTIGE_SANHEDRIN_MODEL": "your-ollama-model",
"VESTIGE_SANHEDRIN_BACKEND": "ollama",
"VESTIGE_SANHEDRIN_TIMEOUT": "60"
}
},
"mlx-qwen3.6-apple-silicon": {
"label": "MLX Qwen3.6 35B A3B, Apple Silicon local",
"tier": "strong-local",
"bestFor": "High-memory Apple Silicon users who explicitly choose the strong MLX path",
"env": {
"VESTIGE_SANHEDRIN_ENDPOINT": "http://127.0.0.1:8080/v1/chat/completions",
"VESTIGE_SANHEDRIN_MODEL": "mlx-community/Qwen3.6-35B-A3B-4bit",
"VESTIGE_SANHEDRIN_BACKEND": "mlx",
"VESTIGE_SANHEDRIN_TIMEOUT": "45"
}
},
"vllm-openai-compatible": {
"label": "vLLM OpenAI-compatible server",
"tier": "workstation",
"bestFor": "GPU workstations and team servers",
"env": {
"VESTIGE_SANHEDRIN_ENDPOINT": "http://127.0.0.1:8000/v1/chat/completions",
"VESTIGE_SANHEDRIN_MODEL": "your-vllm-model",
"VESTIGE_SANHEDRIN_BACKEND": "vllm",
"VESTIGE_SANHEDRIN_TIMEOUT": "45"
}
},
"llama-cpp-openai-compatible": {
"label": "llama.cpp server",
"tier": "small-local",
"bestFor": "CPU or small-GPU local deployments",
"env": {
"VESTIGE_SANHEDRIN_ENDPOINT": "http://127.0.0.1:8081/v1/chat/completions",
"VESTIGE_SANHEDRIN_MODEL": "your-gguf-model",
"VESTIGE_SANHEDRIN_BACKEND": "llama.cpp",
"VESTIGE_SANHEDRIN_TIMEOUT": "90"
}
},
"hosted-openai-compatible": {
"label": "Hosted OpenAI-compatible API",
"tier": "hosted",
"bestFor": "Users who want zero local model setup",
"requires": "VESTIGE_SANHEDRIN_API_KEY exported in the hook environment and a model chosen by the user/provider",
"env": {
"VESTIGE_SANHEDRIN_ENDPOINT": "https://api.openai.com/v1/chat/completions",
"VESTIGE_SANHEDRIN_MODEL": "your-hosted-model",
"VESTIGE_SANHEDRIN_BACKEND": "openai",
"VESTIGE_SANHEDRIN_TIMEOUT": "45"
}
},
"anthropic-via-litellm": {
"label": "Anthropic through LiteLLM OpenAI-compatible proxy",
"bestFor": "Claude users who already run LiteLLM",
"setup": "Run LiteLLM locally with an Anthropic model, then point Sanhedrin at the proxy.",
"env": {
"VESTIGE_SANHEDRIN_ENDPOINT": "http://127.0.0.1:4000/v1/chat/completions",
"VESTIGE_SANHEDRIN_MODEL": "anthropic/claude-3-5-haiku-latest",
"VESTIGE_SANHEDRIN_BACKEND": "litellm",
"VESTIGE_SANHEDRIN_TIMEOUT": "45"
}
}
}
}

View file

@ -30,7 +30,7 @@ load_vestige_sanhedrin_env() {
command -v python3 >/dev/null 2>&1 || return 0
while IFS="$(printf '\t')" read -r key value; do
case "$key" in
VESTIGE_SANHEDRIN_ENABLED|VESTIGE_SANHEDRIN_MODEL|VESTIGE_SANHEDRIN_ENDPOINT|VESTIGE_SANHEDRIN_CLAIM_MODE|VESTIGE_SANHEDRIN_OUTPUT|VESTIGE_SANHEDRIN_PYTHON|VESTIGE_SANHEDRIN_STATE_DIR|VESTIGE_SANHEDRIN_ALLOW_COMMAND_LEDGER|VESTIGE_DASHBOARD_PORT)
VESTIGE_SANHEDRIN_ENABLED|VESTIGE_SANHEDRIN_MODEL|VESTIGE_SANHEDRIN_ENDPOINT|VESTIGE_SANHEDRIN_API_KEY|VESTIGE_SANHEDRIN_BACKEND|VESTIGE_SANHEDRIN_CLAIM_MODE|VESTIGE_SANHEDRIN_OUTPUT|VESTIGE_SANHEDRIN_PYTHON|VESTIGE_SANHEDRIN_STATE_DIR|VESTIGE_SANHEDRIN_ALLOW_COMMAND_LEDGER|VESTIGE_SANHEDRIN_ALLOW_LOOSE_LEDGER|VESTIGE_DASHBOARD_PORT)
export "$key=$value"
;;
esac
@ -42,11 +42,14 @@ allowed = {
"VESTIGE_SANHEDRIN_ENABLED",
"VESTIGE_SANHEDRIN_MODEL",
"VESTIGE_SANHEDRIN_ENDPOINT",
"VESTIGE_SANHEDRIN_API_KEY",
"VESTIGE_SANHEDRIN_BACKEND",
"VESTIGE_SANHEDRIN_CLAIM_MODE",
"VESTIGE_SANHEDRIN_OUTPUT",
"VESTIGE_SANHEDRIN_PYTHON",
"VESTIGE_SANHEDRIN_STATE_DIR",
"VESTIGE_SANHEDRIN_ALLOW_COMMAND_LEDGER",
"VESTIGE_SANHEDRIN_ALLOW_LOOSE_LEDGER",
"VESTIGE_DASHBOARD_PORT",
}
@ -73,9 +76,9 @@ PY
}
# === OPT-IN GATE ===
# Sanhedrin is heavyweight: the default local backend is a ~19 GB model and
# needs roughly 20+ GB of free RAM. Keep it disabled unless the user explicitly
# opts in. The installer writes this env file only for --enable-sanhedrin.
# Sanhedrin is opt-in and model-agnostic. It never guesses a large verifier
# model; if endpoint/model are unset, the bridge fails open with telemetry.
# The installer writes this env file only for --enable-sanhedrin.
SANHEDRIN_ENV="${VESTIGE_SANHEDRIN_ENV:-$HOME/.claude/hooks/vestige-sanhedrin.env}"
if [ -f "$SANHEDRIN_ENV" ]; then
load_vestige_sanhedrin_env "$SANHEDRIN_ENV" || exit 0
@ -104,6 +107,34 @@ if ! "$PYTHON_BIN" -c 'import sys' >/dev/null 2>&1; then
exit 0
fi
record_sanhedrin_fail_open() {
REASON="$1"
DETAIL="${2:-}"
"$PYTHON_BIN" - "$REASON" "$DETAIL" <<'PY' 2>/dev/null || true
import json
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
reason = sys.argv[1] if len(sys.argv) > 1 else "unknown"
detail = sys.argv[2] if len(sys.argv) > 2 else ""
state_dir = Path(os.environ.get("VESTIGE_SANHEDRIN_STATE_DIR") or Path.home() / ".vestige" / "sanhedrin")
try:
state_dir.mkdir(parents=True, exist_ok=True)
with (state_dir / "fail-open.jsonl").open("a", encoding="utf-8") as f:
f.write(json.dumps({
"timestamp": datetime.now(timezone.utc).isoformat(timespec="seconds"),
"runId": os.environ.get("VESTIGE_SANHEDRIN_RUN_ID"),
"reason": reason,
"detail": detail[:500],
"transcript": os.environ.get("TRANSCRIPT_PATH") or os.environ.get("VESTIGE_SANHEDRIN_TRANSCRIPT"),
}) + "\n")
except OSError:
pass
PY
}
# === READ STOP HOOK INPUT ===
INPUT="$(cat)"
TRANSCRIPT_PATH="$(printf '%s' "$INPUT" | "$PYTHON_BIN" -c 'import sys,json;d=json.load(sys.stdin);print(d.get("transcript_path",""))' 2>/dev/null || printf '')"
@ -203,6 +234,8 @@ fi
OUTPUT_FILE="$(mktemp -t vestige-sanhedrin-out.XXXXXX)"
trap 'rm -f "$DRAFT_SCRIPT" "$OUTPUT_FILE"' EXIT
export VESTIGE_SANHEDRIN_TRANSCRIPT="$TRANSCRIPT_PATH"
export VESTIGE_SANHEDRIN_RUN_ID="${VESTIGE_SANHEDRIN_RUN_ID:-$(date +%s)-$$}"
export VESTIGE_EXECUTIONER_ACTIVE=1
(
printf '%s\n' "$DRAFT" | "$PYTHON_BIN" "$BRIDGE" > "$OUTPUT_FILE" 2>/dev/null
@ -227,6 +260,7 @@ done
if /bin/kill -0 "$EXEC_PID" 2>/dev/null; then
/bin/kill "$EXEC_PID" 2>/dev/null
wait "$EXEC_PID" 2>/dev/null
record_sanhedrin_fail_open "timeout" "sanhedrin-local.py exceeded 60s"
exit 0
fi
wait "$EXEC_PID" 2>/dev/null
@ -344,6 +378,7 @@ fi
TRIMMED="$(printf '%s' "$EXECUTIONER_OUTPUT" | /usr/bin/awk 'NF {print; exit}' | /usr/bin/awk '{$1=$1;print}')"
if [ -z "$TRIMMED" ]; then
record_sanhedrin_fail_open "empty_verdict" "sanhedrin-local.py produced no parseable output"
exit 0
fi
@ -372,4 +407,5 @@ case "$TRIMMED" in
esac
# Unparseable verdict — fail open (do not block on Executioner errors)
record_sanhedrin_fail_open "unparseable_verdict" "$TRIMMED"
exit 0

View file

@ -26,6 +26,8 @@ LATEST_JSON = STATE_DIR / "latest.json"
LATEST_HTML = STATE_DIR / "latest.html"
APPEALS_JSONL = STATE_DIR / "appeals.jsonl"
COMMAND_RECEIPTS_JSONL = STATE_DIR / "command-receipts.jsonl"
FAIL_OPEN_JSONL = STATE_DIR / "fail-open.jsonl"
SUPPORTED_RECEIPT_SCHEMA = "vestige.sanhedrin.receipt.v1"
VERIFICATION_RE = re.compile(
r"\b("
@ -36,6 +38,15 @@ VERIFICATION_RE = re.compile(
r")\b",
re.IGNORECASE,
)
VERIFICATION_HEDGE_LEFT_RE = re.compile(
r"\b("
r"i\s+(think|believe|guess|suspect)|"
r"maybe|might|may\s+have|possibly|probably|apparently|"
r"let\s+me\s+verify|need\s+to\s+verify|will\s+verify|should\s+verify|"
r"not\s+sure|unverified|without\s+running"
r")\b[^.;:!?]{0,80}$",
re.IGNORECASE,
)
COMMAND_FAMILY_PATTERNS = {
"test": re.compile(r"\b(pytest|cargo\s+test|npm\s+(run\s+)?test|pnpm\s+(run\s+)?test|vitest|jest|playwright\s+test)\b", re.I),
"build": re.compile(r"\b(cargo\s+build|npm\s+(run\s+)?build|pnpm\s+(run\s+)?build|next\s+build|vite\s+build)\b", re.I),
@ -70,19 +81,44 @@ def claim_fingerprint(text: str) -> str:
return hashlib.sha256(normalized.encode("utf-8")).hexdigest()[:16]
def strip_non_assertive_regions(text: str) -> str:
"""Remove quoted/code regions before looking for Receipt Lock assertions."""
text = text[:16_384]
text = re.sub(r"```.*?```", " ", text, flags=re.DOTALL)
text = re.sub(r"`[^`\n]+`", " ", text)
kept_lines = []
for line in text.splitlines():
stripped = line.lstrip()
if stripped.startswith(">"):
continue
kept_lines.append(line)
text = "\n".join(kept_lines)
text = re.sub(r'(^|[\s([{])"[^"\n]+"(?=([\s.,;:!?)}\]]|$))', r"\1 ", text)
return text
def is_asserted_verification_claim(text: str) -> bool:
match = VERIFICATION_RE.search(text)
if not match:
return False
left_context = text[max(0, match.start() - 100) : match.start()]
return VERIFICATION_HEDGE_LEFT_RE.search(left_context) is None
def split_claims(draft: str) -> list[str]:
chunks = re.split(r"(?<=[.!?])\s+|\n+", draft)
cleaned = strip_non_assertive_regions(draft)
chunks = re.split(r"(?<=[.!?])\s+|\n+", cleaned)
claims: list[str] = []
for chunk in chunks:
text = chunk.strip(" -\t")
if len(text) >= 18 or VERIFICATION_RE.search(text) or is_hard_user_claim(text):
if len(text) >= 18 or is_asserted_verification_claim(text) or is_hard_user_claim(text):
claims.append(text)
return claims[:24]
def detect_claim_type(text: str) -> str:
low = text.lower()
if VERIFICATION_RE.search(text):
if is_asserted_verification_claim(text):
return "receipt_lock"
if is_hard_user_claim(text):
return "hard_user_claim"
@ -132,7 +168,7 @@ def new_manifest(draft: str) -> dict[str, Any]:
}
)
return {
"schema": "vestige.sanhedrin.receipt.v1",
"schema": SUPPORTED_RECEIPT_SCHEMA,
"id": stable_id(f"{draft_id}:{now_iso()}", "receipt"),
"draftId": draft_id,
"createdAt": now_iso(),
@ -218,6 +254,8 @@ def extract_transcript_receipts(path: Path) -> list[dict[str, Any]]:
except json.JSONDecodeError:
continue
receipts.extend(extract_structured_receipts(obj, pending_commands))
if os.environ.get("VESTIGE_SANHEDRIN_ALLOW_LOOSE_LEDGER") != "1":
continue
blob = json.dumps(obj, ensure_ascii=False)
command = extract_command(blob)
if not command:
@ -551,6 +589,23 @@ def save_manifest(manifest: dict[str, Any]) -> None:
write_text_atomic(LATEST_HTML, rendered)
def record_fail_open(reason: str, detail: str = "", transcript: str | None = None) -> None:
ensure_dirs()
run_id = os.environ.get("VESTIGE_SANHEDRIN_RUN_ID") or stable_id(f"{now_iso()}:{os.getpid()}", "run")
event = {
"timestamp": now_iso(),
"runId": run_id,
"reason": reason,
"detail": detail[:500],
"transcript": transcript or os.environ.get("VESTIGE_SANHEDRIN_TRANSCRIPT"),
}
try:
with FAIL_OPEN_JSONL.open("a", encoding="utf-8") as f:
f.write(json.dumps(event) + "\n")
except OSError:
pass
def write_text_atomic(path: Path, content: str) -> None:
ensure_dirs()
tmp = path.with_name(f".{path.name}.{os.getpid()}.tmp")