diff --git a/README.md b/README.md index 0671664..219522b 100644 --- a/README.md +++ b/README.md @@ -237,7 +237,7 @@ See [Roadmap](ROADMAP.md) for the full plan. Highlights: - [πŸ“œ Behavioral Contracts](docs/BEHAVIORAL_CONTRACTS.md) - Contract Γ— chaos matrix - [πŸ”„ Replay Regression](docs/REPLAY_REGRESSION.md) - Import and replay production failures - [πŸ›‘οΈ Context Attacks](docs/CONTEXT_ATTACKS.md) - Indirect injection, memory poisoning -- [πŸ“ Spec & audit](docs/V2_SPEC.md) - Spec clarifications; [implementation audit](docs/V2_AUDIT.md) - PRD/addendum verification +- [πŸ“ V2 Spec](docs/V2_SPEC.md) - Score formula, reset, Python tools ### For Developers - [πŸ—οΈ Architecture & Modules](docs/MODULES.md) - How the code works diff --git a/docs/BEHAVIORAL_CONTRACTS.md b/docs/BEHAVIORAL_CONTRACTS.md index b0c42b3..a480049 100644 --- a/docs/BEHAVIORAL_CONTRACTS.md +++ b/docs/BEHAVIORAL_CONTRACTS.md @@ -82,6 +82,8 @@ Each entry is a **scenario**: a name plus optional `tool_faults`, `llm_faults`, - **Weights:** critical = 3, high = 2, medium = 1, low = 1. - **Automatic FAIL:** If any invariant with severity `critical` fails in any scenario, the contract is considered failed regardless of the numeric score. +See [V2 Spec](V2_SPEC.md) for the exact formula and matrix isolation (reset) behavior. + --- ## Commands diff --git a/docs/CONFIGURATION_GUIDE.md b/docs/CONFIGURATION_GUIDE.md index 8aec6c9..967949a 100644 --- a/docs/CONFIGURATION_GUIDE.md +++ b/docs/CONFIGURATION_GUIDE.md @@ -45,9 +45,10 @@ With `version: "2.0"` you can add the three **chaos engineering pillars** and a | Block | Purpose | Documentation | |-------|---------|---------------| -| `chaos` | **Environment chaos** β€” Inject faults into tools, LLMs, and context (timeouts, errors, rate limits, context attacks). | [Environment Chaos](ENVIRONMENT_CHAOS.md) | +| `chaos` | **Environment chaos** β€” Inject faults into tools, LLMs, and context (timeouts, errors, rate limits, context attacks, **response_drift**). | [Environment Chaos](ENVIRONMENT_CHAOS.md) | | `contract` + `chaos_matrix` | **Behavioral contracts** β€” Named invariants verified across a matrix of chaos scenarios; produces a resilience score. | [Behavioral Contracts](BEHAVIORAL_CONTRACTS.md) | | `replays.sessions` | **Replay regression** β€” Import production failure sessions and replay them as deterministic tests. | [Replay Regression](REPLAY_REGRESSION.md) | +| `replays.sources` | **LangSmith sources** β€” Import from a LangSmith project or by run ID; `auto_import` re-fetches on each run/ci. | [Replay Regression](REPLAY_REGRESSION.md) | | `scoring` | **Unified score** β€” Weights for mutation_robustness, chaos_resilience, contract_compliance, replay_regression (used by `flakestorm ci`). | See [README](../README.md) β€œScores at a glance” | **Context attacks** (chaos on tool/context, not the user prompt) are configured under `chaos.context_attacks`. See [Context Attacks](CONTEXT_ATTACKS.md). diff --git a/docs/ENVIRONMENT_CHAOS.md b/docs/ENVIRONMENT_CHAOS.md index 3574f06..6b604a1 100644 --- a/docs/ENVIRONMENT_CHAOS.md +++ b/docs/ENVIRONMENT_CHAOS.md @@ -110,4 +110,10 @@ chaos: - `high_latency` β€” Delayed responses. - `indirect_injection` β€” Context attack profile (inject into tool/context). -Profile YAMLs live in `src/flakestorm/chaos/profiles/`. Use with `--chaos-profile NAME`. +Profile YAMLs live in `src/flakestorm/chaos/profiles/`. Use with `--chaos-profile NAME`. The **`model_version_drift`** profile exercises the LLM fault type **`response_drift`**. + +--- + +## See also + +- [Context Attacks](CONTEXT_ATTACKS.md) β€” Indirect injection, memory poisoning. diff --git a/docs/REPLAY_REGRESSION.md b/docs/REPLAY_REGRESSION.md index d9993de..bb71fac 100644 --- a/docs/REPLAY_REGRESSION.md +++ b/docs/REPLAY_REGRESSION.md @@ -63,7 +63,7 @@ Flakestorm resolves name first, then path; if not found, replay may fail or fall ## Configuration in flakestorm.yaml -You can define replay sessions inline or by file: +You can define replay sessions inline, by file, or via **LangSmith sources**: ```yaml version: "2.0" @@ -76,9 +76,20 @@ replays: input: "What is the capital of France?" contract: "Research Agent Contract" tool_responses: [] + # LangSmith sources (import by project or run ID; auto_import re-fetches on each run/ci) + sources: + - type: langsmith + project: "my-production-agent" + filter: + status: error # error | warning | all + date_range: last_7_days + min_latency_ms: 5000 + auto_import: true + - type: langsmith_run + run_id: "abc123def456" ``` -When you use `file:`, the session’s `id`, `input`, and `contract` come from the loaded file. When you use inline `id` and `input`, you must provide them. +When you use `file:`, the session’s `id`, `input`, and `contract` come from the loaded file. When you use inline `id` and `input`, you must provide them. **`replays.sources`** sessions are merged when running `flakestorm ci` or when `auto_import` is true (project sources). --- @@ -89,9 +100,10 @@ When you use `file:`, the session’s `id`, `input`, and `contract` come from th | `flakestorm replay run path/to/replay.yaml -c flakestorm.yaml` | Run a single replay file. `-c` supplies agent and contract config. | | `flakestorm replay run path/to/dir -c flakestorm.yaml` | Run all replay files in the directory. | | `flakestorm replay export --from-report REPORT.json --output ./replays` | Export failed mutations from a Flakestorm report as replay YAML files. | -| `flakestorm replay import --from-langsmith RUN_ID` | Import a session from LangSmith (requires `flakestorm[langsmith]`). | -| `flakestorm replay import --from-langsmith RUN_ID --run` | Import and run the replay. | -| `flakestorm ci -c flakestorm.yaml` | Runs mutation, contract, chaos-only, **and all sessions in `replays.sessions`**; reports **replay_regression** (passed/total) and **overall** weighted score. | +| `flakestorm replay run --from-langsmith RUN_ID -c flakestorm.yaml` | Import a single session from LangSmith by run ID (requires `flakestorm[langsmith]`). | +| `flakestorm replay run --from-langsmith RUN_ID --run -o replay.yaml` | Import, optionally write to file, and run the replay. | +| `flakestorm replay run --from-langsmith-project PROJECT --filter-status error -o ./replays/` | Import all runs from a LangSmith project; write one YAML per run. Add `--run` to run after import. | +| `flakestorm ci -c flakestorm.yaml` | Runs mutation, contract, chaos-only, **and all replay sessions** (including `replays.sources` with `auto_import`); reports **replay_regression** and **overall** weighted score. | --- @@ -99,7 +111,8 @@ When you use `file:`, the session’s `id`, `input`, and `contract` come from th - **Manual** β€” Write YAML/JSON replay files from incident reports. - **Flakestorm export** β€” `flakestorm replay export --from-report REPORT.json` turns failed runs into replay files. -- **LangSmith** β€” `flakestorm replay import --from-langsmith RUN_ID` (requires `pip install flakestorm[langsmith]`). +- **LangSmith (single run)** β€” `flakestorm replay run --from-langsmith RUN_ID` (requires `pip install flakestorm[langsmith]`). +- **LangSmith (project)** β€” `flakestorm replay run --from-langsmith-project PROJECT --filter-status error -o ./replays/` imports failed runs from a project; or use `replays.sources` in config with `auto_import: true` so CI re-fetches from the project each run. --- diff --git a/docs/V2_AUDIT.md b/docs/V2_AUDIT.md index 05fe932..aa117b4 100644 --- a/docs/V2_AUDIT.md +++ b/docs/V2_AUDIT.md @@ -68,16 +68,62 @@ Verification of the codebase against the PRD and addendum: behavior, config sche --- -## 6. Addendum β€” Context Attacks, Model Drift, LangSmith, Spec +## 6. Addendum (flakestorm-v2-addendum.md) β€” Full Checklist -| Item | Status | -|------|--------| -| Context attacks module (indirect_injection, etc.) | βœ… `chaos/context_attacks.py`; profile `indirect_injection.yaml` | -| response_drift in llm_proxy | βœ… `chaos/llm_proxy.py` (json_field_rename, verbosity_shift, format_change, refusal_rephrase, tone_shift) | -| LangSmith load + schema check | βœ… `replay/loader.py`: `load_langsmith_run`, `_validate_langsmith_run_schema` | -| Python tool fault: fail loudly when no tools | βœ… `create_instrumented_adapter` raises if type=python and tool_faults | -| Contract matrix isolation (reset) | βœ… Optional reset; warning if stateful and no reset | -| Resilience score formula (addendum Β§6.3) | βœ… In `contracts/matrix.py` and `docs/V2_SPEC.md` | +### Addition 1 β€” Context Attacks Module + +| Requirement | Status | Notes | +|-------------|--------|------| +| `chaos/context_attacks.py` | βœ… | `ContextAttackEngine`, `maybe_inject_indirect()` | +| indirect_injection (inject payloads into tool response) | βœ… | Wired via engine; profile `indirect_injection.yaml` | +| memory_poisoning, system_prompt_leak_probe | ⚠️ | Docstring/config types exist; memory_poisoning inject step and leak probe as contract assertion are not fully wired in execution flow | +| Contract invariants: excludes_pattern, behavior_unchanged | βœ… | `assertions/verifier.py`; use for system_prompt_not_leaked, injection_not_executed | +| Config: `chaos.context_attacks` list with type (e.g. indirect_injection) | βœ… | `ContextAttackConfig` in `core/config.py` | + +### Addition 2 β€” Model Version Drift (response_drift) + +| Requirement | Status | Notes | +|-------------|--------|------| +| `response_drift` in llm_faults | βœ… | `chaos/llm_proxy.py`: `apply_llm_response_drift`, drift_type, severity, direction, factor | +| drift_type: json_field_rename, verbosity_shift, format_change, refusal_rephrase, tone_shift | βœ… | Implemented in llm_proxy | +| Profile `model_version_drift.yaml` | βœ… | `chaos/profiles/model_version_drift.yaml` | + +### Addition 3 β€” Multi-Agent Failure Propagation + +| Requirement | Status | Notes | +|-------------|--------|------| +| v3 roadmap placeholder, no v2 implementation | βœ… | Documented in ROADMAP.md as V3; no code required | + +### Addition 4 β€” Resilience Certificate Export + +| Requirement | Status | Notes | +|-------------|--------|------| +| `flakestorm certificate` CLI command | ❌ | Not implemented | +| `reports/certificate.py` (PDF/HTML certificate) | ❌ | Not implemented | +| Config `certificate.tester_name`, pass_threshold, output_format | ❌ | Not implemented | + +### Addition 5 β€” LangSmith Replay Import + +| Requirement | Status | Notes | +|-------------|--------|------| +| Import single run by ID: `flakestorm replay --from-langsmith RUN_ID` | βœ… | `replay/loader.py`: `load_langsmith_run(run_id)`; CLI option | +| Import and run: `--from-langsmith RUN_ID --run` | βœ… | `_replay_async` supports run_after_import | +| Schema validation (fail clearly if LangSmith API changed) | βœ… | `_validate_langsmith_run_schema` | +| Map run inputs/outputs/child_runs to ReplaySessionConfig | βœ… | `_langsmith_run_to_session` | +| `--from-langsmith-project PROJECT` + `--filter-status` + `--output` | βœ… | `replay run --from-langsmith-project X --filter-status error -o ./replays/`; writes YAML per run | +| `replays.sources` (type: langsmith | langsmith_run, project, filter, auto_import) | βœ… | `LangSmithProjectSourceConfig`, `LangSmithRunSourceConfig`, `ReplayConfig.sources`; CI uses `resolve_sessions_from_config(..., include_sources=True)` | + +### Addition 6 β€” Implicit Spec Clarifications + +| Requirement | Status | Notes | +|-------------|--------|------| +| 6.1 Python callables: fail loudly if tool_faults but no tools/ToolRegistry | βœ… | `create_instrumented_adapter` raises with clear message for type=python | +| 6.2 Contract matrix: reset between cells (reset_endpoint / reset_function) | βœ… | `ContractEngine._reset_agent()`; config fields on AgentConfig | +| 6.3 Resilience score formula in spec (weighted, auto-FAIL on critical) | βœ… | `contracts/matrix.py` docstring and implementation; `docs/V2_SPEC.md` | + +--- + +**Summary:** Addendum Additions 1, 2, 3, 5, 6 are implemented (with minor gaps on full memory_poisoning/leak_probe wiring). **Addition 4 (Resilience Certificate)** is not implemented. --- diff --git a/src/flakestorm/cli/main.py b/src/flakestorm/cli/main.py index 84fb062..897cc8b 100644 --- a/src/flakestorm/cli/main.py +++ b/src/flakestorm/cli/main.py @@ -552,10 +552,31 @@ def replay_run( help="Path to configuration file", ), from_langsmith: str | None = typer.Option(None, "--from-langsmith", help="LangSmith run ID"), - run_after_import: bool = typer.Option(False, "--run", help="Run replay after import"), + from_langsmith_project: str | None = typer.Option( + None, + "--from-langsmith-project", + help="Import runs from a LangSmith project (filter by status, then write to --output)", + ), + filter_status: str = typer.Option( + "error", + "--filter-status", + help="When using --from-langsmith-project: error | warning | all", + ), + output: Path = typer.Option( + None, + "--output", + "-o", + help="When importing: output file (single run) or directory (project); replays written as YAML", + ), + run_after_import: bool = typer.Option(False, "--run", help="Run replay(s) after import"), ) -> None: """Run or import replay sessions.""" - asyncio.run(_replay_async(path, config, from_langsmith, run_after_import)) + asyncio.run( + _replay_async( + path, config, from_langsmith, from_langsmith_project, + filter_status, output, run_after_import, + ) + ) @replay_app.command("export") @@ -602,8 +623,12 @@ async def _replay_async( path: Path | None, config: Path, from_langsmith: str | None, + from_langsmith_project: str | None, + filter_status: str, + output: Path | None, run_after_import: bool, ) -> None: + import yaml from flakestorm.core.config import load_config from flakestorm.core.protocol import create_agent_adapter, create_instrumented_adapter from flakestorm.replay.loader import ReplayLoader, resolve_contract @@ -612,10 +637,60 @@ async def _replay_async( agent = create_agent_adapter(cfg.agent) if cfg.chaos: agent = create_instrumented_adapter(agent, cfg.chaos) + loader = ReplayLoader() + + if from_langsmith_project: + sessions = loader.load_langsmith_project( + project_name=from_langsmith_project, + filter_status=filter_status, + ) + console.print(f"[green]Imported {len(sessions)} replay(s) from LangSmith project.[/green]") + out_path = Path(output) if output else Path("./replays") + out_path.mkdir(parents=True, exist_ok=True) + for i, session in enumerate(sessions): + safe_id = (session.id or str(i)).replace("/", "_").replace("\\", "_")[:64] + fpath = out_path / f"replay-{safe_id}.yaml" + fpath.write_text( + yaml.dump( + session.model_dump(mode="json", exclude_none=True), + default_flow_style=False, + sort_keys=False, + allow_unicode=True, + ), + encoding="utf-8", + ) + console.print(f" [dim]Wrote[/dim] {fpath}") + if run_after_import and sessions: + contract = None + try: + contract = resolve_contract(sessions[0].contract, cfg, config.parent) + except FileNotFoundError: + pass + runner = ReplayRunner(agent, contract=contract) + passed = 0 + for session in sessions: + result = await runner.run(session, contract=contract) + if result.passed: + passed += 1 + console.print(f"[bold]Replay results:[/bold] {passed}/{len(sessions)} passed") + raise typer.Exit(0) + if from_langsmith: - loader = ReplayLoader() session = loader.load_langsmith_run(from_langsmith) console.print(f"[green]Imported replay:[/green] {session.id}") + if output: + out_path = Path(output) + out_path.parent.mkdir(parents=True, exist_ok=True) + out_path.write_text( + yaml.dump( + session.model_dump(mode="json", exclude_none=True), + default_flow_style=False, + sort_keys=False, + allow_unicode=True, + ), + encoding="utf-8", + ) + console.print(f"[dim]Wrote[/dim] {out_path}") if run_after_import: contract = None try: @@ -627,8 +702,8 @@ async def _replay_async( console.print(f"[bold]Replay result:[/bold] passed={replay_result.passed}") console.print(f"[dim]Response:[/dim] {(replay_result.response.output or '')[:200]}...") raise typer.Exit(0) + if path and path.exists(): - loader = ReplayLoader() session = loader.load_file(path) contract = None try: @@ -641,7 +716,9 @@ async def _replay_async( if replay_result.verification_details: console.print(f"[dim]Checks:[/dim] {', '.join(replay_result.verification_details)}") else: - console.print("[yellow]Provide a replay file path or --from-langsmith RUN_ID.[/yellow]") + console.print( + "[yellow]Provide a replay file path, --from-langsmith RUN_ID, or --from-langsmith-project PROJECT.[/yellow]" + ) @app.command() @@ -703,42 +780,38 @@ async def _ci_async(config: Path, min_score: float) -> None: if chaos_score < min_score: exit_code = 1 - # Replay sessions + # Replay sessions (from replays.sessions and replays.sources with auto_import) replay_score = 1.0 - if cfg.replays and cfg.replays.sessions: + if cfg.replays and (cfg.replays.sessions or cfg.replays.sources): from flakestorm.core.protocol import create_agent_adapter, create_instrumented_adapter - from flakestorm.replay.loader import ReplayLoader, resolve_contract + from flakestorm.replay.loader import resolve_contract, resolve_sessions_from_config from flakestorm.replay.runner import ReplayRunner agent = create_agent_adapter(cfg.agent) if cfg.chaos: agent = create_instrumented_adapter(agent, cfg.chaos) - loader = ReplayLoader() - passed = 0 - total = 0 config_path = Path(config) - for s in cfg.replays.sessions: - if getattr(s, "file", None): - fpath = Path(s.file) - if not fpath.is_absolute(): - fpath = config_path.parent / fpath - session = loader.load_file(fpath) - else: - session = s - contract = None - try: - contract = resolve_contract(session.contract, cfg, config_path.parent) - except FileNotFoundError: - pass - runner = ReplayRunner(agent, contract=contract) - result = await runner.run(session, contract=contract) - total += 1 - if result.passed: - passed += 1 - replay_score = passed / total if total else 1.0 - scores["replay_regression"] = replay_score - console.print(f"[bold]Replay score:[/bold] {replay_score:.1%} ({passed}/{total})") - if replay_score < min_score: - exit_code = 1 + sessions = resolve_sessions_from_config( + cfg.replays, config_path.parent, include_sources=True + ) + if sessions: + passed = 0 + total = 0 + for session in sessions: + contract = None + try: + contract = resolve_contract(session.contract, cfg, config_path.parent) + except FileNotFoundError: + pass + runner = ReplayRunner(agent, contract=contract) + result = await runner.run(session, contract=contract) + total += 1 + if result.passed: + passed += 1 + replay_score = passed / total if total else 1.0 + scores["replay_regression"] = replay_score + console.print(f"[bold]Replay score:[/bold] {replay_score:.1%} ({passed}/{total})") + if replay_score < min_score: + exit_code = 1 # Overall weighted score (only for components that ran) from flakestorm.core.config import ScoringConfig diff --git a/src/flakestorm/core/config.py b/src/flakestorm/core/config.py index 60157d8..a86e50a 100644 --- a/src/flakestorm/core/config.py +++ b/src/flakestorm/core/config.py @@ -11,6 +11,7 @@ import os import re from enum import Enum from pathlib import Path +from typing import Annotated, Literal, Union import yaml from pydantic import BaseModel, Field, field_validator, model_validator @@ -534,10 +535,59 @@ class ReplaySessionConfig(BaseModel): return self +class LangSmithProjectFilterConfig(BaseModel): + """Filter for LangSmith project run listing (replays.sources).""" + + status: str = Field( + default="error", + description="Filter by run status: error | warning | all", + ) + date_range: str | None = Field( + default=None, + description="e.g. last_7_days (used as start_time relative to now)", + ) + min_latency_ms: int | None = Field( + default=None, + description="Include runs with latency >= this many ms", + ) + + +class LangSmithProjectSourceConfig(BaseModel): + """Replay source: import runs from a LangSmith project (replays.sources).""" + + type: Literal["langsmith"] = "langsmith" + project: str = Field(..., description="LangSmith project name") + filter: LangSmithProjectFilterConfig | None = Field( + default=None, + description="Optional filter (status, date_range, min_latency_ms)", + ) + auto_import: bool = Field( + default=False, + description="If true, (re-)fetch runs from project on each run/ci", + ) + + +class LangSmithRunSourceConfig(BaseModel): + """Replay source: single LangSmith run by ID (replays.sources).""" + + type: Literal["langsmith_run"] = "langsmith_run" + run_id: str = Field(..., description="LangSmith run ID") + + +ReplaySourceConfig = Annotated[ + Union[LangSmithProjectSourceConfig, LangSmithRunSourceConfig], + Field(discriminator="type"), +] + + class ReplayConfig(BaseModel): """V2 replay regression configuration.""" sessions: list[ReplaySessionConfig] = Field(default_factory=list) + sources: list[ReplaySourceConfig] = Field( + default_factory=list, + description="Optional LangSmith sources (project or run_id); sessions from sources can be merged when auto_import is true", + ) class FlakeStormConfig(BaseModel): diff --git a/src/flakestorm/replay/loader.py b/src/flakestorm/replay/loader.py index e1c293f..5b54f7f 100644 --- a/src/flakestorm/replay/loader.py +++ b/src/flakestorm/replay/loader.py @@ -2,17 +2,26 @@ Replay loader: load replay sessions from YAML/JSON or LangSmith. Contract reference resolution: by name (main config) then by file path. +LangSmith: single run by ID or project listing with filters (Addition 5). """ from __future__ import annotations import json +from datetime import datetime, timedelta, timezone from pathlib import Path from typing import TYPE_CHECKING, Any import yaml -from flakestorm.core.config import ContractConfig, ReplaySessionConfig +from flakestorm.core.config import ( + ContractConfig, + LangSmithProjectFilterConfig, + LangSmithProjectSourceConfig, + LangSmithRunSourceConfig, + ReplayConfig, + ReplaySessionConfig, +) if TYPE_CHECKING: from flakestorm.core.config import FlakeStormConfig @@ -58,23 +67,82 @@ class ReplayLoader: data = yaml.safe_load(text) return ReplaySessionConfig.model_validate(data) + def _get_langsmith_client(self) -> Any: + """Return LangSmith Client; raise ImportError if langsmith not installed.""" + try: + from langsmith import Client + except ImportError as e: + raise ImportError( + "LangSmith requires: pip install flakestorm[langsmith] or pip install langsmith" + ) from e + return Client() + def load_langsmith_run(self, run_id: str) -> ReplaySessionConfig: """ Load a LangSmith run as a replay session. Requires langsmith>=0.1.0. Target API: /api/v1/runs/{run_id} Fails clearly if LangSmith schema has changed (expected fields missing). """ - try: - from langsmith import Client - except ImportError as e: - raise ImportError( - "LangSmith import requires: pip install flakestorm[langsmith] or pip install langsmith" - ) from e - client = Client() + client = self._get_langsmith_client() run = client.read_run(run_id) self._validate_langsmith_run_schema(run) return self._langsmith_run_to_session(run) + def load_langsmith_project( + self, + project_name: str, + filter_status: str = "error", + date_range: str | None = None, + min_latency_ms: int | None = None, + limit: int = 200, + ) -> list[ReplaySessionConfig]: + """ + Load runs from a LangSmith project as replay sessions. Requires langsmith>=0.1.0. + Uses list_runs(project_name=..., error=..., start_time=..., filter=..., limit=...). + Each run is fetched fully (read_run) to get child_runs for tool_responses. + """ + client = self._get_langsmith_client() + # Build list_runs kwargs + error_filter: bool | None = None + if filter_status == "error": + error_filter = True + elif filter_status == "all": + error_filter = None + else: + # "warning" or unknown: treat as non-error runs + error_filter = False + start_time: datetime | None = None + if date_range: + date_range_lower = date_range.strip().lower().replace("-", "_") + if "7" in date_range_lower and "day" in date_range_lower: + start_time = datetime.now(timezone.utc) - timedelta(days=7) + elif "24" in date_range_lower and ("hour" in date_range_lower or "day" in date_range_lower): + start_time = datetime.now(timezone.utc) - timedelta(hours=24) + elif "30" in date_range_lower and "day" in date_range_lower: + start_time = datetime.now(timezone.utc) - timedelta(days=30) + filter_str: str | None = None + if min_latency_ms is not None and min_latency_ms > 0: + # LangSmith filter uses seconds for latency + latency_sec = min_latency_ms / 1000.0 + filter_str = f"gt(latency, {latency_sec})" + runs_iterator = client.list_runs( + project_name=project_name, + error=error_filter, + start_time=start_time, + filter=filter_str, + limit=limit, + is_root=True, + ) + sessions: list[ReplaySessionConfig] = [] + for run in runs_iterator: + run_id = str(getattr(run, "id", "")) + if not run_id: + continue + full_run = client.read_run(run_id) + self._validate_langsmith_run_schema(full_run) + sessions.append(self._langsmith_run_to_session(full_run)) + return sessions + def _validate_langsmith_run_schema(self, run: Any) -> None: """Check that run has expected schema; fail clearly if LangSmith API changed.""" required = ("id", "inputs", "outputs") @@ -112,3 +180,47 @@ class ReplayLoader: tool_responses=tool_responses, contract="default", ) + + +def resolve_sessions_from_config( + replays: ReplayConfig | None, + config_dir: Path | None = None, + *, + include_sources: bool = True, +) -> list[ReplaySessionConfig]: + """ + Build full list of replay sessions from config: inline sessions, file-backed + sessions (loaded from disk), and optionally sessions from replays.sources + (LangSmith run_id or project with auto_import). + """ + if not replays: + return [] + loader = ReplayLoader() + out: list[ReplaySessionConfig] = [] + for s in replays.sessions: + if s.file: + path = Path(s.file) + if not path.is_absolute() and config_dir: + path = config_dir / path + out.append(loader.load_file(path)) + else: + out.append(s) + if not include_sources or not replays.sources: + return out + for src in replays.sources: + if isinstance(src, LangSmithRunSourceConfig): + out.append(loader.load_langsmith_run(src.run_id)) + elif isinstance(src, LangSmithProjectSourceConfig) and src.auto_import: + filt = src.filter + filter_status = filt.status if filt else "error" + date_range = filt.date_range if filt else None + min_latency_ms = filt.min_latency_ms if filt else None + out.extend( + loader.load_langsmith_project( + project_name=src.project, + filter_status=filter_status, + date_range=date_range, + min_latency_ms=min_latency_ms, + ) + ) + return out diff --git a/tests/test_replay_integration.py b/tests/test_replay_integration.py index b4b7b5a..f84d9f4 100644 --- a/tests/test_replay_integration.py +++ b/tests/test_replay_integration.py @@ -20,10 +20,11 @@ from flakestorm.core.config import ( AdvancedConfig, ContractConfig, ContractInvariantConfig, + ReplayConfig, ReplaySessionConfig, ReplayToolResponseConfig, ) -from flakestorm.replay.loader import ReplayLoader, resolve_contract +from flakestorm.replay.loader import ReplayLoader, resolve_contract, resolve_sessions_from_config from flakestorm.replay.runner import ReplayRunner, ReplayResult from flakestorm.core.protocol import AgentResponse, BaseAgentAdapter @@ -99,6 +100,60 @@ class TestReplayLoader: with pytest.raises(FileNotFoundError): resolve_contract("nonexistent", config, None) + def test_resolve_sessions_from_config_inline_only(self): + """resolve_sessions_from_config returns inline sessions when no sources.""" + replays = ReplayConfig( + sessions=[ + ReplaySessionConfig(id="a", input="q1", contract="default"), + ReplaySessionConfig(id="b", input="q2", contract="default"), + ], + sources=[], + ) + out = resolve_sessions_from_config(replays, None, include_sources=True) + assert len(out) == 2 + assert out[0].id == "a" + assert out[1].id == "b" + + def test_resolve_sessions_from_config_file_backed(self): + """resolve_sessions_from_config loads file-backed sessions from config_dir.""" + with tempfile.NamedTemporaryFile( + suffix=".yaml", delete=False, mode="w", encoding="utf-8" + ) as f: + yaml.dump({ + "id": "file-session", + "input": "from file", + "tool_responses": [], + "contract": "default", + }, f) + f.flush() + fpath = Path(f.name) + try: + config_dir = fpath.parent + replays = ReplayConfig( + sessions=[ReplaySessionConfig(id="", input="", file=fpath.name)], + sources=[], + ) + out = resolve_sessions_from_config(replays, config_dir, include_sources=True) + assert len(out) == 1 + assert out[0].id == "file-session" + assert out[0].input == "from file" + finally: + fpath.unlink(missing_ok=True) + + def test_replay_config_sources_parsed_from_dict(self): + """ReplayConfig.sources parses langsmith and langsmith_run from dict (YAML).""" + cfg = ReplayConfig.model_validate({ + "sessions": [], + "sources": [ + {"type": "langsmith", "project": "my-agent", "auto_import": True}, + {"type": "langsmith_run", "run_id": "abc-123"}, + ], + }) + assert len(cfg.sources) == 2 + assert cfg.sources[0].project == "my-agent" + assert cfg.sources[0].auto_import is True + assert cfg.sources[1].run_id == "abc-123" + class TestReplayRunner: """Test replay runner and verification."""