Enhance documentation and replay functionality in Flakestorm. Updated README to clarify V2 Spec and added references to LangSmith sources in configuration guide. Improved replay regression capabilities by allowing imports from LangSmith projects and runs, with filtering options. Added new classes for LangSmith project and run sources in the configuration. Updated replay loader to support project imports and refined session resolution logic.

This commit is contained in:
Francisco M Humarang Jr. 2026-03-07 02:04:55 +08:00
parent 58f49b08ba
commit 1bbe3a1f7b
10 changed files with 419 additions and 61 deletions

View file

@ -237,7 +237,7 @@ See [Roadmap](ROADMAP.md) for the full plan. Highlights:
- [📜 Behavioral Contracts](docs/BEHAVIORAL_CONTRACTS.md) - Contract × chaos matrix - [📜 Behavioral Contracts](docs/BEHAVIORAL_CONTRACTS.md) - Contract × chaos matrix
- [🔄 Replay Regression](docs/REPLAY_REGRESSION.md) - Import and replay production failures - [🔄 Replay Regression](docs/REPLAY_REGRESSION.md) - Import and replay production failures
- [🛡️ Context Attacks](docs/CONTEXT_ATTACKS.md) - Indirect injection, memory poisoning - [🛡️ Context Attacks](docs/CONTEXT_ATTACKS.md) - Indirect injection, memory poisoning
- [📐 Spec & audit](docs/V2_SPEC.md) - Spec clarifications; [implementation audit](docs/V2_AUDIT.md) - PRD/addendum verification - [📐 V2 Spec](docs/V2_SPEC.md) - Score formula, reset, Python tools
### For Developers ### For Developers
- [🏗️ Architecture & Modules](docs/MODULES.md) - How the code works - [🏗️ Architecture & Modules](docs/MODULES.md) - How the code works

View file

@ -82,6 +82,8 @@ Each entry is a **scenario**: a name plus optional `tool_faults`, `llm_faults`,
- **Weights:** critical = 3, high = 2, medium = 1, low = 1. - **Weights:** critical = 3, high = 2, medium = 1, low = 1.
- **Automatic FAIL:** If any invariant with severity `critical` fails in any scenario, the contract is considered failed regardless of the numeric score. - **Automatic FAIL:** If any invariant with severity `critical` fails in any scenario, the contract is considered failed regardless of the numeric score.
See [V2 Spec](V2_SPEC.md) for the exact formula and matrix isolation (reset) behavior.
--- ---
## Commands ## Commands

View file

@ -45,9 +45,10 @@ With `version: "2.0"` you can add the three **chaos engineering pillars** and a
| Block | Purpose | Documentation | | Block | Purpose | Documentation |
|-------|---------|---------------| |-------|---------|---------------|
| `chaos` | **Environment chaos** — Inject faults into tools, LLMs, and context (timeouts, errors, rate limits, context attacks). | [Environment Chaos](ENVIRONMENT_CHAOS.md) | | `chaos` | **Environment chaos** — Inject faults into tools, LLMs, and context (timeouts, errors, rate limits, context attacks, **response_drift**). | [Environment Chaos](ENVIRONMENT_CHAOS.md) |
| `contract` + `chaos_matrix` | **Behavioral contracts** — Named invariants verified across a matrix of chaos scenarios; produces a resilience score. | [Behavioral Contracts](BEHAVIORAL_CONTRACTS.md) | | `contract` + `chaos_matrix` | **Behavioral contracts** — Named invariants verified across a matrix of chaos scenarios; produces a resilience score. | [Behavioral Contracts](BEHAVIORAL_CONTRACTS.md) |
| `replays.sessions` | **Replay regression** — Import production failure sessions and replay them as deterministic tests. | [Replay Regression](REPLAY_REGRESSION.md) | | `replays.sessions` | **Replay regression** — Import production failure sessions and replay them as deterministic tests. | [Replay Regression](REPLAY_REGRESSION.md) |
| `replays.sources` | **LangSmith sources** — Import from a LangSmith project or by run ID; `auto_import` re-fetches on each run/ci. | [Replay Regression](REPLAY_REGRESSION.md) |
| `scoring` | **Unified score** — Weights for mutation_robustness, chaos_resilience, contract_compliance, replay_regression (used by `flakestorm ci`). | See [README](../README.md) “Scores at a glance” | | `scoring` | **Unified score** — Weights for mutation_robustness, chaos_resilience, contract_compliance, replay_regression (used by `flakestorm ci`). | See [README](../README.md) “Scores at a glance” |
**Context attacks** (chaos on tool/context, not the user prompt) are configured under `chaos.context_attacks`. See [Context Attacks](CONTEXT_ATTACKS.md). **Context attacks** (chaos on tool/context, not the user prompt) are configured under `chaos.context_attacks`. See [Context Attacks](CONTEXT_ATTACKS.md).

