From 1bbe3a1f7bf99f0c60ccb9c83ae34405d37793a3 Mon Sep 17 00:00:00 2001 From: "Francisco M Humarang Jr." Date: Sat, 7 Mar 2026 02:04:55 +0800 Subject: [PATCH] Enhance documentation and replay functionality in Flakestorm. Updated README to clarify V2 Spec and added references to LangSmith sources in configuration guide. Improved replay regression capabilities by allowing imports from LangSmith projects and runs, with filtering options. Added new classes for LangSmith project and run sources in the configuration. Updated replay loader to support project imports and refined session resolution logic. --- README.md | 2 +- docs/BEHAVIORAL_CONTRACTS.md | 2 + docs/CONFIGURATION_GUIDE.md | 3 +- docs/ENVIRONMENT_CHAOS.md | 8 +- docs/REPLAY_REGRESSION.md | 25 ++++-- docs/V2_AUDIT.md | 64 ++++++++++++-- src/flakestorm/cli/main.py | 141 +++++++++++++++++++++++-------- src/flakestorm/core/config.py | 50 +++++++++++ src/flakestorm/replay/loader.py | 128 ++++++++++++++++++++++++++-- tests/test_replay_integration.py | 57 ++++++++++++- 10 files changed, 419 insertions(+), 61 deletions(-) diff --git a/README.md b/README.md index 0671664..219522b 100644 --- a/README.md +++ b/README.md @@ -237,7 +237,7 @@ See [Roadmap](ROADMAP.md) for the full plan. 
Highlights: - [πŸ“œ Behavioral Contracts](docs/BEHAVIORAL_CONTRACTS.md) - Contract Γ— chaos matrix - [πŸ”„ Replay Regression](docs/REPLAY_REGRESSION.md) - Import and replay production failures - [πŸ›‘οΈ Context Attacks](docs/CONTEXT_ATTACKS.md) - Indirect injection, memory poisoning -- [πŸ“ Spec & audit](docs/V2_SPEC.md) - Spec clarifications; [implementation audit](docs/V2_AUDIT.md) - PRD/addendum verification +- [πŸ“ V2 Spec](docs/V2_SPEC.md) - Score formula, reset, Python tools ### For Developers - [πŸ—οΈ Architecture & Modules](docs/MODULES.md) - How the code works diff --git a/docs/BEHAVIORAL_CONTRACTS.md b/docs/BEHAVIORAL_CONTRACTS.md index b0c42b3..a480049 100644 --- a/docs/BEHAVIORAL_CONTRACTS.md +++ b/docs/BEHAVIORAL_CONTRACTS.md @@ -82,6 +82,8 @@ Each entry is a **scenario**: a name plus optional `tool_faults`, `llm_faults`, - **Weights:** critical = 3, high = 2, medium = 1, low = 1. - **Automatic FAIL:** If any invariant with severity `critical` fails in any scenario, the contract is considered failed regardless of the numeric score. +See [V2 Spec](V2_SPEC.md) for the exact formula and matrix isolation (reset) behavior. + --- ## Commands diff --git a/docs/CONFIGURATION_GUIDE.md b/docs/CONFIGURATION_GUIDE.md index 8aec6c9..967949a 100644 --- a/docs/CONFIGURATION_GUIDE.md +++ b/docs/CONFIGURATION_GUIDE.md @@ -45,9 +45,10 @@ With `version: "2.0"` you can add the three **chaos engineering pillars** and a | Block | Purpose | Documentation | |-------|---------|---------------| -| `chaos` | **Environment chaos** β€” Inject faults into tools, LLMs, and context (timeouts, errors, rate limits, context attacks). | [Environment Chaos](ENVIRONMENT_CHAOS.md) | +| `chaos` | **Environment chaos** β€” Inject faults into tools, LLMs, and context (timeouts, errors, rate limits, context attacks, **response_drift**). 
| [Environment Chaos](ENVIRONMENT_CHAOS.md) | | `contract` + `chaos_matrix` | **Behavioral contracts** — Named invariants verified across a matrix of chaos scenarios; produces a resilience score. | [Behavioral Contracts](BEHAVIORAL_CONTRACTS.md) | | `replays.sessions` | **Replay regression** — Import production failure sessions and replay them as deterministic tests. | [Replay Regression](REPLAY_REGRESSION.md) | +| `replays.sources` | **LangSmith sources** — Import from a LangSmith project or by run ID; `auto_import` re-fetches on each run/ci. | [Replay Regression](REPLAY_REGRESSION.md) | | `scoring` | **Unified score** — Weights for mutation_robustness, chaos_resilience, contract_compliance, replay_regression (used by `flakestorm ci`). | See [README](../README.md) “Scores at a glance” | **Context attacks** (chaos on tool/context, not the user prompt) are configured under `chaos.context_attacks`. See [Context Attacks](CONTEXT_ATTACKS.md). diff --git a/docs/ENVIRONMENT_CHAOS.md b/docs/ENVIRONMENT_CHAOS.md index 3574f06..6b604a1 100644 --- a/docs/ENVIRONMENT_CHAOS.md +++ b/docs/ENVIRONMENT_CHAOS.md @@ -110,4 +110,10 @@ chaos: - `high_latency` — Delayed responses. - `indirect_injection` — Context attack profile (inject into tool/context). -Profile YAMLs live in `src/flakestorm/chaos/profiles/`. Use with `--chaos-profile NAME`. +Profile YAMLs live in `src/flakestorm/chaos/profiles/`. Use with `--chaos-profile NAME`. The **`model_version_drift`** profile exercises the LLM fault type **`response_drift`**. + +--- + +## See also + +- [Context Attacks](CONTEXT_ATTACKS.md) — Indirect injection, memory poisoning. 
diff --git a/docs/REPLAY_REGRESSION.md b/docs/REPLAY_REGRESSION.md index d9993de..bb71fac 100644 --- a/docs/REPLAY_REGRESSION.md +++ b/docs/REPLAY_REGRESSION.md @@ -63,7 +63,7 @@ Flakestorm resolves name first, then path; if not found, replay may fail or fall ## Configuration in flakestorm.yaml -You can define replay sessions inline or by file: +You can define replay sessions inline, by file, or via **LangSmith sources**: ```yaml version: "2.0" @@ -76,9 +76,20 @@ replays: input: "What is the capital of France?" contract: "Research Agent Contract" tool_responses: [] + # LangSmith sources (import by project or run ID; auto_import re-fetches on each run/ci) + sources: + - type: langsmith + project: "my-production-agent" + filter: + status: error # error | warning | all + date_range: last_7_days + min_latency_ms: 5000 + auto_import: true + - type: langsmith_run + run_id: "abc123def456" ``` -When you use `file:`, the session’s `id`, `input`, and `contract` come from the loaded file. When you use inline `id` and `input`, you must provide them. +When you use `file:`, the session’s `id`, `input`, and `contract` come from the loaded file. When you use inline `id` and `input`, you must provide them. Sessions from **`replays.sources`** are merged when running `flakestorm ci`: `langsmith_run` sources are always imported, while project (`langsmith`) sources are imported only when `auto_import` is true. --- @@ -89,9 +100,10 @@ When you use `file:`, the session’s `id`, `input`, and `contract` come from th | `flakestorm replay run path/to/replay.yaml -c flakestorm.yaml` | Run a single replay file. `-c` supplies agent and contract config. | | `flakestorm replay run path/to/dir -c flakestorm.yaml` | Run all replay files in the directory. | | `flakestorm replay export --from-report REPORT.json --output ./replays` | Export failed mutations from a Flakestorm report as replay YAML files. | -| `flakestorm replay import --from-langsmith RUN_ID` | Import a session from LangSmith (requires `flakestorm[langsmith]`). 
| -| `flakestorm replay import --from-langsmith RUN_ID --run` | Import and run the replay. | -| `flakestorm ci -c flakestorm.yaml` | Runs mutation, contract, chaos-only, **and all sessions in `replays.sessions`**; reports **replay_regression** (passed/total) and **overall** weighted score. | +| `flakestorm replay run --from-langsmith RUN_ID -c flakestorm.yaml` | Import a single session from LangSmith by run ID (requires `flakestorm[langsmith]`). | +| `flakestorm replay run --from-langsmith RUN_ID --run -o replay.yaml` | Import, optionally write to file, and run the replay. | +| `flakestorm replay run --from-langsmith-project PROJECT --filter-status error -o ./replays/` | Import runs matching `--filter-status` from a LangSmith project; write one YAML per run. Add `--run` to run after import. | +| `flakestorm ci -c flakestorm.yaml` | Runs mutation, contract, chaos-only, **and all replay sessions** (including `replays.sources` with `auto_import`); reports **replay_regression** and **overall** weighted score. | --- @@ -99,7 +111,8 @@ When you use `file:`, the session’s `id`, `input`, and `contract` come from th - **Manual** — Write YAML/JSON replay files from incident reports. - **Flakestorm export** — `flakestorm replay export --from-report REPORT.json` turns failed runs into replay files. -- **LangSmith** — `flakestorm replay import --from-langsmith RUN_ID` (requires `pip install flakestorm[langsmith]`). +- **LangSmith (single run)** — `flakestorm replay run --from-langsmith RUN_ID` (requires `pip install flakestorm[langsmith]`). +- **LangSmith (project)** — `flakestorm replay run --from-langsmith-project PROJECT --filter-status error -o ./replays/` imports failed runs from a project; or use `replays.sources` in config with `auto_import: true` so CI re-fetches from the project each run. 
--- diff --git a/docs/V2_AUDIT.md b/docs/V2_AUDIT.md index 05fe932..aa117b4 100644 --- a/docs/V2_AUDIT.md +++ b/docs/V2_AUDIT.md @@ -68,16 +68,62 @@ Verification of the codebase against the PRD and addendum: behavior, config sche --- -## 6. Addendum β€” Context Attacks, Model Drift, LangSmith, Spec +## 6. Addendum (flakestorm-v2-addendum.md) β€” Full Checklist -| Item | Status | -|------|--------| -| Context attacks module (indirect_injection, etc.) | βœ… `chaos/context_attacks.py`; profile `indirect_injection.yaml` | -| response_drift in llm_proxy | βœ… `chaos/llm_proxy.py` (json_field_rename, verbosity_shift, format_change, refusal_rephrase, tone_shift) | -| LangSmith load + schema check | βœ… `replay/loader.py`: `load_langsmith_run`, `_validate_langsmith_run_schema` | -| Python tool fault: fail loudly when no tools | βœ… `create_instrumented_adapter` raises if type=python and tool_faults | -| Contract matrix isolation (reset) | βœ… Optional reset; warning if stateful and no reset | -| Resilience score formula (addendum Β§6.3) | βœ… In `contracts/matrix.py` and `docs/V2_SPEC.md` | +### Addition 1 β€” Context Attacks Module + +| Requirement | Status | Notes | +|-------------|--------|------| +| `chaos/context_attacks.py` | βœ… | `ContextAttackEngine`, `maybe_inject_indirect()` | +| indirect_injection (inject payloads into tool response) | βœ… | Wired via engine; profile `indirect_injection.yaml` | +| memory_poisoning, system_prompt_leak_probe | ⚠️ | Docstring/config types exist; memory_poisoning inject step and leak probe as contract assertion are not fully wired in execution flow | +| Contract invariants: excludes_pattern, behavior_unchanged | βœ… | `assertions/verifier.py`; use for system_prompt_not_leaked, injection_not_executed | +| Config: `chaos.context_attacks` list with type (e.g. 
indirect_injection) | βœ… | `ContextAttackConfig` in `core/config.py` | + +### Addition 2 β€” Model Version Drift (response_drift) + +| Requirement | Status | Notes | +|-------------|--------|------| +| `response_drift` in llm_faults | βœ… | `chaos/llm_proxy.py`: `apply_llm_response_drift`, drift_type, severity, direction, factor | +| drift_type: json_field_rename, verbosity_shift, format_change, refusal_rephrase, tone_shift | βœ… | Implemented in llm_proxy | +| Profile `model_version_drift.yaml` | βœ… | `chaos/profiles/model_version_drift.yaml` | + +### Addition 3 β€” Multi-Agent Failure Propagation + +| Requirement | Status | Notes | +|-------------|--------|------| +| v3 roadmap placeholder, no v2 implementation | βœ… | Documented in ROADMAP.md as V3; no code required | + +### Addition 4 β€” Resilience Certificate Export + +| Requirement | Status | Notes | +|-------------|--------|------| +| `flakestorm certificate` CLI command | ❌ | Not implemented | +| `reports/certificate.py` (PDF/HTML certificate) | ❌ | Not implemented | +| Config `certificate.tester_name`, pass_threshold, output_format | ❌ | Not implemented | + +### Addition 5 β€” LangSmith Replay Import + +| Requirement | Status | Notes | +|-------------|--------|------| +| Import single run by ID: `flakestorm replay --from-langsmith RUN_ID` | βœ… | `replay/loader.py`: `load_langsmith_run(run_id)`; CLI option | +| Import and run: `--from-langsmith RUN_ID --run` | βœ… | `_replay_async` supports run_after_import | +| Schema validation (fail clearly if LangSmith API changed) | βœ… | `_validate_langsmith_run_schema` | +| Map run inputs/outputs/child_runs to ReplaySessionConfig | βœ… | `_langsmith_run_to_session` | +| `--from-langsmith-project PROJECT` + `--filter-status` + `--output` | βœ… | `replay run --from-langsmith-project X --filter-status error -o ./replays/`; writes YAML per run | +| `replays.sources` (type: langsmith | langsmith_run, project, filter, auto_import) | βœ… | 
`LangSmithProjectSourceConfig`, `LangSmithRunSourceConfig`, `ReplayConfig.sources`; CI uses `resolve_sessions_from_config(..., include_sources=True)` | + +### Addition 6 β€” Implicit Spec Clarifications + +| Requirement | Status | Notes | +|-------------|--------|------| +| 6.1 Python callables: fail loudly if tool_faults but no tools/ToolRegistry | βœ… | `create_instrumented_adapter` raises with clear message for type=python | +| 6.2 Contract matrix: reset between cells (reset_endpoint / reset_function) | βœ… | `ContractEngine._reset_agent()`; config fields on AgentConfig | +| 6.3 Resilience score formula in spec (weighted, auto-FAIL on critical) | βœ… | `contracts/matrix.py` docstring and implementation; `docs/V2_SPEC.md` | + +--- + +**Summary:** Addendum Additions 1, 2, 3, 5, 6 are implemented (with minor gaps on full memory_poisoning/leak_probe wiring). **Addition 4 (Resilience Certificate)** is not implemented. --- diff --git a/src/flakestorm/cli/main.py b/src/flakestorm/cli/main.py index 84fb062..897cc8b 100644 --- a/src/flakestorm/cli/main.py +++ b/src/flakestorm/cli/main.py @@ -552,10 +552,31 @@ def replay_run( help="Path to configuration file", ), from_langsmith: str | None = typer.Option(None, "--from-langsmith", help="LangSmith run ID"), - run_after_import: bool = typer.Option(False, "--run", help="Run replay after import"), + from_langsmith_project: str | None = typer.Option( + None, + "--from-langsmith-project", + help="Import runs from a LangSmith project (filter by status, then write to --output)", + ), + filter_status: str = typer.Option( + "error", + "--filter-status", + help="When using --from-langsmith-project: error | warning | all", + ), + output: Path = typer.Option( + None, + "--output", + "-o", + help="When importing: output file (single run) or directory (project); replays written as YAML", + ), + run_after_import: bool = typer.Option(False, "--run", help="Run replay(s) after import"), ) -> None: """Run or import replay sessions.""" - 
asyncio.run(_replay_async(path, config, from_langsmith, run_after_import)) + asyncio.run( + _replay_async( + path, config, from_langsmith, from_langsmith_project, + filter_status, output, run_after_import, + ) + ) @replay_app.command("export") @@ -602,8 +623,12 @@ async def _replay_async( path: Path | None, config: Path, from_langsmith: str | None, + from_langsmith_project: str | None, + filter_status: str, + output: Path | None, run_after_import: bool, ) -> None: + import yaml from flakestorm.core.config import load_config from flakestorm.core.protocol import create_agent_adapter, create_instrumented_adapter from flakestorm.replay.loader import ReplayLoader, resolve_contract @@ -612,10 +637,60 @@ async def _replay_async( agent = create_agent_adapter(cfg.agent) if cfg.chaos: agent = create_instrumented_adapter(agent, cfg.chaos) + loader = ReplayLoader() + + if from_langsmith_project: + sessions = loader.load_langsmith_project( + project_name=from_langsmith_project, + filter_status=filter_status, + ) + console.print(f"[green]Imported {len(sessions)} replay(s) from LangSmith project.[/green]") + out_path = Path(output) if output else Path("./replays") + out_path.mkdir(parents=True, exist_ok=True) + for i, session in enumerate(sessions): + safe_id = (session.id or str(i)).replace("/", "_").replace("\\", "_")[:64] + fpath = out_path / f"replay-{safe_id}.yaml" + fpath.write_text( + yaml.dump( + session.model_dump(mode="json", exclude_none=True), + default_flow_style=False, + sort_keys=False, + allow_unicode=True, + ), + encoding="utf-8", + ) + console.print(f" [dim]Wrote[/dim] {fpath}") + if run_after_import and sessions: + contract = None + try: + contract = resolve_contract(sessions[0].contract, cfg, config.parent) + except FileNotFoundError: + pass + runner = ReplayRunner(agent, contract=contract) + passed = 0 + for session in sessions: + result = await runner.run(session, contract=contract) + if result.passed: + passed += 1 + console.print(f"[bold]Replay 
results:[/bold] {passed}/{len(sessions)} passed") + raise typer.Exit(0) + if from_langsmith: - loader = ReplayLoader() session = loader.load_langsmith_run(from_langsmith) console.print(f"[green]Imported replay:[/green] {session.id}") + if output: + out_path = Path(output) + out_path.parent.mkdir(parents=True, exist_ok=True) + out_path.write_text( + yaml.dump( + session.model_dump(mode="json", exclude_none=True), + default_flow_style=False, + sort_keys=False, + allow_unicode=True, + ), + encoding="utf-8", + ) + console.print(f"[dim]Wrote[/dim] {out_path}") if run_after_import: contract = None try: @@ -627,8 +702,8 @@ async def _replay_async( console.print(f"[bold]Replay result:[/bold] passed={replay_result.passed}") console.print(f"[dim]Response:[/dim] {(replay_result.response.output or '')[:200]}...") raise typer.Exit(0) + if path and path.exists(): - loader = ReplayLoader() session = loader.load_file(path) contract = None try: @@ -641,7 +716,9 @@ async def _replay_async( if replay_result.verification_details: console.print(f"[dim]Checks:[/dim] {', '.join(replay_result.verification_details)}") else: - console.print("[yellow]Provide a replay file path or --from-langsmith RUN_ID.[/yellow]") + console.print( + "[yellow]Provide a replay file path, --from-langsmith RUN_ID, or --from-langsmith-project PROJECT.[/yellow]" + ) @app.command() @@ -703,42 +780,38 @@ async def _ci_async(config: Path, min_score: float) -> None: if chaos_score < min_score: exit_code = 1 - # Replay sessions + # Replay sessions (from replays.sessions and replays.sources with auto_import) replay_score = 1.0 - if cfg.replays and cfg.replays.sessions: + if cfg.replays and (cfg.replays.sessions or cfg.replays.sources): from flakestorm.core.protocol import create_agent_adapter, create_instrumented_adapter - from flakestorm.replay.loader import ReplayLoader, resolve_contract + from flakestorm.replay.loader import resolve_contract, resolve_sessions_from_config from flakestorm.replay.runner import 
ReplayRunner agent = create_agent_adapter(cfg.agent) if cfg.chaos: agent = create_instrumented_adapter(agent, cfg.chaos) - loader = ReplayLoader() - passed = 0 - total = 0 config_path = Path(config) - for s in cfg.replays.sessions: - if getattr(s, "file", None): - fpath = Path(s.file) - if not fpath.is_absolute(): - fpath = config_path.parent / fpath - session = loader.load_file(fpath) - else: - session = s - contract = None - try: - contract = resolve_contract(session.contract, cfg, config_path.parent) - except FileNotFoundError: - pass - runner = ReplayRunner(agent, contract=contract) - result = await runner.run(session, contract=contract) - total += 1 - if result.passed: - passed += 1 - replay_score = passed / total if total else 1.0 - scores["replay_regression"] = replay_score - console.print(f"[bold]Replay score:[/bold] {replay_score:.1%} ({passed}/{total})") - if replay_score < min_score: - exit_code = 1 + sessions = resolve_sessions_from_config( + cfg.replays, config_path.parent, include_sources=True + ) + if sessions: + passed = 0 + total = 0 + for session in sessions: + contract = None + try: + contract = resolve_contract(session.contract, cfg, config_path.parent) + except FileNotFoundError: + pass + runner = ReplayRunner(agent, contract=contract) + result = await runner.run(session, contract=contract) + total += 1 + if result.passed: + passed += 1 + replay_score = passed / total if total else 1.0 + scores["replay_regression"] = replay_score + console.print(f"[bold]Replay score:[/bold] {replay_score:.1%} ({passed}/{total})") + if replay_score < min_score: + exit_code = 1 # Overall weighted score (only for components that ran) from flakestorm.core.config import ScoringConfig diff --git a/src/flakestorm/core/config.py b/src/flakestorm/core/config.py index 60157d8..a86e50a 100644 --- a/src/flakestorm/core/config.py +++ b/src/flakestorm/core/config.py @@ -11,6 +11,7 @@ import os import re from enum import Enum from pathlib import Path +from typing import 
Annotated, Literal, Union import yaml from pydantic import BaseModel, Field, field_validator, model_validator @@ -534,10 +535,59 @@ class ReplaySessionConfig(BaseModel): return self +class LangSmithProjectFilterConfig(BaseModel): + """Filter for LangSmith project run listing (replays.sources).""" + + status: str = Field( + default="error", + description="Filter by run status: error | warning | all", + ) + date_range: str | None = Field( + default=None, + description="e.g. last_7_days (used as start_time relative to now)", + ) + min_latency_ms: int | None = Field( + default=None, + description="Include runs with latency >= this many ms", + ) + + +class LangSmithProjectSourceConfig(BaseModel): + """Replay source: import runs from a LangSmith project (replays.sources).""" + + type: Literal["langsmith"] = "langsmith" + project: str = Field(..., description="LangSmith project name") + filter: LangSmithProjectFilterConfig | None = Field( + default=None, + description="Optional filter (status, date_range, min_latency_ms)", + ) + auto_import: bool = Field( + default=False, + description="If true, (re-)fetch runs from project on each run/ci", + ) + + +class LangSmithRunSourceConfig(BaseModel): + """Replay source: single LangSmith run by ID (replays.sources).""" + + type: Literal["langsmith_run"] = "langsmith_run" + run_id: str = Field(..., description="LangSmith run ID") + + +ReplaySourceConfig = Annotated[ + Union[LangSmithProjectSourceConfig, LangSmithRunSourceConfig], + Field(discriminator="type"), +] + + class ReplayConfig(BaseModel): """V2 replay regression configuration.""" sessions: list[ReplaySessionConfig] = Field(default_factory=list) + sources: list[ReplaySourceConfig] = Field( + default_factory=list, + description="Optional LangSmith sources (project or run_id); sessions from sources can be merged when auto_import is true", + ) class FlakeStormConfig(BaseModel): diff --git a/src/flakestorm/replay/loader.py b/src/flakestorm/replay/loader.py index e1c293f..5b54f7f 
100644 --- a/src/flakestorm/replay/loader.py +++ b/src/flakestorm/replay/loader.py @@ -2,17 +2,26 @@ Replay loader: load replay sessions from YAML/JSON or LangSmith. Contract reference resolution: by name (main config) then by file path. +LangSmith: single run by ID or project listing with filters (Addition 5). """ from __future__ import annotations import json +from datetime import datetime, timedelta, timezone from pathlib import Path from typing import TYPE_CHECKING, Any import yaml -from flakestorm.core.config import ContractConfig, ReplaySessionConfig +from flakestorm.core.config import ( + ContractConfig, + LangSmithProjectFilterConfig, + LangSmithProjectSourceConfig, + LangSmithRunSourceConfig, + ReplayConfig, + ReplaySessionConfig, +) if TYPE_CHECKING: from flakestorm.core.config import FlakeStormConfig @@ -58,23 +67,82 @@ class ReplayLoader: data = yaml.safe_load(text) return ReplaySessionConfig.model_validate(data) + def _get_langsmith_client(self) -> Any: + """Return LangSmith Client; raise ImportError if langsmith not installed.""" + try: + from langsmith import Client + except ImportError as e: + raise ImportError( + "LangSmith requires: pip install flakestorm[langsmith] or pip install langsmith" + ) from e + return Client() + def load_langsmith_run(self, run_id: str) -> ReplaySessionConfig: """ Load a LangSmith run as a replay session. Requires langsmith>=0.1.0. Target API: /api/v1/runs/{run_id} Fails clearly if LangSmith schema has changed (expected fields missing). 
""" - try: - from langsmith import Client - except ImportError as e: - raise ImportError( - "LangSmith import requires: pip install flakestorm[langsmith] or pip install langsmith" - ) from e - client = Client() + client = self._get_langsmith_client() run = client.read_run(run_id) self._validate_langsmith_run_schema(run) return self._langsmith_run_to_session(run) + def load_langsmith_project( + self, + project_name: str, + filter_status: str = "error", + date_range: str | None = None, + min_latency_ms: int | None = None, + limit: int = 200, + ) -> list[ReplaySessionConfig]: + """ + Load runs from a LangSmith project as replay sessions. Requires langsmith>=0.1.0. + Uses list_runs(project_name=..., error=..., start_time=..., filter=..., limit=...). + Each run is fetched fully (read_run) to get child_runs for tool_responses. + """ + client = self._get_langsmith_client() + # Build list_runs kwargs + error_filter: bool | None = None + if filter_status == "error": + error_filter = True + elif filter_status == "all": + error_filter = None + else: + # "warning" or unknown: treat as non-error runs + error_filter = False + start_time: datetime | None = None + if date_range: + date_range_lower = date_range.strip().lower().replace("-", "_") + if "7" in date_range_lower and "day" in date_range_lower: + start_time = datetime.now(timezone.utc) - timedelta(days=7) + elif "24" in date_range_lower and ("hour" in date_range_lower or "day" in date_range_lower): + start_time = datetime.now(timezone.utc) - timedelta(hours=24) + elif "30" in date_range_lower and "day" in date_range_lower: + start_time = datetime.now(timezone.utc) - timedelta(days=30) + filter_str: str | None = None + if min_latency_ms is not None and min_latency_ms > 0: + # LangSmith filter uses seconds for latency + latency_sec = min_latency_ms / 1000.0 + filter_str = f"gt(latency, {latency_sec})" + runs_iterator = client.list_runs( + project_name=project_name, + error=error_filter, + start_time=start_time, + 
filter=filter_str, + limit=limit, + is_root=True, + ) + sessions: list[ReplaySessionConfig] = [] + for run in runs_iterator: + run_id = str(getattr(run, "id", "")) + if not run_id: + continue + full_run = client.read_run(run_id) + self._validate_langsmith_run_schema(full_run) + sessions.append(self._langsmith_run_to_session(full_run)) + return sessions + def _validate_langsmith_run_schema(self, run: Any) -> None: """Check that run has expected schema; fail clearly if LangSmith API changed.""" required = ("id", "inputs", "outputs") @@ -112,3 +180,47 @@ class ReplayLoader: tool_responses=tool_responses, contract="default", ) + + +def resolve_sessions_from_config( + replays: ReplayConfig | None, + config_dir: Path | None = None, + *, + include_sources: bool = True, +) -> list[ReplaySessionConfig]: + """ + Build full list of replay sessions from config: inline sessions, file-backed + sessions (loaded from disk), and optionally sessions from replays.sources + (LangSmith run_id or project with auto_import). 
+ """ + if not replays: + return [] + loader = ReplayLoader() + out: list[ReplaySessionConfig] = [] + for s in replays.sessions: + if s.file: + path = Path(s.file) + if not path.is_absolute() and config_dir: + path = config_dir / path + out.append(loader.load_file(path)) + else: + out.append(s) + if not include_sources or not replays.sources: + return out + for src in replays.sources: + if isinstance(src, LangSmithRunSourceConfig): + out.append(loader.load_langsmith_run(src.run_id)) + elif isinstance(src, LangSmithProjectSourceConfig) and src.auto_import: + filt = src.filter + filter_status = filt.status if filt else "error" + date_range = filt.date_range if filt else None + min_latency_ms = filt.min_latency_ms if filt else None + out.extend( + loader.load_langsmith_project( + project_name=src.project, + filter_status=filter_status, + date_range=date_range, + min_latency_ms=min_latency_ms, + ) + ) + return out diff --git a/tests/test_replay_integration.py b/tests/test_replay_integration.py index b4b7b5a..f84d9f4 100644 --- a/tests/test_replay_integration.py +++ b/tests/test_replay_integration.py @@ -20,10 +20,11 @@ from flakestorm.core.config import ( AdvancedConfig, ContractConfig, ContractInvariantConfig, + ReplayConfig, ReplaySessionConfig, ReplayToolResponseConfig, ) -from flakestorm.replay.loader import ReplayLoader, resolve_contract +from flakestorm.replay.loader import ReplayLoader, resolve_contract, resolve_sessions_from_config from flakestorm.replay.runner import ReplayRunner, ReplayResult from flakestorm.core.protocol import AgentResponse, BaseAgentAdapter @@ -99,6 +100,60 @@ class TestReplayLoader: with pytest.raises(FileNotFoundError): resolve_contract("nonexistent", config, None) + def test_resolve_sessions_from_config_inline_only(self): + """resolve_sessions_from_config returns inline sessions when no sources.""" + replays = ReplayConfig( + sessions=[ + ReplaySessionConfig(id="a", input="q1", contract="default"), + ReplaySessionConfig(id="b", 
input="q2", contract="default"), + ], + sources=[], + ) + out = resolve_sessions_from_config(replays, None, include_sources=True) + assert len(out) == 2 + assert out[0].id == "a" + assert out[1].id == "b" + + def test_resolve_sessions_from_config_file_backed(self): + """resolve_sessions_from_config loads file-backed sessions from config_dir.""" + with tempfile.NamedTemporaryFile( + suffix=".yaml", delete=False, mode="w", encoding="utf-8" + ) as f: + yaml.dump({ + "id": "file-session", + "input": "from file", + "tool_responses": [], + "contract": "default", + }, f) + f.flush() + fpath = Path(f.name) + try: + config_dir = fpath.parent + replays = ReplayConfig( + sessions=[ReplaySessionConfig(id="", input="", file=fpath.name)], + sources=[], + ) + out = resolve_sessions_from_config(replays, config_dir, include_sources=True) + assert len(out) == 1 + assert out[0].id == "file-session" + assert out[0].input == "from file" + finally: + fpath.unlink(missing_ok=True) + + def test_replay_config_sources_parsed_from_dict(self): + """ReplayConfig.sources parses langsmith and langsmith_run from dict (YAML).""" + cfg = ReplayConfig.model_validate({ + "sessions": [], + "sources": [ + {"type": "langsmith", "project": "my-agent", "auto_import": True}, + {"type": "langsmith_run", "run_id": "abc-123"}, + ], + }) + assert len(cfg.sources) == 2 + assert cfg.sources[0].project == "my-agent" + assert cfg.sources[0].auto_import is True + assert cfg.sources[1].run_id == "abc-123" + class TestReplayRunner: """Test replay runner and verification."""