refactor(chat): add streaming/contract/ for file-write contract enforcement

Extracts the desktop_local_folder file-operation contract helpers:

* contract_enforcement_active: gates the contract on filesystem mode.
* evaluate_file_contract_outcome: checks the stream result's write and verification flags and returns a (passed, reason) verdict.
* log_file_contract: structured logging of contract verdicts.

This is the unit responsible for catching agents that claim to have
written/edited a file without actually invoking the filesystem tool.

Add-only; stream_new_chat.py keeps its inline duplicates until cutover.
This commit is contained in:
CREDO23 2026-05-25 21:48:14 +02:00
parent c13beae1ce
commit 88a58f6aff
2 changed files with 68 additions and 0 deletions

View file

@ -0,0 +1,15 @@
"""File-operation contract evaluation and logging."""
from __future__ import annotations
from app.tasks.chat.streaming.contract.file_contract import (
contract_enforcement_active,
evaluate_file_contract_outcome,
log_file_contract,
)
# Public API of the contract package: the three helpers re-exported from
# app.tasks.chat.streaming.contract.file_contract by the import above.
__all__ = [
"contract_enforcement_active",
"evaluate_file_contract_outcome",
"log_file_contract",
]

View file

@ -0,0 +1,53 @@
"""File-operation contract: when to enforce, how to score, how to log."""
from __future__ import annotations
import json
from typing import Any
from app.tasks.chat.streaming.shared.stream_result import StreamResult
from app.utils.perf import get_perf_logger
# Module-level logger obtained once at import time from app.utils.perf;
# log_file_contract writes its "[file_operation_contract]" lines through it.
_perf_log = get_perf_logger()
def contract_enforcement_active(result: StreamResult) -> bool:
    """Return whether the file-operation contract applies to this stream.

    The contract is enforced only when ``result.filesystem_mode`` equals
    ``"desktop_local_folder"``. The decision is deterministic: it depends on
    that single field, with no env-driven progression modes.
    """
    enforced_mode = "desktop_local_folder"
    return result.filesystem_mode == enforced_mode
def evaluate_file_contract_outcome(result: StreamResult) -> tuple[bool, str]:
    """Decide whether a stream satisfied the file-write contract.

    Returns a ``(passed, reason)`` pair. ``reason`` is the empty string when
    the contract passed, otherwise the name of the first stage that failed:
    ``"no_write_attempt"``, ``"write_failed"`` or ``"verification_failed"``.
    Streams whose detected intent is not ``"file_write"`` always pass.
    """
    if result.intent_detected != "file_write":
        return True, ""

    # Stages are checked in order; the first falsy flag decides the reason.
    ordered_stages = (
        ("write_attempted", "no_write_attempt"),
        ("write_succeeded", "write_failed"),
        ("verification_succeeded", "verification_failed"),
    )
    first_failure = next(
        (
            reason
            for flag_name, reason in ordered_stages
            if not getattr(result, flag_name)
        ),
        "",
    )
    return first_failure == "", first_failure
def log_file_contract(stage: str, result: StreamResult, **extra: Any) -> None:
    """Emit one structured ``[file_operation_contract]`` log line.

    Builds a JSON payload from the contract-related fields of ``result`` and
    writes it at INFO level through the module-level ``_perf_log``.

    Args:
        stage: Label of the point in the stream at which the log is emitted.
        result: Stream result whose contract fields are reported.
        **extra: Additional key/value pairs merged into the payload after the
            standard fields, so they override a standard field of the same
            name.

    The ``chat_id`` field is the part of ``turn_id`` before the first ``":"``,
    or ``"unknown"`` when ``turn_id`` is missing or contains no ``":"``.
    """
    # Normalise once: turn_id is already treated as possibly falsy for the
    # "turn_id" field, so the chat_id derivation must not assume a string.
    turn_id = result.turn_id or ""
    chat_id, separator, _ = turn_id.partition(":")

    payload: dict[str, Any] = {
        "stage": stage,
        "request_id": result.request_id or "unknown",
        "turn_id": turn_id or "unknown",
        "chat_id": chat_id if separator else "unknown",
        "filesystem_mode": result.filesystem_mode,
        "client_platform": result.client_platform,
        "intent_detected": result.intent_detected,
        "intent_confidence": result.intent_confidence,
        "write_attempted": result.write_attempted,
        "write_succeeded": result.write_succeeded,
        "verification_succeeded": result.verification_succeeded,
        "commit_gate_passed": result.commit_gate_passed,
        "commit_gate_reason": result.commit_gate_reason or None,
    }
    payload.update(extra)

    # default=str is deliberate: a diagnostic log line must not raise just
    # because a caller passed a non-JSON value (a path, an exception, ...)
    # through **extra; such values are logged as their string form.
    _perf_log.info(
        "[file_operation_contract] %s",
        json.dumps(payload, ensure_ascii=False, default=str),
    )