View file

@ -110,4 +110,10 @@ chaos:
- `high_latency` — Delayed responses. - `high_latency` — Delayed responses.
- `indirect_injection` — Context attack profile (inject into tool/context). - `indirect_injection` — Context attack profile (inject into tool/context).
Profile YAMLs live in `src/flakestorm/chaos/profiles/`. Use with `--chaos-profile NAME`. Profile YAMLs live in `src/flakestorm/chaos/profiles/`. Use with `--chaos-profile NAME`. The **`model_version_drift`** profile exercises the LLM fault type **`response_drift`**.
---
## See also
- [Context Attacks](CONTEXT_ATTACKS.md) — Indirect injection, memory poisoning.

View file

@ -63,7 +63,7 @@ Flakestorm resolves name first, then path; if not found, replay may fail or fall
## Configuration in flakestorm.yaml ## Configuration in flakestorm.yaml
You can define replay sessions inline or by file: You can define replay sessions inline, by file, or via **LangSmith sources**:
```yaml ```yaml
version: "2.0" version: "2.0"
@ -76,9 +76,20 @@ replays:
input: "What is the capital of France?" input: "What is the capital of France?"
contract: "Research Agent Contract" contract: "Research Agent Contract"
tool_responses: [] tool_responses: []
# LangSmith sources (import by project or run ID; auto_import re-fetches on each run/ci)
sources:
- type: langsmith
project: "my-production-agent"
filter:
status: error # error | warning | all
date_range: last_7_days
min_latency_ms: 5000
auto_import: true
- type: langsmith_run
run_id: "abc123def456"
``` ```
When you use `file:`, the sessions `id`, `input`, and `contract` come from the loaded file. When you use inline `id` and `input`, you must provide them. When you use `file:`, the sessions `id`, `input`, and `contract` come from the loaded file. When you use inline `id` and `input`, you must provide them. **`replays.sources`** sessions are merged when running `flakestorm ci` or when `auto_import` is true (project sources).
--- ---
@ -89,9 +100,10 @@ When you use `file:`, the sessions `id`, `input`, and `contract` come from th
| `flakestorm replay run path/to/replay.yaml -c flakestorm.yaml` | Run a single replay file. `-c` supplies agent and contract config. | | `flakestorm replay run path/to/replay.yaml -c flakestorm.yaml` | Run a single replay file. `-c` supplies agent and contract config. |
| `flakestorm replay run path/to/dir -c flakestorm.yaml` | Run all replay files in the directory. | | `flakestorm replay run path/to/dir -c flakestorm.yaml` | Run all replay files in the directory. |
| `flakestorm replay export --from-report REPORT.json --output ./replays` | Export failed mutations from a Flakestorm report as replay YAML files. | | `flakestorm replay export --from-report REPORT.json --output ./replays` | Export failed mutations from a Flakestorm report as replay YAML files. |
| `flakestorm replay import --from-langsmith RUN_ID` | Import a session from LangSmith (requires `flakestorm[langsmith]`). | | `flakestorm replay run --from-langsmith RUN_ID -c flakestorm.yaml` | Import a single session from LangSmith by run ID (requires `flakestorm[langsmith]`). |
| `flakestorm replay import --from-langsmith RUN_ID --run` | Import and run the replay. | | `flakestorm replay run --from-langsmith RUN_ID --run -o replay.yaml` | Import, optionally write to file, and run the replay. |
| `flakestorm ci -c flakestorm.yaml` | Runs mutation, contract, chaos-only, **and all sessions in `replays.sessions`**; reports **replay_regression** (passed/total) and **overall** weighted score. | | `flakestorm replay run --from-langsmith-project PROJECT --filter-status error -o ./replays/` | Import all runs from a LangSmith project; write one YAML per run. Add `--run` to run after import. |
| `flakestorm ci -c flakestorm.yaml` | Runs mutation, contract, chaos-only, **and all replay sessions** (including `replays.sources` with `auto_import`); reports **replay_regression** and **overall** weighted score. |
--- ---
@ -99,7 +111,8 @@ When you use `file:`, the sessions `id`, `input`, and `contract` come from th
- **Manual** — Write YAML/JSON replay files from incident reports. - **Manual** — Write YAML/JSON replay files from incident reports.
- **Flakestorm export**`flakestorm replay export --from-report REPORT.json` turns failed runs into replay files. - **Flakestorm export**`flakestorm replay export --from-report REPORT.json` turns failed runs into replay files.
- **LangSmith**`flakestorm replay import --from-langsmith RUN_ID` (requires `pip install flakestorm[langsmith]`). - **LangSmith (single run)**`flakestorm replay run --from-langsmith RUN_ID` (requires `pip install flakestorm[langsmith]`).
- **LangSmith (project)**`flakestorm replay run --from-langsmith-project PROJECT --filter-status error -o ./replays/` imports failed runs from a project; or use `replays.sources` in config with `auto_import: true` so CI re-fetches from the project each run.
--- ---

