mirror of
https://github.com/katanemo/plano.git
synced 2026-06-08 14:55:14 +02:00
demos: add peyeeye PII redaction & rehydration filter chain
Mirrors the pii_anonymizer demo: a FastAPI input/output filter pair that delegates PII detection to the Peyeeye API (https://api.peyeeye.ai). - /redact/{path:path} redacts user text in chat completions, Anthropic messages, and Responses-API request bodies. - /rehydrate/{path:path} restores placeholders in the LLM response, then best-effort deletes the stateful session. Fail-closed semantics: any unexpected response shape from /v1/redact, or a length mismatch between sent and returned text counts, raises a 502 rather than forwarding partially-redacted data. Auth errors map to 500. Stateful (default) and stateless (sealed skey_... blob) session modes are both supported via PEYEEYE_SESSION_MODE. Includes 24 pytest cases covering the round trip, length-guard, unexpected-shape guard, multimodal content lists, Anthropic and Responses formats, and stateless mode.
This commit is contained in:
parent
938f9c4bdf
commit
e448479d3b
9 changed files with 1296 additions and 0 deletions
22
demos/filter_chains/peyeeye/Dockerfile
Normal file
22
demos/filter_chains/peyeeye/Dockerfile
Normal file
|
|
@ -0,0 +1,22 @@
|
|||
FROM python:3.12-slim
|
||||
|
||||
ENV PYTHONUNBUFFERED=1 \
|
||||
PYTHONDONTWRITEBYTECODE=1 \
|
||||
PIP_NO_CACHE_DIR=1
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
RUN pip install --no-cache-dir uv
|
||||
|
||||
COPY pyproject.toml ./
|
||||
RUN uv pip install --system --no-cache-dir \
|
||||
"fastapi>=0.104.1" \
|
||||
"uvicorn>=0.24.0" \
|
||||
"httpx>=0.27.0" \
|
||||
"pydantic>=2.0.0"
|
||||
|
||||
COPY peyeeye.py ./
|
||||
|
||||
EXPOSE 10502
|
||||
|
||||
CMD ["uvicorn", "peyeeye:app", "--host", "0.0.0.0", "--port", "10502"]
|
||||
103
demos/filter_chains/peyeeye/README.md
Normal file
103
demos/filter_chains/peyeeye/README.md
Normal file
|
|
@ -0,0 +1,103 @@
|
|||
# Peyeeye PII Filter Chain Demo
|
||||
|
||||
Drop-in PII redaction + rehydration for Plano via the [Peyeeye](https://peyeeye.ai) API.
|
||||
|
||||
The model never sees raw PII — incoming text is sent to `/v1/redact` and replaced with stable placeholders like `[EMAIL_0]`, `[PERSON_1]`, etc. After the model responds, the placeholders in its output are swapped back to the originals via `/v1/rehydrate`.
|
||||
|
||||
## Architecture
|
||||
|
||||
```
|
||||
Client --> Plano (model listener :12000)
|
||||
|
|
||||
+-- input_filters: peyeeye_redact
|
||||
| -> POST https://api.peyeeye.ai/v1/redact
|
||||
| -> body messages contain [EMAIL_0], [SSN_0], ...
|
||||
|
|
||||
+-- model_provider: openai/gpt-4o-mini (or claude, etc.)
|
||||
| -> the LLM only ever sees redacted text
|
||||
|
|
||||
+-- output_filters: peyeeye_rehydrate
|
||||
-> POST https://api.peyeeye.ai/v1/rehydrate
|
||||
-> placeholders restored to originals
|
||||
```
|
||||
|
||||
## Quick start
|
||||
|
||||
```bash
|
||||
export PEYEEYE_API_KEY=pk_live_... # https://peyeeye.ai
|
||||
export OPENAI_API_KEY=sk-...
|
||||
|
||||
bash run_demo.sh
|
||||
|
||||
# in another terminal
|
||||
bash test.sh
|
||||
|
||||
# stop
|
||||
bash run_demo.sh down
|
||||
```
|
||||
|
||||
## Try it
|
||||
|
||||
```bash
|
||||
curl http://localhost:12000/v1/chat/completions \
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{
|
||||
"model": "gpt-4o-mini",
|
||||
"messages": [{"role": "user", "content": "Email me at jane@example.com about SSN 123-45-6789"}],
|
||||
"stream": false
|
||||
}'
|
||||
```
|
||||
|
||||
The response body comes back with the original email and SSN restored, while the request that hit OpenAI carried `[EMAIL_0]` and `[SSN_0]`.
|
||||
|
||||
## Configuration
|
||||
|
||||
All configuration is via env vars on the filter service:
|
||||
|
||||
| Var | Default | Notes |
|
||||
|---|---|---|
|
||||
| `PEYEEYE_API_KEY` | _required_ | get one at peyeeye.ai |
|
||||
| `PEYEEYE_API_BASE` | `https://api.peyeeye.ai` | override for self-hosted |
|
||||
| `PEYEEYE_LOCALE` | `auto` | BCP-47 |
|
||||
| `PEYEEYE_ENTITIES` | _all_ | comma-separated list, e.g. `EMAIL,SSN,CREDIT_CARD` |
|
||||
| `PEYEEYE_SESSION_MODE` | `stateful` | `stateful` (default) or `stateless` |
|
||||
|
||||
In `stateless` mode, Peyeeye returns a sealed `skey_...` blob instead of holding the mapping server-side; this filter caches the blob on the request id and uses it for rehydration. No per-request state is retained on Peyeeye's servers.
|
||||
|
||||
## Filter contract
|
||||
|
||||
**Input filter (`/redact/{path:path}`)** receives the full raw request body. It walks `messages[].content` (string or multimodal `text` parts) for chat-style endpoints and `input` for the OpenAI Responses API, sends a single batched call to Peyeeye, and writes the redacted text back into the body in place.
|
||||
|
||||
**Output filter (`/rehydrate/{path:path}`)** receives the raw LLM response bytes, looks up the cached session id by the request id (`x-request-id`), and rehydrates `choices[].message.content`, Anthropic-style `content[].text`, or Responses-API `output[].content[].text`.
|
||||
|
||||
## Behavioral invariants
|
||||
|
||||
- **Fail-closed.** If `/v1/redact` returns an unexpected shape, or the count of returned texts doesn't match the count sent, the filter raises a 502 — no unredacted text is ever forwarded to the model.
|
||||
- **Length-guard.** `len(redacted) == len(sent)` is asserted before zipping the values back into the request.
|
||||
- **Typed errors.** `PEyeEyeMissingSecrets` covers auth (401, missing key), `PEyeEyeAPIError` covers everything else (timeouts, 4xx, 5xx, parse). Both surface as HTTP errors to Plano.
|
||||
- **Best-effort cleanup.** Stateful sessions are deleted server-side after rehydration via `DELETE /v1/sessions/{ses_...}`.
|
||||
|
||||
## Streaming
|
||||
|
||||
Streaming SSE responses are passed through unchanged in this demo — token-by-token rehydration would require buffering or a session-aware streaming endpoint. For now, set `stream: false` on requests routed through this filter chain.
|
||||
|
||||
## Tests
|
||||
|
||||
```bash
|
||||
uv sync --group dev
|
||||
uv run pytest -v
|
||||
```
|
||||
|
||||
The suite mocks the Peyeeye API (`respx`) and exercises:
|
||||
|
||||
- redact + rehydrate round trip on chat completions
|
||||
- redact + rehydrate on `/v1/messages` (Anthropic) and `/v1/responses` (OpenAI)
|
||||
- the length-guard (redact returns wrong count -> 502)
|
||||
- the unexpected-shape guard (redact returns non-string/list -> 502)
|
||||
- the no-PII passthrough (no redactable text -> body unchanged, no session cached)
|
||||
- the no-cached-session passthrough on rehydrate
|
||||
- multimodal `[{"type":"text","text":...}]` content lists
|
||||
|
||||
## Disclosure
|
||||
|
||||
I'm the maintainer of peyeeye.ai. Happy to adjust API surface, naming, or test coverage to match Plano's conventions.
|
||||
42
demos/filter_chains/peyeeye/config.yaml
Normal file
42
demos/filter_chains/peyeeye/config.yaml
Normal file
|
|
@ -0,0 +1,42 @@
|
|||
version: v0.3.0
|
||||
|
||||
# Peyeeye PII redaction & rehydration filter chain.
|
||||
#
|
||||
# Prereqs:
|
||||
# * export PEYEEYE_API_KEY=... (required)
|
||||
# * export OPENAI_API_KEY=... (or any provider you wire up below)
|
||||
#
|
||||
# The peyeeye filter service runs on :10502 and exposes:
|
||||
# POST /redact/{path} — input filter
|
||||
# POST /rehydrate/{path} — output filter
|
||||
#
|
||||
# Plano appends the upstream path automatically (e.g. /v1/chat/completions),
|
||||
# so /redact/v1/chat/completions hits the redactor, and the model only ever
|
||||
# sees ``[EMAIL_0]``-style placeholders.
|
||||
|
||||
filters:
|
||||
- id: peyeeye_redact
|
||||
url: http://localhost:10502/redact
|
||||
type: http
|
||||
- id: peyeeye_rehydrate
|
||||
url: http://localhost:10502/rehydrate
|
||||
type: http
|
||||
|
||||
model_providers:
|
||||
- model: openai/gpt-4o-mini
|
||||
access_key: $OPENAI_API_KEY
|
||||
default: true
|
||||
- model: anthropic/claude-sonnet-4-20250514
|
||||
access_key: $ANTHROPIC_API_KEY
|
||||
|
||||
listeners:
|
||||
- type: model
|
||||
name: llm_gateway
|
||||
port: 12000
|
||||
input_filters:
|
||||
- peyeeye_redact
|
||||
output_filters:
|
||||
- peyeeye_rehydrate
|
||||
|
||||
tracing:
|
||||
random_sampling: 100
|
||||
485
demos/filter_chains/peyeeye/peyeeye.py
Normal file
485
demos/filter_chains/peyeeye/peyeeye.py
Normal file
|
|
@ -0,0 +1,485 @@
|
|||
"""Peyeeye PII redaction & rehydration filter for Plano filter chains.
|
||||
|
||||
Two endpoints, mirroring the pii_anonymizer demo:
|
||||
|
||||
POST /redact/{path:path} — input filter; redacts PII before the LLM call.
|
||||
POST /rehydrate/{path:path} — output filter; restores PII in the LLM response.
|
||||
|
||||
The filter delegates detection and rehydration to the Peyeeye API
|
||||
(``https://api.peyeeye.ai`` by default). Two session modes are supported:
|
||||
|
||||
* ``stateful`` (default) — Peyeeye holds the token -> value mapping under a
|
||||
``ses_...`` id; rehydrate references the id.
|
||||
* ``stateless`` — Peyeeye returns a sealed ``skey_...`` blob; nothing is
|
||||
retained server-side.
|
||||
|
||||
Behavioral invariants (mirrored from the LiteLLM peyeeye guardrail):
|
||||
|
||||
* Pre-call: redact every text-bearing chunk in the request. If the count of
|
||||
returned texts doesn't match the count sent, raise -- never forward
|
||||
partially-redacted source.
|
||||
* Post-call: pull the cached session id, rehydrate the response, and best-
|
||||
effort delete the stateful session.
|
||||
* Fail-closed: any unexpected response shape from /v1/redact raises a typed
|
||||
error rather than silently passing PII through.
|
||||
|
||||
Configuration knobs (env vars):
|
||||
|
||||
* ``PEYEEYE_API_KEY`` — required.
|
||||
* ``PEYEEYE_API_BASE`` — defaults to ``https://api.peyeeye.ai``.
|
||||
* ``PEYEEYE_LOCALE`` — BCP-47, default ``auto``.
|
||||
* ``PEYEEYE_ENTITIES`` — comma-separated entity ids to restrict detection.
|
||||
* ``PEYEEYE_SESSION_MODE`` — ``stateful`` (default) or ``stateless``.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
import httpx
|
||||
from fastapi import FastAPI, HTTPException, Request
|
||||
from fastapi.responses import JSONResponse, Response
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s - [PEYEEYE] - %(levelname)s - %(message)s",
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
DEFAULT_API_BASE = "https://api.peyeeye.ai"
|
||||
SESSION_TTL_SECONDS = 3600
|
||||
HTTP_TIMEOUT_SECONDS = 15.0
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------- errors
|
||||
|
||||
|
||||
class PEyeEyeAPIError(Exception):
|
||||
"""Raised when the Peyeeye API returns an error or unexpected payload."""
|
||||
|
||||
|
||||
class PEyeEyeMissingSecrets(Exception):
|
||||
"""Raised when no Peyeeye API key is configured."""
|
||||
|
||||
|
||||
# ------------------------------------------------------------------- mini cache
|
||||
|
||||
|
||||
class _SessionCache:
|
||||
"""Tiny in-memory ``request_id -> session_id`` store with TTL."""
|
||||
|
||||
def __init__(self, ttl_seconds: int = SESSION_TTL_SECONDS) -> None:
|
||||
self._ttl = ttl_seconds
|
||||
self._lock = threading.Lock()
|
||||
self._store: Dict[str, Tuple[str, float]] = {}
|
||||
|
||||
def _expire(self) -> None:
|
||||
now = time.time()
|
||||
expired = [k for k, (_, ts) in self._store.items() if now - ts > self._ttl]
|
||||
for k in expired:
|
||||
del self._store[k]
|
||||
|
||||
def set(self, key: str, value: str) -> None:
|
||||
with self._lock:
|
||||
self._expire()
|
||||
self._store[key] = (value, time.time())
|
||||
|
||||
def get(self, key: str) -> Optional[str]:
|
||||
with self._lock:
|
||||
entry = self._store.get(key)
|
||||
return entry[0] if entry else None
|
||||
|
||||
def pop(self, key: str) -> Optional[str]:
|
||||
with self._lock:
|
||||
entry = self._store.pop(key, None)
|
||||
return entry[0] if entry else None
|
||||
|
||||
|
||||
# ------------------------------------------------------------------ peyeeye client
|
||||
|
||||
|
||||
class PEyeEyeClient:
|
||||
"""Async HTTP client for the Peyeeye redact/rehydrate API."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
api_key: Optional[str] = None,
|
||||
api_base: Optional[str] = None,
|
||||
locale: Optional[str] = None,
|
||||
entities: Optional[List[str]] = None,
|
||||
session_mode: Optional[str] = None,
|
||||
) -> None:
|
||||
key = api_key or os.environ.get("PEYEEYE_API_KEY")
|
||||
if not key:
|
||||
raise PEyeEyeMissingSecrets(
|
||||
"Peyeeye API key missing — set the PEYEEYE_API_KEY env var."
|
||||
)
|
||||
self.api_key = key
|
||||
self.api_base = (
|
||||
api_base or os.environ.get("PEYEEYE_API_BASE") or DEFAULT_API_BASE
|
||||
).rstrip("/")
|
||||
self.locale = locale or os.environ.get("PEYEEYE_LOCALE") or "auto"
|
||||
env_entities = os.environ.get("PEYEEYE_ENTITIES")
|
||||
if entities is None and env_entities:
|
||||
entities = [e.strip() for e in env_entities.split(",") if e.strip()]
|
||||
self.entities = entities or None
|
||||
mode = session_mode or os.environ.get("PEYEEYE_SESSION_MODE") or "stateful"
|
||||
if mode not in ("stateful", "stateless"):
|
||||
raise ValueError(
|
||||
f"PEYEEYE_SESSION_MODE must be 'stateful' or 'stateless', got {mode!r}"
|
||||
)
|
||||
self.session_mode = mode
|
||||
self._client: Optional[httpx.AsyncClient] = None
|
||||
|
||||
async def _async_client(self) -> httpx.AsyncClient:
|
||||
if self._client is None:
|
||||
self._client = httpx.AsyncClient(timeout=HTTP_TIMEOUT_SECONDS)
|
||||
return self._client
|
||||
|
||||
def _headers(self) -> Dict[str, str]:
|
||||
return {
|
||||
"Authorization": f"Bearer {self.api_key}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
async def redact_batch(self, texts: List[str]) -> Tuple[List[str], Optional[str]]:
|
||||
"""Redact a batch of texts. Returns ``(redacted, session_id_or_skey)``.
|
||||
|
||||
Raises ``PEyeEyeAPIError`` on any non-2xx, timeout, or unexpected shape.
|
||||
"""
|
||||
body: Dict[str, Any] = {"text": texts, "locale": self.locale}
|
||||
if self.entities:
|
||||
body["entities"] = list(self.entities)
|
||||
if self.session_mode == "stateless":
|
||||
body["session"] = "stateless"
|
||||
|
||||
payload = await self._post("/v1/redact", body)
|
||||
out = payload.get("text")
|
||||
if isinstance(out, str):
|
||||
redacted = [out]
|
||||
elif isinstance(out, list):
|
||||
redacted = [str(x) for x in out]
|
||||
else:
|
||||
raise PEyeEyeAPIError(
|
||||
"Peyeeye /v1/redact returned an unexpected response shape; "
|
||||
"refusing to forward unredacted text."
|
||||
)
|
||||
|
||||
if self.session_mode == "stateless":
|
||||
session_id = payload.get("rehydration_key")
|
||||
else:
|
||||
session_id = payload.get("session_id") or payload.get("session")
|
||||
|
||||
return redacted, session_id
|
||||
|
||||
async def rehydrate(self, text: str, session_id: str) -> str:
|
||||
if not text:
|
||||
return text
|
||||
try:
|
||||
payload = await self._post(
|
||||
"/v1/rehydrate", {"text": text, "session": session_id}
|
||||
)
|
||||
except PEyeEyeAPIError as e:
|
||||
# Rehydration failures must not corrupt or drop the LLM response;
|
||||
# log and fall back to the (already-redacted) text.
|
||||
logger.warning("rehydrate failed: %s", e)
|
||||
return text
|
||||
return payload.get("text", text)
|
||||
|
||||
async def delete_session(self, session_id: str) -> None:
|
||||
if not session_id.startswith("ses_"):
|
||||
return
|
||||
client = await self._async_client()
|
||||
try:
|
||||
await client.delete(
|
||||
f"{self.api_base}/v1/sessions/{session_id}",
|
||||
headers=self._headers(),
|
||||
timeout=10.0,
|
||||
)
|
||||
except Exception as e: # pragma: no cover - best effort
|
||||
logger.debug("session cleanup failed: %s", e)
|
||||
|
||||
async def _post(self, path: str, body: Dict[str, Any]) -> Dict[str, Any]:
|
||||
client = await self._async_client()
|
||||
url = f"{self.api_base}{path}"
|
||||
try:
|
||||
resp = await client.post(url, headers=self._headers(), json=body)
|
||||
except httpx.TimeoutException as e:
|
||||
raise PEyeEyeAPIError(f"Peyeeye {path} timed out") from e
|
||||
except httpx.HTTPError as e:
|
||||
raise PEyeEyeAPIError(f"Peyeeye {path} request failed: {e}") from e
|
||||
if resp.status_code == 401:
|
||||
raise PEyeEyeMissingSecrets("Invalid Peyeeye API key") from None
|
||||
if resp.status_code >= 400:
|
||||
raise PEyeEyeAPIError(
|
||||
f"Peyeeye {path} returned HTTP {resp.status_code}: {resp.text[:200]}"
|
||||
)
|
||||
try:
|
||||
return resp.json()
|
||||
except json.JSONDecodeError as e:
|
||||
raise PEyeEyeAPIError(f"Peyeeye {path} returned non-JSON body") from e
|
||||
|
||||
|
||||
# ------------------------------------------------------------- request walkers
|
||||
|
||||
|
||||
def iter_request_texts(body: Dict[str, Any], endpoint: str) -> List[Tuple[str, ...]]:
|
||||
"""Yield ``("path", ...)`` tuples identifying every user-text chunk.
|
||||
|
||||
Mirrors the litellm pre-call hook: walks ``messages[].content`` (string or
|
||||
multimodal text-part list) for chat-style endpoints, and ``input`` for the
|
||||
OpenAI Responses API.
|
||||
|
||||
Returns a list of (path-tuple, text) pairs. ``path-tuple`` is consumed by
|
||||
``set_request_text`` to write the redacted value back.
|
||||
"""
|
||||
parts: List[Tuple[Tuple[Any, ...], str]] = []
|
||||
|
||||
if endpoint == "/v1/responses":
|
||||
input_val = body.get("input")
|
||||
if isinstance(input_val, str) and input_val:
|
||||
parts.append((("input",), input_val))
|
||||
elif isinstance(input_val, list):
|
||||
for i, item in enumerate(input_val):
|
||||
if not isinstance(item, dict):
|
||||
continue
|
||||
if item.get("role") != "user":
|
||||
continue
|
||||
content = item.get("content")
|
||||
if isinstance(content, str) and content:
|
||||
parts.append((("input", i, "content"), content))
|
||||
elif isinstance(content, list):
|
||||
for j, sub in enumerate(content):
|
||||
if isinstance(sub, dict) and sub.get("type") == "text":
|
||||
text = sub.get("text", "")
|
||||
if text:
|
||||
parts.append((("input", i, "content", j, "text"), text))
|
||||
return parts
|
||||
|
||||
# /v1/chat/completions and /v1/messages both use messages[]
|
||||
messages = body.get("messages") or []
|
||||
for i, msg in enumerate(messages):
|
||||
if not isinstance(msg, dict):
|
||||
continue
|
||||
if msg.get("role") != "user":
|
||||
continue
|
||||
content = msg.get("content")
|
||||
if isinstance(content, str) and content:
|
||||
parts.append((("messages", i, "content"), content))
|
||||
elif isinstance(content, list):
|
||||
for j, sub in enumerate(content):
|
||||
if isinstance(sub, dict) and sub.get("type") == "text":
|
||||
text = sub.get("text", "")
|
||||
if text:
|
||||
parts.append((("messages", i, "content", j, "text"), text))
|
||||
return parts
|
||||
|
||||
|
||||
def set_request_text(body: Dict[str, Any], path: Tuple[Any, ...], value: str) -> None:
|
||||
"""Write ``value`` into ``body`` at the given path."""
|
||||
cur: Any = body
|
||||
for key in path[:-1]:
|
||||
cur = cur[key]
|
||||
cur[path[-1]] = value
|
||||
|
||||
|
||||
def iter_response_texts(
|
||||
body: Dict[str, Any], endpoint: str
|
||||
) -> List[Tuple[Tuple[Any, ...], str]]:
|
||||
"""Yield ``(path, text)`` for every text chunk in an LLM response body."""
|
||||
parts: List[Tuple[Tuple[Any, ...], str]] = []
|
||||
if endpoint == "/v1/messages":
|
||||
content = body.get("content")
|
||||
if isinstance(content, list):
|
||||
for j, sub in enumerate(content):
|
||||
if isinstance(sub, dict) and sub.get("type") == "text":
|
||||
text = sub.get("text", "")
|
||||
if text:
|
||||
parts.append((("content", j, "text"), text))
|
||||
return parts
|
||||
|
||||
# /v1/chat/completions, /v1/responses (synchronous), and similar
|
||||
choices = body.get("choices")
|
||||
if isinstance(choices, list):
|
||||
for i, choice in enumerate(choices):
|
||||
if not isinstance(choice, dict):
|
||||
continue
|
||||
message = choice.get("message")
|
||||
if isinstance(message, dict):
|
||||
content = message.get("content")
|
||||
if isinstance(content, str) and content:
|
||||
parts.append((("choices", i, "message", "content"), content))
|
||||
# OpenAI Responses API: top-level "output" array
|
||||
output = body.get("output")
|
||||
if isinstance(output, list):
|
||||
for i, item in enumerate(output):
|
||||
if not isinstance(item, dict):
|
||||
continue
|
||||
content = item.get("content")
|
||||
if isinstance(content, list):
|
||||
for j, sub in enumerate(content):
|
||||
if isinstance(sub, dict) and sub.get("type") in (
|
||||
"text",
|
||||
"output_text",
|
||||
):
|
||||
text = sub.get("text", "")
|
||||
if text:
|
||||
parts.append((("output", i, "content", j, "text"), text))
|
||||
return parts
|
||||
|
||||
|
||||
# -------------------------------------------------------------------- FastAPI
|
||||
|
||||
|
||||
def create_app(client: Optional[PEyeEyeClient] = None) -> FastAPI:
|
||||
"""Build the FastAPI app. ``client`` is overridable for tests."""
|
||||
app = FastAPI(title="Peyeeye PII filter", version="1.0.0")
|
||||
cache = _SessionCache()
|
||||
|
||||
def _client() -> PEyeEyeClient:
|
||||
nonlocal client
|
||||
if client is None:
|
||||
client = PEyeEyeClient()
|
||||
return client
|
||||
|
||||
@app.post("/redact/{path:path}")
|
||||
async def redact(path: str, request: Request) -> Response:
|
||||
endpoint = f"/{path}"
|
||||
request_id = request.headers.get("x-request-id", "unknown")
|
||||
try:
|
||||
body = await request.json()
|
||||
except json.JSONDecodeError:
|
||||
raise HTTPException(status_code=400, detail="invalid JSON body")
|
||||
|
||||
text_parts = iter_request_texts(body, endpoint)
|
||||
if not text_parts:
|
||||
logger.info("request_id=%s no text to redact", request_id)
|
||||
return JSONResponse(content=body)
|
||||
|
||||
texts = [t for _, t in text_parts]
|
||||
try:
|
||||
redacted, session_id = await _client().redact_batch(texts)
|
||||
except PEyeEyeMissingSecrets as e:
|
||||
logger.error("request_id=%s missing secrets: %s", request_id, e)
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
except PEyeEyeAPIError as e:
|
||||
# Fail-closed: do not pass unredacted text through.
|
||||
logger.error("request_id=%s redact failed: %s", request_id, e)
|
||||
raise HTTPException(status_code=502, detail=str(e))
|
||||
|
||||
# Length-guard: must match exactly. The API contract is one-to-one.
|
||||
if len(redacted) != len(text_parts):
|
||||
logger.error(
|
||||
"request_id=%s length mismatch: sent=%d got=%d",
|
||||
request_id,
|
||||
len(text_parts),
|
||||
len(redacted),
|
||||
)
|
||||
raise HTTPException(
|
||||
status_code=502,
|
||||
detail=(
|
||||
f"Peyeeye /v1/redact returned {len(redacted)} texts for "
|
||||
f"{len(text_parts)} inputs; refusing to forward partially-"
|
||||
"redacted data"
|
||||
),
|
||||
)
|
||||
|
||||
for (path_tuple, _), value in zip(text_parts, redacted):
|
||||
set_request_text(body, path_tuple, value)
|
||||
|
||||
if session_id:
|
||||
cache.set(request_id, session_id)
|
||||
logger.info(
|
||||
"request_id=%s redacted %d chunk(s); cached session",
|
||||
request_id,
|
||||
len(text_parts),
|
||||
)
|
||||
else:
|
||||
logger.info(
|
||||
"request_id=%s redacted %d chunk(s); no session id returned",
|
||||
request_id,
|
||||
len(text_parts),
|
||||
)
|
||||
|
||||
return JSONResponse(content=body)
|
||||
|
||||
@app.post("/rehydrate/{path:path}")
|
||||
async def rehydrate(path: str, request: Request) -> Response:
|
||||
endpoint = f"/{path}"
|
||||
request_id = request.headers.get("x-request-id", "unknown")
|
||||
raw = await request.body()
|
||||
|
||||
session_id = cache.pop(request_id)
|
||||
if not session_id:
|
||||
logger.info(
|
||||
"request_id=%s no session cached, passing response through",
|
||||
request_id,
|
||||
)
|
||||
return Response(content=raw, media_type="application/json")
|
||||
|
||||
# Streaming SSE: not supported for stateful rehydration in this demo.
|
||||
# Plano sends raw chunks, but rehydration needs the full token to look
|
||||
# up the original; we pass through and rely on a non-streaming flow.
|
||||
body_str = raw.decode("utf-8", errors="replace")
|
||||
if body_str.lstrip().startswith("data:") or "data: " in body_str[:32]:
|
||||
logger.warning(
|
||||
"request_id=%s SSE not supported; passing through "
|
||||
"(use non-streaming for now)",
|
||||
request_id,
|
||||
)
|
||||
# Don't lose the session id on the way back if SSE.
|
||||
cache.set(request_id, session_id)
|
||||
return Response(content=raw, media_type="text/event-stream")
|
||||
|
||||
try:
|
||||
body = json.loads(body_str)
|
||||
except json.JSONDecodeError:
|
||||
logger.warning(
|
||||
"request_id=%s response is not JSON; passing through", request_id
|
||||
)
|
||||
return Response(content=raw, media_type="application/json")
|
||||
|
||||
text_parts = iter_response_texts(body, endpoint)
|
||||
if not text_parts:
|
||||
logger.info("request_id=%s no response text to rehydrate", request_id)
|
||||
await _maybe_delete_session(_client(), session_id)
|
||||
return JSONResponse(content=body)
|
||||
|
||||
# Run rehydrate calls concurrently — each returns the original text,
|
||||
# falling back to the redacted value on rehydrate error.
|
||||
tasks = [_client().rehydrate(text, session_id) for _, text in text_parts]
|
||||
restored = await asyncio.gather(*tasks)
|
||||
for (path_tuple, _), value in zip(text_parts, restored):
|
||||
set_request_text(body, path_tuple, value)
|
||||
|
||||
await _maybe_delete_session(_client(), session_id)
|
||||
logger.info("request_id=%s rehydrated %d chunk(s)", request_id, len(text_parts))
|
||||
return JSONResponse(content=body)
|
||||
|
||||
@app.get("/health")
|
||||
async def health() -> Dict[str, str]:
|
||||
return {"status": "healthy"}
|
||||
|
||||
return app
|
||||
|
||||
|
||||
async def _maybe_delete_session(client: PEyeEyeClient, session_id: str) -> None:
|
||||
"""Best-effort delete of a stateful session id."""
|
||||
if client.session_mode == "stateful":
|
||||
try:
|
||||
await client.delete_session(session_id)
|
||||
except Exception: # pragma: no cover - best effort
|
||||
pass
|
||||
|
||||
|
||||
# Default ASGI app — used by ``uvicorn peyeeye:app``.
|
||||
# Lazily resolves the client so tests can construct ``create_app(client=...)``
|
||||
# without requiring PEYEEYE_API_KEY in the environment.
|
||||
app = create_app()
|
||||
29
demos/filter_chains/peyeeye/pyproject.toml
Normal file
29
demos/filter_chains/peyeeye/pyproject.toml
Normal file
|
|
@ -0,0 +1,29 @@
|
|||
[project]
|
||||
name = "peyeeye_filter"
|
||||
version = "0.1.0"
|
||||
description = "Peyeeye PII redaction & rehydration filter for Plano filter chains"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.10,<3.14"
|
||||
dependencies = [
|
||||
"fastapi>=0.104.1",
|
||||
"uvicorn>=0.24.0",
|
||||
"httpx>=0.27.0",
|
||||
"pydantic>=2.0.0",
|
||||
]
|
||||
|
||||
[dependency-groups]
|
||||
dev = [
|
||||
"pytest>=8.0.0",
|
||||
"pytest-asyncio>=0.23.0",
|
||||
"respx>=0.21.0",
|
||||
]
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
asyncio_mode = "auto"
|
||||
|
||||
[build-system]
|
||||
requires = ["hatchling"]
|
||||
build-backend = "hatchling.build"
|
||||
|
||||
[tool.hatch.build.targets.wheel]
|
||||
packages = ["."]
|
||||
44
demos/filter_chains/peyeeye/run_demo.sh
Normal file
44
demos/filter_chains/peyeeye/run_demo.sh
Normal file
|
|
@ -0,0 +1,44 @@
|
|||
#!/bin/bash
|
||||
set -e
|
||||
|
||||
start_demo() {
|
||||
if [ -f ".env" ]; then
|
||||
echo ".env file already exists. Skipping creation."
|
||||
else
|
||||
if [ -z "${PEYEEYE_API_KEY:-}" ]; then
|
||||
echo "Error: PEYEEYE_API_KEY environment variable is not set for the demo."
|
||||
exit 1
|
||||
fi
|
||||
if [ -z "${OPENAI_API_KEY:-}" ]; then
|
||||
echo "Error: OPENAI_API_KEY environment variable is not set for the demo."
|
||||
exit 1
|
||||
fi
|
||||
|
||||
echo "Creating .env file..."
|
||||
{
|
||||
echo "PEYEEYE_API_KEY=$PEYEEYE_API_KEY"
|
||||
echo "OPENAI_API_KEY=$OPENAI_API_KEY"
|
||||
} > .env
|
||||
echo ".env file created."
|
||||
fi
|
||||
|
||||
echo "Starting Plano with config.yaml..."
|
||||
planoai up config.yaml
|
||||
|
||||
echo "Starting Peyeeye filter service..."
|
||||
bash start_agents.sh &
|
||||
}
|
||||
|
||||
stop_demo() {
|
||||
echo "Stopping Peyeeye filter service..."
|
||||
pkill -f start_agents.sh 2>/dev/null || true
|
||||
|
||||
echo "Stopping Plano..."
|
||||
planoai down
|
||||
}
|
||||
|
||||
if [ "$1" == "down" ]; then
|
||||
stop_demo
|
||||
else
|
||||
start_demo
|
||||
fi
|
||||
29
demos/filter_chains/peyeeye/start_agents.sh
Normal file
29
demos/filter_chains/peyeeye/start_agents.sh
Normal file
|
|
@ -0,0 +1,29 @@
|
|||
#!/bin/bash
|
||||
set -e
|
||||
|
||||
PIDS=()
|
||||
|
||||
log() { echo "$(date '+%F %T') - $*"; }
|
||||
|
||||
cleanup() {
|
||||
log "Stopping agents..."
|
||||
for PID in "${PIDS[@]}"; do
|
||||
kill $PID 2>/dev/null && log "Stopped process $PID"
|
||||
done
|
||||
exit 0
|
||||
}
|
||||
|
||||
trap cleanup EXIT INT TERM
|
||||
|
||||
if [ -z "${PEYEEYE_API_KEY:-}" ]; then
|
||||
log "ERROR: PEYEEYE_API_KEY is not set."
|
||||
exit 1
|
||||
fi
|
||||
|
||||
log "Starting Peyeeye filter service on port 10502..."
|
||||
uv run uvicorn peyeeye:app --host 0.0.0.0 --port 10502 &
|
||||
PIDS+=($!)
|
||||
|
||||
for PID in "${PIDS[@]}"; do
|
||||
wait "$PID"
|
||||
done
|
||||
71
demos/filter_chains/peyeeye/test.sh
Normal file
71
demos/filter_chains/peyeeye/test.sh
Normal file
|
|
@ -0,0 +1,71 @@
|
|||
#!/usr/bin/env bash
|
||||
set -euo pipefail
|
||||
|
||||
BASE_URL="http://localhost:12000"
|
||||
PASS=0
|
||||
FAIL=0
|
||||
|
||||
echo "Waiting for Plano to be ready..."
|
||||
for i in $(seq 1 30); do
|
||||
if curl -sf "$BASE_URL/v1/models" > /dev/null 2>&1; then
|
||||
echo "Plano is ready."
|
||||
break
|
||||
fi
|
||||
if [ "$i" -eq 30 ]; then
|
||||
echo "ERROR: Plano did not become ready in time."
|
||||
exit 1
|
||||
fi
|
||||
sleep 2
|
||||
done
|
||||
|
||||
run_test() {
|
||||
local name="$1"
|
||||
local path="$2"
|
||||
local expected_code="$3"
|
||||
local body="$4"
|
||||
|
||||
http_code=$(curl -s -o /tmp/peyeeye_test_body -w "%{http_code}" \
|
||||
-X POST "$BASE_URL$path" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "$body")
|
||||
|
||||
if [ "$http_code" -eq "$expected_code" ]; then
|
||||
echo " PASS $name (HTTP $http_code)"
|
||||
PASS=$((PASS + 1))
|
||||
else
|
||||
echo " FAIL $name -- expected $expected_code, got $http_code"
|
||||
echo " Body: $(cat /tmp/peyeeye_test_body)"
|
||||
FAIL=$((FAIL + 1))
|
||||
fi
|
||||
}
|
||||
|
||||
echo ""
|
||||
echo "=== /v1/chat/completions ==="
|
||||
|
||||
run_test "Non-streaming with PII" /v1/chat/completions 200 '{
|
||||
"model": "gpt-4o-mini",
|
||||
"messages": [{"role": "user", "content": "Email me at jane@example.com"}],
|
||||
"stream": false
|
||||
}'
|
||||
|
||||
run_test "No PII" /v1/chat/completions 200 '{
|
||||
"model": "gpt-4o-mini",
|
||||
"messages": [{"role": "user", "content": "What is 2+2?"}],
|
||||
"stream": false
|
||||
}'
|
||||
|
||||
echo ""
|
||||
echo "=== /v1/messages (Anthropic) ==="
|
||||
|
||||
run_test "Non-streaming with PII (SSN)" /v1/messages 200 '{
|
||||
"model": "claude-sonnet-4-20250514",
|
||||
"max_tokens": 256,
|
||||
"messages": [{"role": "user", "content": "My SSN is 123-45-6789"}]
|
||||
}'
|
||||
|
||||
echo ""
|
||||
echo "Results: $PASS passed, $FAIL failed"
|
||||
|
||||
if [ "$FAIL" -gt 0 ]; then
|
||||
exit 1
|
||||
fi
|
||||
471
demos/filter_chains/peyeeye/test_peyeeye.py
Normal file
471
demos/filter_chains/peyeeye/test_peyeeye.py
Normal file
|
|
@ -0,0 +1,471 @@
|
|||
"""Tests for the Peyeeye Plano filter.
|
||||
|
||||
The Peyeeye API (`https://api.peyeeye.ai`) is fully mocked with `respx`. Every
|
||||
new branch in `peyeeye.py` should have at least one test below.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from typing import Any, Dict
|
||||
|
||||
import httpx
|
||||
import pytest
|
||||
import respx
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
from peyeeye import (
|
||||
PEyeEyeClient,
|
||||
PEyeEyeMissingSecrets,
|
||||
create_app,
|
||||
iter_request_texts,
|
||||
iter_response_texts,
|
||||
set_request_text,
|
||||
)
|
||||
|
||||
# ------------------------------------------------------------------- fixtures
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def api_base() -> str:
|
||||
return "https://api.peyeeye.ai"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def client(monkeypatch, api_base) -> PEyeEyeClient:
|
||||
monkeypatch.setenv("PEYEEYE_API_KEY", "pk_test_123")
|
||||
monkeypatch.delenv("PEYEEYE_API_BASE", raising=False)
|
||||
monkeypatch.delenv("PEYEEYE_LOCALE", raising=False)
|
||||
monkeypatch.delenv("PEYEEYE_ENTITIES", raising=False)
|
||||
monkeypatch.delenv("PEYEEYE_SESSION_MODE", raising=False)
|
||||
return PEyeEyeClient()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def app_client(client) -> TestClient:
|
||||
return TestClient(create_app(client=client))
|
||||
|
||||
|
||||
# --------------------------------------------------------------- client tests
|
||||
|
||||
|
||||
def test_client_missing_api_key(monkeypatch):
|
||||
monkeypatch.delenv("PEYEEYE_API_KEY", raising=False)
|
||||
with pytest.raises(PEyeEyeMissingSecrets):
|
||||
PEyeEyeClient()
|
||||
|
||||
|
||||
def test_client_invalid_session_mode(monkeypatch):
|
||||
monkeypatch.setenv("PEYEEYE_API_KEY", "pk_test_123")
|
||||
with pytest.raises(ValueError):
|
||||
PEyeEyeClient(session_mode="bogus")
|
||||
|
||||
|
||||
def test_client_picks_up_env_entities(monkeypatch):
|
||||
monkeypatch.setenv("PEYEEYE_API_KEY", "pk_test_123")
|
||||
monkeypatch.setenv("PEYEEYE_ENTITIES", "EMAIL, SSN ,CREDIT_CARD")
|
||||
c = PEyeEyeClient()
|
||||
assert c.entities == ["EMAIL", "SSN", "CREDIT_CARD"]
|
||||
|
||||
|
||||
# -------------------------------------------------------------- iter helpers
|
||||
|
||||
|
||||
def test_iter_request_texts_chat_string():
|
||||
body = {"messages": [{"role": "user", "content": "hello jane@example.com"}]}
|
||||
parts = iter_request_texts(body, "/v1/chat/completions")
|
||||
assert parts == [(("messages", 0, "content"), "hello jane@example.com")]
|
||||
|
||||
|
||||
def test_iter_request_texts_chat_multimodal():
|
||||
body = {
|
||||
"messages": [
|
||||
{
|
||||
"role": "user",
|
||||
"content": [
|
||||
{"type": "text", "text": "hi"},
|
||||
{"type": "image_url", "image_url": {"url": "..."}},
|
||||
{"type": "text", "text": "ssn 111-22-3333"},
|
||||
],
|
||||
}
|
||||
]
|
||||
}
|
||||
parts = iter_request_texts(body, "/v1/chat/completions")
|
||||
assert parts == [
|
||||
(("messages", 0, "content", 0, "text"), "hi"),
|
||||
(("messages", 0, "content", 2, "text"), "ssn 111-22-3333"),
|
||||
]
|
||||
|
||||
|
||||
def test_iter_request_texts_skips_non_user_roles():
|
||||
body = {
|
||||
"messages": [
|
||||
{"role": "system", "content": "you are helpful"},
|
||||
{"role": "user", "content": "hi"},
|
||||
{"role": "assistant", "content": "hello"},
|
||||
]
|
||||
}
|
||||
parts = iter_request_texts(body, "/v1/chat/completions")
|
||||
assert len(parts) == 1
|
||||
assert parts[0][1] == "hi"
|
||||
|
||||
|
||||
def test_iter_request_texts_responses_string():
|
||||
body = {"input": "hello jane@example.com"}
|
||||
parts = iter_request_texts(body, "/v1/responses")
|
||||
assert parts == [(("input",), "hello jane@example.com")]
|
||||
|
||||
|
||||
def test_iter_request_texts_responses_list():
|
||||
body = {
|
||||
"input": [
|
||||
{"role": "system", "content": "ignored"},
|
||||
{"role": "user", "content": "hi"},
|
||||
{
|
||||
"role": "user",
|
||||
"content": [{"type": "text", "text": "ssn 111-22-3333"}],
|
||||
},
|
||||
]
|
||||
}
|
||||
parts = iter_request_texts(body, "/v1/responses")
|
||||
assert parts == [
|
||||
(("input", 1, "content"), "hi"),
|
||||
(("input", 2, "content", 0, "text"), "ssn 111-22-3333"),
|
||||
]
|
||||
|
||||
|
||||
def test_set_request_text_round_trip():
|
||||
body: Dict[str, Any] = {"messages": [{"role": "user", "content": "raw"}]}
|
||||
set_request_text(body, ("messages", 0, "content"), "redacted")
|
||||
assert body["messages"][0]["content"] == "redacted"
|
||||
|
||||
|
||||
def test_iter_response_texts_chat():
|
||||
body = {"choices": [{"message": {"content": "Email [EMAIL_0] back."}}]}
|
||||
parts = iter_response_texts(body, "/v1/chat/completions")
|
||||
assert parts == [(("choices", 0, "message", "content"), "Email [EMAIL_0] back.")]
|
||||
|
||||
|
||||
def test_iter_response_texts_anthropic():
|
||||
body = {"content": [{"type": "text", "text": "Reach [EMAIL_0]"}]}
|
||||
parts = iter_response_texts(body, "/v1/messages")
|
||||
assert parts == [(("content", 0, "text"), "Reach [EMAIL_0]")]
|
||||
|
||||
|
||||
# ------------------------------------------------------------ /redact endpoint
|
||||
|
||||
|
||||
@respx.mock
|
||||
def test_redact_chat_happy_path(app_client, api_base):
|
||||
respx.post(f"{api_base}/v1/redact").mock(
|
||||
return_value=httpx.Response(
|
||||
200,
|
||||
json={
|
||||
"text": ["hello [EMAIL_0]"],
|
||||
"session_id": "ses_abc",
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
resp = app_client.post(
|
||||
"/redact/v1/chat/completions",
|
||||
json={
|
||||
"model": "gpt-4o-mini",
|
||||
"messages": [{"role": "user", "content": "hello jane@example.com"}],
|
||||
},
|
||||
headers={"x-request-id": "req-1"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
body = resp.json()
|
||||
assert body["messages"][0]["content"] == "hello [EMAIL_0]"
|
||||
|
||||
|
||||
@respx.mock
|
||||
def test_redact_no_pii_no_session_cached(app_client, api_base):
|
||||
# Even with no text-bearing chunks, the body is returned untouched.
|
||||
resp = app_client.post(
|
||||
"/redact/v1/chat/completions",
|
||||
json={"model": "gpt-4o-mini", "messages": []},
|
||||
headers={"x-request-id": "req-empty"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
# No call should have been made to peyeeye.
|
||||
assert not respx.routes
|
||||
|
||||
|
||||
@respx.mock
|
||||
def test_redact_length_guard_fails_closed(app_client, api_base):
|
||||
"""If the API returns a different number of texts, refuse to forward."""
|
||||
respx.post(f"{api_base}/v1/redact").mock(
|
||||
return_value=httpx.Response(
|
||||
200,
|
||||
json={"text": ["only one"], "session_id": "ses_x"},
|
||||
)
|
||||
)
|
||||
resp = app_client.post(
|
||||
"/redact/v1/chat/completions",
|
||||
json={
|
||||
"model": "gpt-4o-mini",
|
||||
"messages": [
|
||||
{"role": "user", "content": "first jane@example.com"},
|
||||
{"role": "user", "content": "second 555-123-4567"},
|
||||
],
|
||||
},
|
||||
headers={"x-request-id": "req-bad-len"},
|
||||
)
|
||||
assert resp.status_code == 502
|
||||
assert "refusing to forward" in resp.json()["detail"]
|
||||
|
||||
|
||||
@respx.mock
|
||||
def test_redact_unexpected_shape_fails_closed(app_client, api_base):
|
||||
respx.post(f"{api_base}/v1/redact").mock(
|
||||
return_value=httpx.Response(200, json={"text": 42, "session_id": "ses_x"})
|
||||
)
|
||||
resp = app_client.post(
|
||||
"/redact/v1/chat/completions",
|
||||
json={
|
||||
"model": "gpt-4o-mini",
|
||||
"messages": [{"role": "user", "content": "x"}],
|
||||
},
|
||||
headers={"x-request-id": "req-bad-shape"},
|
||||
)
|
||||
assert resp.status_code == 502
|
||||
assert "unexpected response shape" in resp.json()["detail"]
|
||||
|
||||
|
||||
@respx.mock
|
||||
def test_redact_5xx_fails_closed(app_client, api_base):
|
||||
respx.post(f"{api_base}/v1/redact").mock(
|
||||
return_value=httpx.Response(500, text="boom")
|
||||
)
|
||||
resp = app_client.post(
|
||||
"/redact/v1/chat/completions",
|
||||
json={
|
||||
"model": "gpt-4o-mini",
|
||||
"messages": [{"role": "user", "content": "x"}],
|
||||
},
|
||||
headers={"x-request-id": "req-5xx"},
|
||||
)
|
||||
assert resp.status_code == 502
|
||||
assert "HTTP 500" in resp.json()["detail"]
|
||||
|
||||
|
||||
@respx.mock
|
||||
def test_redact_401_fails_closed_with_500(app_client, api_base):
|
||||
"""Auth errors surface as PEyeEyeMissingSecrets -> HTTP 500."""
|
||||
respx.post(f"{api_base}/v1/redact").mock(
|
||||
return_value=httpx.Response(401, json={"error": "bad key"})
|
||||
)
|
||||
resp = app_client.post(
|
||||
"/redact/v1/chat/completions",
|
||||
json={
|
||||
"model": "gpt-4o-mini",
|
||||
"messages": [{"role": "user", "content": "x"}],
|
||||
},
|
||||
headers={"x-request-id": "req-401"},
|
||||
)
|
||||
assert resp.status_code == 500
|
||||
assert "Invalid Peyeeye API key" in resp.json()["detail"]
|
||||
|
||||
|
||||
@respx.mock
|
||||
def test_redact_invalid_json_body(app_client):
|
||||
resp = app_client.post(
|
||||
"/redact/v1/chat/completions",
|
||||
content=b"not json",
|
||||
headers={
|
||||
"Content-Type": "application/json",
|
||||
"x-request-id": "req-bad-json",
|
||||
},
|
||||
)
|
||||
assert resp.status_code == 400
|
||||
|
||||
|
||||
# --------------------------------------------------------- /rehydrate endpoint
|
||||
|
||||
|
||||
@respx.mock
|
||||
def test_rehydrate_round_trip(app_client, api_base):
|
||||
"""Redact then rehydrate share the request id; placeholders restored."""
|
||||
respx.post(f"{api_base}/v1/redact").mock(
|
||||
return_value=httpx.Response(
|
||||
200,
|
||||
json={
|
||||
"text": ["hello [EMAIL_0]"],
|
||||
"session_id": "ses_round",
|
||||
},
|
||||
)
|
||||
)
|
||||
respx.post(f"{api_base}/v1/rehydrate").mock(
|
||||
return_value=httpx.Response(
|
||||
200, json={"text": "Reach jane@example.com today.", "replaced": 1}
|
||||
)
|
||||
)
|
||||
respx.delete(f"{api_base}/v1/sessions/ses_round").mock(
|
||||
return_value=httpx.Response(204)
|
||||
)
|
||||
|
||||
redact_resp = app_client.post(
|
||||
"/redact/v1/chat/completions",
|
||||
json={
|
||||
"model": "gpt-4o-mini",
|
||||
"messages": [{"role": "user", "content": "hello jane@example.com"}],
|
||||
},
|
||||
headers={"x-request-id": "req-round"},
|
||||
)
|
||||
assert redact_resp.status_code == 200
|
||||
|
||||
rehydrate_resp = app_client.post(
|
||||
"/rehydrate/v1/chat/completions",
|
||||
json={"choices": [{"message": {"content": "Reach [EMAIL_0] today."}}]},
|
||||
headers={"x-request-id": "req-round"},
|
||||
)
|
||||
assert rehydrate_resp.status_code == 200
|
||||
body = rehydrate_resp.json()
|
||||
assert body["choices"][0]["message"]["content"] == "Reach jane@example.com today."
|
||||
|
||||
|
||||
@respx.mock
|
||||
def test_rehydrate_no_session_cached_passthrough(app_client):
|
||||
raw = {"choices": [{"message": {"content": "no session here"}}]}
|
||||
resp = app_client.post(
|
||||
"/rehydrate/v1/chat/completions",
|
||||
json=raw,
|
||||
headers={"x-request-id": "req-uncached"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
assert resp.json() == raw
|
||||
|
||||
|
||||
@respx.mock
|
||||
def test_rehydrate_anthropic_messages(app_client, api_base):
|
||||
respx.post(f"{api_base}/v1/redact").mock(
|
||||
return_value=httpx.Response(
|
||||
200,
|
||||
json={"text": ["My ssn is [SSN_0]"], "session_id": "ses_anth"},
|
||||
)
|
||||
)
|
||||
respx.post(f"{api_base}/v1/rehydrate").mock(
|
||||
return_value=httpx.Response(
|
||||
200, json={"text": "My ssn is 111-22-3333", "replaced": 1}
|
||||
)
|
||||
)
|
||||
respx.delete(f"{api_base}/v1/sessions/ses_anth").mock(
|
||||
return_value=httpx.Response(204)
|
||||
)
|
||||
|
||||
app_client.post(
|
||||
"/redact/v1/messages",
|
||||
json={
|
||||
"model": "claude-sonnet-4",
|
||||
"messages": [{"role": "user", "content": "My ssn is 111-22-3333"}],
|
||||
},
|
||||
headers={"x-request-id": "req-anth"},
|
||||
)
|
||||
resp = app_client.post(
|
||||
"/rehydrate/v1/messages",
|
||||
json={"content": [{"type": "text", "text": "My ssn is [SSN_0]"}]},
|
||||
headers={"x-request-id": "req-anth"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
body = resp.json()
|
||||
assert body["content"][0]["text"] == "My ssn is 111-22-3333"
|
||||
|
||||
|
||||
@respx.mock
|
||||
def test_rehydrate_sse_passthrough(app_client, api_base):
|
||||
"""SSE bodies are passed through unchanged (and the session id is kept)."""
|
||||
respx.post(f"{api_base}/v1/redact").mock(
|
||||
return_value=httpx.Response(
|
||||
200, json={"text": ["[EMAIL_0]"], "session_id": "ses_sse"}
|
||||
)
|
||||
)
|
||||
app_client.post(
|
||||
"/redact/v1/chat/completions",
|
||||
json={
|
||||
"model": "gpt-4o-mini",
|
||||
"messages": [{"role": "user", "content": "jane@example.com"}],
|
||||
},
|
||||
headers={"x-request-id": "req-sse"},
|
||||
)
|
||||
sse_body = b'data: {"choices":[{"delta":{"content":"hi"}}]}\n\n'
|
||||
resp = app_client.post(
|
||||
"/rehydrate/v1/chat/completions",
|
||||
content=sse_body,
|
||||
headers={
|
||||
"Content-Type": "text/event-stream",
|
||||
"x-request-id": "req-sse",
|
||||
},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
assert resp.content == sse_body
|
||||
|
||||
|
||||
@respx.mock
|
||||
def test_redact_responses_input_string(app_client, api_base):
|
||||
respx.post(f"{api_base}/v1/redact").mock(
|
||||
return_value=httpx.Response(
|
||||
200,
|
||||
json={
|
||||
"text": ["My email is [EMAIL_0]"],
|
||||
"session_id": "ses_resp",
|
||||
},
|
||||
)
|
||||
)
|
||||
resp = app_client.post(
|
||||
"/redact/v1/responses",
|
||||
json={"model": "gpt-4o-mini", "input": "My email is jane@example.com"},
|
||||
headers={"x-request-id": "req-resp"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["input"] == "My email is [EMAIL_0]"
|
||||
|
||||
|
||||
@respx.mock
|
||||
def test_stateless_session_mode(monkeypatch, api_base):
|
||||
"""Stateless mode caches the rehydration_key and skips DELETE."""
|
||||
monkeypatch.setenv("PEYEEYE_API_KEY", "pk_test_123")
|
||||
monkeypatch.setenv("PEYEEYE_SESSION_MODE", "stateless")
|
||||
client = PEyeEyeClient()
|
||||
test_client = TestClient(create_app(client=client))
|
||||
|
||||
redact_route = respx.post(f"{api_base}/v1/redact").mock(
|
||||
return_value=httpx.Response(
|
||||
200,
|
||||
json={
|
||||
"text": ["hello [EMAIL_0]"],
|
||||
"rehydration_key": "skey_xyz",
|
||||
},
|
||||
)
|
||||
)
|
||||
rehydrate_route = respx.post(f"{api_base}/v1/rehydrate").mock(
|
||||
return_value=httpx.Response(
|
||||
200, json={"text": "hello jane@example.com", "replaced": 1}
|
||||
)
|
||||
)
|
||||
delete_route = respx.delete(f"{api_base}/v1/sessions/skey_xyz")
|
||||
|
||||
test_client.post(
|
||||
"/redact/v1/chat/completions",
|
||||
json={
|
||||
"model": "gpt-4o-mini",
|
||||
"messages": [{"role": "user", "content": "hello jane@example.com"}],
|
||||
},
|
||||
headers={"x-request-id": "req-stateless"},
|
||||
)
|
||||
# Inspect the redact request body to confirm session=stateless was sent.
|
||||
assert redact_route.called
|
||||
sent_body = json.loads(redact_route.calls.last.request.content)
|
||||
assert sent_body["session"] == "stateless"
|
||||
|
||||
resp = test_client.post(
|
||||
"/rehydrate/v1/chat/completions",
|
||||
json={"choices": [{"message": {"content": "hello [EMAIL_0]"}}]},
|
||||
headers={"x-request-id": "req-stateless"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["choices"][0]["message"]["content"] == "hello jane@example.com"
|
||||
# Stateless: no DELETE call.
|
||||
assert not delete_route.called
|
||||
assert rehydrate_route.called
|
||||
Loading…
Add table
Add a link
Reference in a new issue