View file

@ -68,16 +68,62 @@ Verification of the codebase against the PRD and addendum: behavior, config sche
--- ---
## 6. Addendum — Context Attacks, Model Drift, LangSmith, Spec ## 6. Addendum (flakestorm-v2-addendum.md) — Full Checklist
| Item | Status | ### Addition 1 — Context Attacks Module
|------|--------|
| Context attacks module (indirect_injection, etc.) | ✅ `chaos/context_attacks.py`; profile `indirect_injection.yaml` | | Requirement | Status | Notes |
| response_drift in llm_proxy | ✅ `chaos/llm_proxy.py` (json_field_rename, verbosity_shift, format_change, refusal_rephrase, tone_shift) | |-------------|--------|------|
| LangSmith load + schema check | ✅ `replay/loader.py`: `load_langsmith_run`, `_validate_langsmith_run_schema` | | `chaos/context_attacks.py` | ✅ | `ContextAttackEngine`, `maybe_inject_indirect()` |
| Python tool fault: fail loudly when no tools | ✅ `create_instrumented_adapter` raises if type=python and tool_faults | | indirect_injection (inject payloads into tool response) | ✅ | Wired via engine; profile `indirect_injection.yaml` |
| Contract matrix isolation (reset) | ✅ Optional reset; warning if stateful and no reset | | memory_poisoning, system_prompt_leak_probe | ⚠️ | Docstring/config types exist; memory_poisoning inject step and leak probe as contract assertion are not fully wired in execution flow |
| Resilience score formula (addendum §6.3) | ✅ In `contracts/matrix.py` and `docs/V2_SPEC.md` | | Contract invariants: excludes_pattern, behavior_unchanged | ✅ | `assertions/verifier.py`; use for system_prompt_not_leaked, injection_not_executed |
| Config: `chaos.context_attacks` list with type (e.g. indirect_injection) | ✅ | `ContextAttackConfig` in `core/config.py` |
### Addition 2 — Model Version Drift (response_drift)
| Requirement | Status | Notes |
|-------------|--------|------|
| `response_drift` in llm_faults | ✅ | `chaos/llm_proxy.py`: `apply_llm_response_drift`, drift_type, severity, direction, factor |
| drift_type: json_field_rename, verbosity_shift, format_change, refusal_rephrase, tone_shift | ✅ | Implemented in llm_proxy |
| Profile `model_version_drift.yaml` | ✅ | `chaos/profiles/model_version_drift.yaml` |
### Addition 3 — Multi-Agent Failure Propagation
| Requirement | Status | Notes |
|-------------|--------|------|
| v3 roadmap placeholder, no v2 implementation | ✅ | Documented in ROADMAP.md as V3; no code required |
### Addition 4 — Resilience Certificate Export
| Requirement | Status | Notes |
|-------------|--------|------|
| `flakestorm certificate` CLI command | ❌ | Not implemented |
| `reports/certificate.py` (PDF/HTML certificate) | ❌ | Not implemented |
| Config `certificate.tester_name`, pass_threshold, output_format | ❌ | Not implemented |
### Addition 5 — LangSmith Replay Import
| Requirement | Status | Notes |
|-------------|--------|------|
| Import single run by ID: `flakestorm replay --from-langsmith RUN_ID` | ✅ | `replay/loader.py`: `load_langsmith_run(run_id)`; CLI option |
| Import and run: `--from-langsmith RUN_ID --run` | ✅ | `_replay_async` supports run_after_import |
| Schema validation (fail clearly if LangSmith API changed) | ✅ | `_validate_langsmith_run_schema` |
| Map run inputs/outputs/child_runs to ReplaySessionConfig | ✅ | `_langsmith_run_to_session` |
| `--from-langsmith-project PROJECT` + `--filter-status` + `--output` | ✅ | `replay run --from-langsmith-project X --filter-status error -o ./replays/`; writes YAML per run |
| `replays.sources` (type: langsmith | langsmith_run, project, filter, auto_import) | ✅ | `LangSmithProjectSourceConfig`, `LangSmithRunSourceConfig`, `ReplayConfig.sources`; CI uses `resolve_sessions_from_config(..., include_sources=True)` |
### Addition 6 — Implicit Spec Clarifications
| Requirement | Status | Notes |
|-------------|--------|------|
| 6.1 Python callables: fail loudly if tool_faults but no tools/ToolRegistry | ✅ | `create_instrumented_adapter` raises with clear message for type=python |
| 6.2 Contract matrix: reset between cells (reset_endpoint / reset_function) | ✅ | `ContractEngine._reset_agent()`; config fields on AgentConfig |
| 6.3 Resilience score formula in spec (weighted, auto-FAIL on critical) | ✅ | `contracts/matrix.py` docstring and implementation; `docs/V2_SPEC.md` |
---
**Summary:** Addendum Additions 1, 2, 3, 5, 6 are implemented (with minor gaps on full memory_poisoning/leak_probe wiring). **Addition 4 (Resilience Certificate)** is not implemented.
--- ---

View file

@ -552,10 +552,31 @@ def replay_run(
help="Path to configuration file", help="Path to configuration file",
), ),
from_langsmith: str | None = typer.Option(None, "--from-langsmith", help="LangSmith run ID"), from_langsmith: str | None = typer.Option(None, "--from-langsmith", help="LangSmith run ID"),
run_after_import: bool = typer.Option(False, "--run", help="Run replay after import"), from_langsmith_project: str | None = typer.Option(
None,
"--from-langsmith-project",
help="Import runs from a LangSmith project (filter by status, then write to --output)",
),
filter_status: str = typer.Option(
"error",
"--filter-status",
help="When using --from-langsmith-project: error | warning | all",
),
output: Path = typer.Option(
None,
"--output",
"-o",
help="When importing: output file (single run) or directory (project); replays written as YAML",
),
run_after_import: bool = typer.Option(False, "--run", help="Run replay(s) after import"),
) -> None: ) -> None:
"""Run or import replay sessions.""" """Run or import replay sessions."""
asyncio.run(_replay_async(path, config, from_langsmith, run_after_import)) asyncio.run(
_replay_async(
path, config, from_langsmith, from_langsmith_project,
filter_status, output, run_after_import,
)
)
@replay_app.command("export") @replay_app.command("export")
@ -602,8 +623,12 @@ async def _replay_async(
path: Path | None, path: Path | None,
config: Path, config: Path,
from_langsmith: str | None, from_langsmith: str | None,
from_langsmith_project: str | None,
filter_status: str,
output: Path | None,
run_after_import: bool, run_after_import: bool,
) -> None: ) -> None:
import yaml
from flakestorm.core.config import load_config from flakestorm.core.config import load_config
from flakestorm.core.protocol import create_agent_adapter, create_instrumented_adapter from flakestorm.core.protocol import create_agent_adapter, create_instrumented_adapter
from flakestorm.replay.loader import ReplayLoader, resolve_contract from flakestorm.replay.loader import ReplayLoader, resolve_contract
@ -612,10 +637,60 @@ async def _replay_async(
agent = create_agent_adapter(cfg.agent) agent = create_agent_adapter(cfg.agent)
if cfg.chaos: if cfg.chaos:
agent = create_instrumented_adapter(agent, cfg.chaos) agent = create_instrumented_adapter(agent, cfg.chaos)
loader = ReplayLoader()
if from_langsmith_project:
sessions = loader.load_langsmith_project(
project_name=from_langsmith_project,
filter_status=filter_status,
)
console.print(f"[green]Imported {len(sessions)} replay(s) from LangSmith project.[/green]")
out_path = Path(output) if output else Path("./replays")
out_path.mkdir(parents=True, exist_ok=True)
for i, session in enumerate(sessions):
safe_id = (session.id or str(i)).replace("/", "_").replace("\\", "_")[:64]
fpath = out_path / f"replay-{safe_id}.yaml"
fpath.write_text(
yaml.dump(
session.model_dump(mode="json", exclude_none=True),
default_flow_style=False,
sort_keys=False,
allow_unicode=True,
),
encoding="utf-8",
)
console.print(f" [dim]Wrote[/dim] {fpath}")
if run_after_import and sessions:
contract = None
try:
contract = resolve_contract(sessions[0].contract, cfg, config.parent)
except FileNotFoundError:
pass
runner = ReplayRunner(agent, contract=contract)
passed = 0
for session in sessions:
result = await runner.run(session, contract=contract)
if result.passed:
passed += 1
console.print(f"[bold]Replay results:[/bold] {passed}/{len(sessions)} passed")
raise typer.Exit(0)
if from_langsmith: if from_langsmith:
loader = ReplayLoader()
session = loader.load_langsmith_run(from_langsmith) session = loader.load_langsmith_run(from_langsmith)
console.print(f"[green]Imported replay:[/green] {session.id}") console.print(f"[green]Imported replay:[/green] {session.id}")
if output:
out_path = Path(output)
out_path.parent.mkdir(parents=True, exist_ok=True)
out_path.write_text(
yaml.dump(
session.model_dump(mode="json", exclude_none=True),
default_flow_style=False,
sort_keys=False,
allow_unicode=True,
),
encoding="utf-8",
)
console.print(f"[dim]Wrote[/dim] {out_path}")
if run_after_import: if run_after_import:
contract = None contract = None
try: try:
@ -627,8 +702,8 @@ async def _replay_async(
console.print(f"[bold]Replay result:[/bold] passed={replay_result.passed}") console.print(f"[bold]Replay result:[/bold] passed={replay_result.passed}")
console.print(f"[dim]Response:[/dim] {(replay_result.response.output or '')[:200]}...") console.print(f"[dim]Response:[/dim] {(replay_result.response.output or '')[:200]}...")
raise typer.Exit(0) raise typer.Exit(0)
if path and path.exists(): if path and path.exists():
loader = ReplayLoader()
session = loader.load_file(path) session = loader.load_file(path)
contract = None contract = None
try: try:
@ -641,7 +716,9 @@ async def _replay_async(
if replay_result.verification_details: if replay_result.verification_details:
console.print(f"[dim]Checks:[/dim] {', '.join(replay_result.verification_details)}") console.print(f"[dim]Checks:[/dim] {', '.join(replay_result.verification_details)}")
else: else:
console.print("[yellow]Provide a replay file path or --from-langsmith RUN_ID.[/yellow]") console.print(
"[yellow]Provide a replay file path, --from-langsmith RUN_ID, or --from-langsmith-project PROJECT.[/yellow]"
)
@app.command() @app.command()
@ -703,42 +780,38 @@ async def _ci_async(config: Path, min_score: float) -> None:
if chaos_score < min_score: if chaos_score < min_score:
exit_code = 1 exit_code = 1
# Replay sessions # Replay sessions (from replays.sessions and replays.sources with auto_import)
replay_score = 1.0 replay_score = 1.0
if cfg.replays and cfg.replays.sessions: if cfg.replays and (cfg.replays.sessions or cfg.replays.sources):
from flakestorm.core.protocol import create_agent_adapter, create_instrumented_adapter from flakestorm.core.protocol import create_agent_adapter, create_instrumented_adapter
from flakestorm.replay.loader import ReplayLoader, resolve_contract from flakestorm.replay.loader import resolve_contract, resolve_sessions_from_config
from flakestorm.replay.runner import ReplayRunner from flakestorm.replay.runner import ReplayRunner
agent = create_agent_adapter(cfg.agent) agent = create_agent_adapter(cfg.agent)
if cfg.chaos: if cfg.chaos:
agent = create_instrumented_adapter(agent, cfg.chaos) agent = create_instrumented_adapter(agent, cfg.chaos)
loader = ReplayLoader()
passed = 0
total = 0
config_path = Path(config) config_path = Path(config)
for s in cfg.replays.sessions: sessions = resolve_sessions_from_config(
if getattr(s, "file", None): cfg.replays, config_path.parent, include_sources=True
fpath = Path(s.file) )
if not fpath.is_absolute(): if sessions:
fpath = config_path.parent / fpath passed = 0
session = loader.load_file(fpath) total = 0
else: for session in sessions:
session = s contract = None
contract = None try:
try: contract = resolve_contract(session.contract, cfg, config_path.parent)
contract = resolve_contract(session.contract, cfg, config_path.parent) except FileNotFoundError:
except FileNotFoundError: pass
pass runner = ReplayRunner(agent, contract=contract)
runner = ReplayRunner(agent, contract=contract) result = await runner.run(session, contract=contract)
result = await runner.run(session, contract=contract) total += 1
total += 1 if result.passed:
if result.passed: passed += 1
passed += 1 replay_score = passed / total if total else 1.0
replay_score = passed / total if total else 1.0 scores["replay_regression"] = replay_score
scores["replay_regression"] = replay_score console.print(f"[bold]Replay score:[/bold] {replay_score:.1%} ({passed}/{total})")
console.print(f"[bold]Replay score:[/bold] {replay_score:.1%} ({passed}/{total})") if replay_score < min_score:
if replay_score < min_score: exit_code = 1
exit_code = 1
# Overall weighted score (only for components that ran) # Overall weighted score (only for components that ran)
from flakestorm.core.config import ScoringConfig from flakestorm.core.config import ScoringConfig

View file

@ -11,6 +11,7 @@ import os
import re import re
from enum import Enum from enum import Enum
from pathlib import Path from pathlib import Path
from typing import Annotated, Literal, Union
import yaml import yaml
from pydantic import BaseModel, Field, field_validator, model_validator from pydantic import BaseModel, Field, field_validator, model_validator
@ -534,10 +535,59 @@ class ReplaySessionConfig(BaseModel):
return self return self
class LangSmithProjectFilterConfig(BaseModel):
"""Filter for LangSmith project run listing (replays.sources)."""
status: str = Field(
default="error",
description="Filter by run status: error | warning | all",
)
date_range: str | None = Field(
default=None,
description="e.g. last_7_days (used as start_time relative to now)",
)
min_latency_ms: int | None = Field(
default=None,
description="Include runs with latency >= this many ms",
)
class LangSmithProjectSourceConfig(BaseModel):
"""Replay source: import runs from a LangSmith project (replays.sources)."""
type: Literal["langsmith"] = "langsmith"
project: str = Field(..., description="LangSmith project name")
filter: LangSmithProjectFilterConfig | None = Field(
default=None,
description="Optional filter (status, date_range, min_latency_ms)",
)
auto_import: bool = Field(
default=False,
description="If true, (re-)fetch runs from project on each run/ci",
)
class LangSmithRunSourceConfig(BaseModel):
"""Replay source: single LangSmith run by ID (replays.sources)."""
type: Literal["langsmith_run"] = "langsmith_run"
run_id: str = Field(..., description="LangSmith run ID")
ReplaySourceConfig = Annotated[
Union[LangSmithProjectSourceConfig, LangSmithRunSourceConfig],
Field(discriminator="type"),
]
class ReplayConfig(BaseModel): class ReplayConfig(BaseModel):
"""V2 replay regression configuration.""" """V2 replay regression configuration."""
sessions: list[ReplaySessionConfig] = Field(default_factory=list) sessions: list[ReplaySessionConfig] = Field(default_factory=list)
sources: list[ReplaySourceConfig] = Field(
default_factory=list,
description="Optional LangSmith sources (project or run_id); sessions from sources can be merged when auto_import is true",
)
class FlakeStormConfig(BaseModel): class FlakeStormConfig(BaseModel):

View file

@ -2,17 +2,26 @@
Replay loader: load replay sessions from YAML/JSON or LangSmith. Replay loader: load replay sessions from YAML/JSON or LangSmith.
Contract reference resolution: by name (main config) then by file path. Contract reference resolution: by name (main config) then by file path.
LangSmith: single run by ID or project listing with filters (Addition 5).
""" """
from __future__ import annotations from __future__ import annotations
import json import json
from datetime import datetime, timedelta, timezone
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING, Any from typing import TYPE_CHECKING, Any
import yaml import yaml
from flakestorm.core.config import ContractConfig, ReplaySessionConfig from flakestorm.core.config import (
ContractConfig,
LangSmithProjectFilterConfig,
LangSmithProjectSourceConfig,
LangSmithRunSourceConfig,
ReplayConfig,
ReplaySessionConfig,
)
if TYPE_CHECKING: if TYPE_CHECKING:
from flakestorm.core.config import FlakeStormConfig from flakestorm.core.config import FlakeStormConfig
@ -58,23 +67,82 @@ class ReplayLoader:
data = yaml.safe_load(text) data = yaml.safe_load(text)
return ReplaySessionConfig.model_validate(data) return ReplaySessionConfig.model_validate(data)
def _get_langsmith_client(self) -> Any:
"""Return LangSmith Client; raise ImportError if langsmith not installed."""
try:
from langsmith import Client
except ImportError as e:
raise ImportError(
"LangSmith requires: pip install flakestorm[langsmith] or pip install langsmith"
) from e
return Client()
def load_langsmith_run(self, run_id: str) -> ReplaySessionConfig: def load_langsmith_run(self, run_id: str) -> ReplaySessionConfig:
""" """
Load a LangSmith run as a replay session. Requires langsmith>=0.1.0. Load a LangSmith run as a replay session. Requires langsmith>=0.1.0.
Target API: /api/v1/runs/{run_id} Target API: /api/v1/runs/{run_id}
Fails clearly if LangSmith schema has changed (expected fields missing). Fails clearly if LangSmith schema has changed (expected fields missing).
""" """
try: client = self._get_langsmith_client()
from langsmith import Client
except ImportError as e:
raise ImportError(
"LangSmith import requires: pip install flakestorm[langsmith] or pip install langsmith"
) from e
client = Client()
run = client.read_run(run_id) run = client.read_run(run_id)
self._validate_langsmith_run_schema(run) self._validate_langsmith_run_schema(run)
return self._langsmith_run_to_session(run) return self._langsmith_run_to_session(run)
def load_langsmith_project(
self,
project_name: str,
filter_status: str = "error",
date_range: str | None = None,
min_latency_ms: int | None = None,
limit: int = 200,
) -> list[ReplaySessionConfig]:
"""
Load runs from a LangSmith project as replay sessions. Requires langsmith>=0.1.0.
Uses list_runs(project_name=..., error=..., start_time=..., filter=..., limit=...).
Each run is fetched fully (read_run) to get child_runs for tool_responses.
"""
client = self._get_langsmith_client()
# Build list_runs kwargs
error_filter: bool | None = None
if filter_status == "error":
error_filter = True
elif filter_status == "all":
error_filter = None
else:
# "warning" or unknown: treat as non-error runs
error_filter = False
start_time: datetime | None = None
if date_range:
date_range_lower = date_range.strip().lower().replace("-", "_")
if "7" in date_range_lower and "day" in date_range_lower:
start_time = datetime.now(timezone.utc) - timedelta(days=7)
elif "24" in date_range_lower and ("hour" in date_range_lower or "day" in date_range_lower):
start_time = datetime.now(timezone.utc) - timedelta(hours=24)
elif "30" in date_range_lower and "day" in date_range_lower:
start_time = datetime.now(timezone.utc) - timedelta(days=30)
filter_str: str | None = None
if min_latency_ms is not None and min_latency_ms > 0:
# LangSmith filter uses seconds for latency
latency_sec = min_latency_ms / 1000.0
filter_str = f"gt(latency, {latency_sec})"
runs_iterator = client.list_runs(
project_name=project_name,
error=error_filter,
start_time=start_time,
filter=filter_str,
limit=limit,
is_root=True,
)
sessions: list[ReplaySessionConfig] = []
for run in runs_iterator:
run_id = str(getattr(run, "id", ""))
if not run_id:
continue
full_run = client.read_run(run_id)
self._validate_langsmith_run_schema(full_run)
sessions.append(self._langsmith_run_to_session(full_run))
return sessions
def _validate_langsmith_run_schema(self, run: Any) -> None: def _validate_langsmith_run_schema(self, run: Any) -> None:
"""Check that run has expected schema; fail clearly if LangSmith API changed.""" """Check that run has expected schema; fail clearly if LangSmith API changed."""
required = ("id", "inputs", "outputs") required = ("id", "inputs", "outputs")
@ -112,3 +180,47 @@ class ReplayLoader:
tool_responses=tool_responses, tool_responses=tool_responses,
contract="default", contract="default",
) )
def resolve_sessions_from_config(
replays: ReplayConfig | None,
config_dir: Path | None = None,
*,
include_sources: bool = True,
) -> list[ReplaySessionConfig]:
"""
Build full list of replay sessions from config: inline sessions, file-backed
sessions (loaded from disk), and optionally sessions from replays.sources
(LangSmith run_id or project with auto_import).
"""
if not replays:
return []
loader = ReplayLoader()
out: list[ReplaySessionConfig] = []
for s in replays.sessions:
if s.file:
path = Path(s.file)
if not path.is_absolute() and config_dir:
path = config_dir / path
out.append(loader.load_file(path))
else:
out.append(s)
if not include_sources or not replays.sources:
return out
for src in replays.sources:
if isinstance(src, LangSmithRunSourceConfig):
out.append(loader.load_langsmith_run(src.run_id))
elif isinstance(src, LangSmithProjectSourceConfig) and src.auto_import:
filt = src.filter
filter_status = filt.status if filt else "error"
date_range = filt.date_range if filt else None
min_latency_ms = filt.min_latency_ms if filt else None
out.extend(
loader.load_langsmith_project(
project_name=src.project,
filter_status=filter_status,
date_range=date_range,
min_latency_ms=min_latency_ms,
)
)
return out

View file

@ -20,10 +20,11 @@ from flakestorm.core.config import (
AdvancedConfig, AdvancedConfig,
ContractConfig, ContractConfig,
ContractInvariantConfig, ContractInvariantConfig,
ReplayConfig,
ReplaySessionConfig, ReplaySessionConfig,
ReplayToolResponseConfig, ReplayToolResponseConfig,
) )
from flakestorm.replay.loader import ReplayLoader, resolve_contract from flakestorm.replay.loader import ReplayLoader, resolve_contract, resolve_sessions_from_config
from flakestorm.replay.runner import ReplayRunner, ReplayResult from flakestorm.replay.runner import ReplayRunner, ReplayResult
from flakestorm.core.protocol import AgentResponse, BaseAgentAdapter from flakestorm.core.protocol import AgentResponse, BaseAgentAdapter
@ -99,6 +100,60 @@ class TestReplayLoader:
with pytest.raises(FileNotFoundError): with pytest.raises(FileNotFoundError):
resolve_contract("nonexistent", config, None) resolve_contract("nonexistent", config, None)
def test_resolve_sessions_from_config_inline_only(self):
"""resolve_sessions_from_config returns inline sessions when no sources."""
replays = ReplayConfig(
sessions=[
ReplaySessionConfig(id="a", input="q1", contract="default"),
ReplaySessionConfig(id="b", input="q2", contract="default"),
],
sources=[],
)
out = resolve_sessions_from_config(replays, None, include_sources=True)
assert len(out) == 2
assert out[0].id == "a"
assert out[1].id == "b"
def test_resolve_sessions_from_config_file_backed(self):
"""resolve_sessions_from_config loads file-backed sessions from config_dir."""
with tempfile.NamedTemporaryFile(
suffix=".yaml", delete=False, mode="w", encoding="utf-8"
) as f:
yaml.dump({
"id": "file-session",
"input": "from file",
"tool_responses": [],
"contract": "default",
}, f)
f.flush()
fpath = Path(f.name)
try:
config_dir = fpath.parent
replays = ReplayConfig(
sessions=[ReplaySessionConfig(id="", input="", file=fpath.name)],
sources=[],
)
out = resolve_sessions_from_config(replays, config_dir, include_sources=True)
assert len(out) == 1
assert out[0].id == "file-session"
assert out[0].input == "from file"
finally:
fpath.unlink(missing_ok=True)
def test_replay_config_sources_parsed_from_dict(self):
"""ReplayConfig.sources parses langsmith and langsmith_run from dict (YAML)."""
cfg = ReplayConfig.model_validate({
"sessions": [],
"sources": [
{"type": "langsmith", "project": "my-agent", "auto_import": True},
{"type": "langsmith_run", "run_id": "abc-123"},
],
})
assert len(cfg.sources) == 2
assert cfg.sources[0].project == "my-agent"
assert cfg.sources[0].auto_import is True
assert cfg.sources[1].run_id == "abc-123"
class TestReplayRunner: class TestReplayRunner:
"""Test replay runner and verification.""" """Test replay runner and verification."""