tim-peyeeye 2026-05-29 07:41:24 +08:00 committed by GitHub
commit efec97d6a3
9 changed files with 1296 additions and 0 deletions


@ -0,0 +1,22 @@
FROM python:3.12-slim
ENV PYTHONUNBUFFERED=1 \
    PYTHONDONTWRITEBYTECODE=1 \
    PIP_NO_CACHE_DIR=1
WORKDIR /app
RUN pip install --no-cache-dir uv
COPY pyproject.toml ./
RUN uv pip install --system --no-cache-dir \
    "fastapi>=0.104.1" \
    "uvicorn>=0.24.0" \
    "httpx>=0.27.0" \
    "pydantic>=2.0.0"
COPY peyeeye.py ./
EXPOSE 10502
CMD ["uvicorn", "peyeeye:app", "--host", "0.0.0.0", "--port", "10502"]


@ -0,0 +1,103 @@
# Peyeeye PII Filter Chain Demo
Drop-in PII redaction + rehydration for Plano via the [Peyeeye](https://peyeeye.ai) API.
The model never sees the raw PII in user-supplied text: the content of `role: user` messages (or `input` for the Responses API) is sent to `/v1/redact` and replaced with stable placeholders such as `[EMAIL_0]` and `[PERSON_1]`. After the model responds, the placeholders in its output are swapped back to the originals via `/v1/rehydrate`.
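To make the placeholder idea concrete, here is a toy, local-only sketch of the redact and rehydrate round trip. It is not how Peyeeye detects PII: the regex, `toy_redact`, and `toy_rehydrate` are hypothetical names invented for this illustration, and only emails are handled.

```python
import re
from typing import Dict, Tuple

# Toy email pattern for illustration only; real PII detection is far broader.
EMAIL_RE = re.compile(r"[\w.+-]+@[\w-]+(?:\.[\w-]+)+")


def toy_redact(text: str) -> Tuple[str, Dict[str, str]]:
    """Replace each distinct email with a stable [EMAIL_n] placeholder."""
    mapping: Dict[str, str] = {}  # placeholder -> original value
    seen: Dict[str, str] = {}     # original value -> placeholder

    def _swap(match: "re.Match[str]") -> str:
        value = match.group(0)
        if value not in seen:
            token = f"[EMAIL_{len(seen)}]"
            seen[value] = token
            mapping[token] = value
        return seen[value]

    return EMAIL_RE.sub(_swap, text), mapping


def toy_rehydrate(text: str, mapping: Dict[str, str]) -> str:
    """Swap placeholders back to the original values."""
    for token, value in mapping.items():
        text = text.replace(token, value)
    return text


redacted, mapping = toy_redact("Mail jane@example.com, then cc jane@example.com.")
print(redacted)  # the same address maps to the same placeholder both times
print(toy_rehydrate("I will write to [EMAIL_0].", mapping))
```

In the real filter chain the mapping never lives in this process: it is held by Peyeeye (stateful mode) or sealed into a key (stateless mode).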
## Architecture
```
Client --> Plano (model listener :12000)
|
+-- input_filters: peyeeye_redact
| -> POST https://api.peyeeye.ai/v1/redact
| -> body messages contain [EMAIL_0], [SSN_0], ...
|
+-- model_provider: openai/gpt-4o-mini (or claude, etc.)
| -> the LLM only ever sees redacted text
|
+-- output_filters: peyeeye_rehydrate
-> POST https://api.peyeeye.ai/v1/rehydrate
-> placeholders restored to originals
```
## Quick start
```bash
export PEYEEYE_API_KEY=pk_live_... # https://peyeeye.ai
export OPENAI_API_KEY=sk-...
bash run_demo.sh
# in another terminal
bash test.sh
# stop
bash run_demo.sh down
```
## Try it
```bash
curl http://localhost:12000/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"model": "gpt-4o-mini",
"messages": [{"role": "user", "content": "Email me at jane@example.com about SSN 123-45-6789"}],
"stream": false
}'
```
The response body comes back with the original email and SSN restored, while the request that hit OpenAI carried `[EMAIL_0]` and `[SSN_0]`.
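The same request can be built from Python with only the standard library. This is a minimal sketch that assumes the demo is listening on `localhost:12000`; `build_chat_request` and `send_chat_request` are hypothetical helper names, not part of the demo.

```python
import json
import urllib.request


def build_chat_request(base_url: str = "http://localhost:12000") -> urllib.request.Request:
    """Build the same POST that the curl example sends."""
    payload = {
        "model": "gpt-4o-mini",
        "messages": [
            {
                "role": "user",
                "content": "Email me at jane@example.com about SSN 123-45-6789",
            }
        ],
        "stream": False,  # streaming responses are not rehydrated in this demo
    }
    return urllib.request.Request(
        f"{base_url}/v1/chat/completions",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send_chat_request(req: urllib.request.Request) -> str:
    """Send the request and return the assistant text (needs the demo running)."""
    with urllib.request.urlopen(req, timeout=60) as resp:
        body = json.load(resp)
    return body["choices"][0]["message"]["content"]
```

With the demo up, `print(send_chat_request(build_chat_request()))` should show the reply with the original values restored.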
## Configuration
All configuration is via env vars on the filter service:
| Var | Default | Notes |
|---|---|---|
| `PEYEEYE_API_KEY` | _required_ | get one at peyeeye.ai |
| `PEYEEYE_API_BASE` | `https://api.peyeeye.ai` | override for self-hosted |
| `PEYEEYE_LOCALE` | `auto` | BCP-47 |
| `PEYEEYE_ENTITIES` | _all_ | comma-separated list, e.g. `EMAIL,SSN,CREDIT_CARD` |
| `PEYEEYE_SESSION_MODE` | `stateful` | `stateful` (default) or `stateless` |
In `stateless` mode, Peyeeye returns a sealed `skey_...` blob instead of holding the mapping server-side; this filter caches the blob in memory under the request id (`x-request-id`) and uses it for rehydration. No per-request state is retained on Peyeeye's servers.
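As a rough sketch of how these variables shape the `/v1/redact` request body, the snippet below mirrors the logic of `redact_batch` in `peyeeye.py`. The helper names `parse_entities` and `build_redact_body` are hypothetical, and the environment is passed in as a plain mapping so the example can run without touching real settings.

```python
import os
from typing import Any, Dict, List, Mapping, Optional


def parse_entities(raw: Optional[str]) -> Optional[List[str]]:
    """Split a comma-separated entity list, dropping blanks and whitespace."""
    if not raw:
        return None
    items = [e.strip() for e in raw.split(",") if e.strip()]
    return items or None


def build_redact_body(
    texts: List[str], env: Mapping[str, str] = os.environ
) -> Dict[str, Any]:
    """Assemble the JSON body for one batched redact call."""
    mode = env.get("PEYEEYE_SESSION_MODE") or "stateful"
    if mode not in ("stateful", "stateless"):
        raise ValueError(f"unsupported session mode: {mode!r}")

    body: Dict[str, Any] = {
        "text": texts,
        "locale": env.get("PEYEEYE_LOCALE") or "auto",
    }
    entities = parse_entities(env.get("PEYEEYE_ENTITIES"))
    if entities:
        body["entities"] = entities
    if mode == "stateless":
        # Ask for a sealed rehydration key instead of a server-side session.
        body["session"] = "stateless"
    return body


print(build_redact_body(["hi"], env={"PEYEEYE_SESSION_MODE": "stateless"}))
```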
## Filter contract
**Input filter (`/redact/{path:path}`)** receives the full raw request body. For chat-style endpoints it walks `messages[].content` (string or multimodal `text` parts) of `role: user` messages; for the OpenAI Responses API it walks `input`. System and assistant messages are left untouched. It sends a single batched call to Peyeeye and writes the redacted text back into the body in place.
**Output filter (`/rehydrate/{path:path}`)** receives the raw LLM response bytes, looks up the cached session id by the request id (`x-request-id`), and rehydrates `choices[].message.content`, Anthropic-style `content[].text`, or Responses-API `output[].content[].text`.
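The walk-then-write-back contract can be sketched in a few lines. This is a simplified version of what `iter_request_texts` and `set_request_text` do for chat-style bodies only; `collect_user_texts` and `write_back` are hypothetical names, and the `[REDACTED_n]` strings stand in for whatever the API would return.

```python
from typing import Any, Dict, List, Tuple

Path = Tuple[Any, ...]


def collect_user_texts(body: Dict[str, Any]) -> List[Tuple[Path, str]]:
    """Return (path, text) pairs for every text chunk in user messages."""
    parts: List[Tuple[Path, str]] = []
    for i, msg in enumerate(body.get("messages") or []):
        if not isinstance(msg, dict) or msg.get("role") != "user":
            continue
        content = msg.get("content")
        if isinstance(content, str) and content:
            parts.append((("messages", i, "content"), content))
        elif isinstance(content, list):
            for j, sub in enumerate(content):
                if isinstance(sub, dict) and sub.get("type") == "text" and sub.get("text"):
                    parts.append((("messages", i, "content", j, "text"), sub["text"]))
    return parts


def write_back(body: Dict[str, Any], path: Path, value: str) -> None:
    """Follow the path into the body and overwrite the final key."""
    cur: Any = body
    for key in path[:-1]:
        cur = cur[key]
    cur[path[-1]] = value


body = {
    "messages": [
        {"role": "system", "content": "be brief"},
        {"role": "user", "content": "hi jane@example.com"},
        {
            "role": "user",
            "content": [
                {"type": "text", "text": "ssn 111-22-3333"},
                {"type": "image_url", "image_url": {"url": "..."}},
            ],
        },
    ]
}
parts = collect_user_texts(body)
# Stand-in for the batched API response: one replacement per collected chunk.
replacements = [f"[REDACTED_{n}]" for n in range(len(parts))]
for (path, _), value in zip(parts, replacements):
    write_back(body, path, value)
print(body["messages"][1]["content"], body["messages"][2]["content"][0]["text"])
```

Recording paths rather than mutating during the walk keeps the texts in a flat list, which is what makes a single batched API call possible.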
## Behavioral invariants
- **Fail-closed.** If `/v1/redact` returns an unexpected shape, or the count of returned texts doesn't match the count sent, the filter raises a 502 — no unredacted text is ever forwarded to the model.
- **Length-guard.** `len(redacted) == len(sent)` is asserted before zipping the values back into the request.
- **Typed errors.** `PEyeEyeMissingSecrets` covers auth (401, missing key) and surfaces from `/redact` as HTTP 500; `PEyeEyeAPIError` covers everything else (timeouts, other 4xx, 5xx, parse errors) and surfaces as HTTP 502.
- **Best-effort cleanup.** Stateful sessions are deleted server-side after rehydration via `DELETE /v1/sessions/{ses_...}`.
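The fail-closed and length-guard invariants above boil down to two checks before any text is written back. A minimal sketch, assuming a hypothetical `RedactError` exception and `checked_redacted` helper (the real filter raises `PEyeEyeAPIError` or an HTTP 502 instead):

```python
from typing import Any, List


class RedactError(Exception):
    """Hypothetical error type used only in this sketch."""


def checked_redacted(sent: List[str], payload_text: Any) -> List[str]:
    """Validate the `text` field of a redact response against what was sent."""
    if isinstance(payload_text, str):
        redacted = [payload_text]
    elif isinstance(payload_text, list):
        redacted = [str(x) for x in payload_text]
    else:
        # Unknown shape: refuse rather than risk forwarding raw text.
        raise RedactError("unexpected response shape")

    if len(redacted) != len(sent):
        # A short or long list would misalign texts when zipped back.
        raise RedactError(f"got {len(redacted)} texts for {len(sent)} inputs")
    return redacted


print(checked_redacted(["hello jane@example.com"], ["hello [EMAIL_0]"]))
```

Raising on a count mismatch matters because `zip` silently truncates to the shorter sequence, which would leave trailing chunks unredacted.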
## Streaming
Streaming SSE responses are passed through unchanged in this demo, so placeholders in a streamed response are not restored. Token-by-token rehydration would require buffering or a session-aware streaming endpoint. For now, set `stream: false` on requests routed through this filter chain.
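The output filter decides whether a response is SSE with a simple prefix heuristic. The sketch below reproduces that check as a standalone function; `looks_like_sse` is a hypothetical name, and the heuristic is deliberately loose.

```python
def looks_like_sse(raw: bytes) -> bool:
    """Heuristic: treat the body as SSE if a `data:` field appears near the start."""
    text = raw.decode("utf-8", errors="replace")
    return text.lstrip().startswith("data:") or "data: " in text[:32]


print(looks_like_sse(b'data: {"choices": []}\n\n'))
print(looks_like_sse(b'{"choices": []}'))
```

Bodies that match are returned as-is with a `text/event-stream` media type; everything else is parsed as JSON and rehydrated.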
## Tests
```bash
uv sync --group dev
uv run pytest -v
```
The suite mocks the Peyeeye API (`respx`) and exercises:
- redact + rehydrate round trip on chat completions
- redact + rehydrate on `/v1/messages` (Anthropic) and `/v1/responses` (OpenAI)
- the length-guard (redact returns wrong count -> 502)
- the unexpected-shape guard (redact returns non-string/list -> 502)
- the no-PII passthrough (no redactable text -> body unchanged, no session cached)
- the no-cached-session passthrough on rehydrate
- multimodal `[{"type":"text","text":...}]` content lists
## Disclosure
I'm the maintainer of peyeeye.ai. Happy to adjust API surface, naming, or test coverage to match Plano's conventions.


@ -0,0 +1,42 @@
version: v0.3.0
# Peyeeye PII redaction & rehydration filter chain.
#
# Prereqs:
# * export PEYEEYE_API_KEY=... (required)
# * export OPENAI_API_KEY=... (or any provider you wire up below)
#
# The peyeeye filter service runs on :10502 and exposes:
# POST /redact/{path} — input filter
# POST /rehydrate/{path} — output filter
#
# Plano appends the upstream path automatically (e.g. /v1/chat/completions),
# so /redact/v1/chat/completions hits the redactor, and the model only ever
# sees ``[EMAIL_0]``-style placeholders.
filters:
  - id: peyeeye_redact
    url: http://localhost:10502/redact
    type: http
  - id: peyeeye_rehydrate
    url: http://localhost:10502/rehydrate
    type: http
model_providers:
  - model: openai/gpt-4o-mini
    access_key: $OPENAI_API_KEY
    default: true
  - model: anthropic/claude-sonnet-4-20250514
    access_key: $ANTHROPIC_API_KEY
listeners:
  - type: model
    name: llm_gateway
    port: 12000
    input_filters:
      - peyeeye_redact
    output_filters:
      - peyeeye_rehydrate
tracing:
  random_sampling: 100


@ -0,0 +1,485 @@
"""Peyeeye PII redaction & rehydration filter for Plano filter chains.
Two endpoints, mirroring the pii_anonymizer demo:
POST /redact/{path:path} input filter; redacts PII before the LLM call.
POST /rehydrate/{path:path} output filter; restores PII in the LLM response.
The filter delegates detection and rehydration to the Peyeeye API
(``https://api.peyeeye.ai`` by default). Two session modes are supported:
* ``stateful`` (default) Peyeeye holds the token -> value mapping under a
``ses_...`` id; rehydrate references the id.
* ``stateless`` Peyeeye returns a sealed ``skey_...`` blob; nothing is
retained server-side.
Behavioral invariants (mirrored from the LiteLLM peyeeye guardrail):
* Pre-call: redact every text-bearing chunk in the request. If the count of
returned texts doesn't match the count sent, raise -- never forward
partially-redacted source.
* Post-call: pull the cached session id, rehydrate the response, and best-
effort delete the stateful session.
* Fail-closed: any unexpected response shape from /v1/redact raises a typed
error rather than silently passing PII through.
Configuration knobs (env vars):
* ``PEYEEYE_API_KEY`` required.
* ``PEYEEYE_API_BASE`` defaults to ``https://api.peyeeye.ai``.
* ``PEYEEYE_LOCALE`` BCP-47, default ``auto``.
* ``PEYEEYE_ENTITIES`` comma-separated entity ids to restrict detection.
* ``PEYEEYE_SESSION_MODE`` ``stateful`` (default) or ``stateless``.
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import threading
import time
from typing import Any, Dict, List, Optional, Tuple
import httpx
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse, Response
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - [PEYEEYE] - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
DEFAULT_API_BASE = "https://api.peyeeye.ai"
SESSION_TTL_SECONDS = 3600
HTTP_TIMEOUT_SECONDS = 15.0
# ----------------------------------------------------------------------- errors
class PEyeEyeAPIError(Exception):
"""Raised when the Peyeeye API returns an error or unexpected payload."""
class PEyeEyeMissingSecrets(Exception):
"""Raised when no Peyeeye API key is configured."""
# ------------------------------------------------------------------- mini cache
class _SessionCache:
    """Tiny in-memory ``request_id -> session_id`` store with TTL."""

    def __init__(self, ttl_seconds: int = SESSION_TTL_SECONDS) -> None:
        self._ttl = ttl_seconds
        self._lock = threading.Lock()
        self._store: Dict[str, Tuple[str, float]] = {}

    def _expire(self) -> None:
        # Caller must hold ``self._lock``.
        now = time.time()
        expired = [k for k, (_, ts) in self._store.items() if now - ts > self._ttl]
        for k in expired:
            del self._store[k]

    def set(self, key: str, value: str) -> None:
        with self._lock:
            self._expire()
            self._store[key] = (value, time.time())

    def get(self, key: str) -> Optional[str]:
        with self._lock:
            self._expire()  # never hand back an entry past its TTL
            entry = self._store.get(key)
        return entry[0] if entry else None

    def pop(self, key: str) -> Optional[str]:
        with self._lock:
            self._expire()  # never hand back an entry past its TTL
            entry = self._store.pop(key, None)
        return entry[0] if entry else None
# ------------------------------------------------------------------ peyeeye client
class PEyeEyeClient:
"""Async HTTP client for the Peyeeye redact/rehydrate API."""
def __init__(
self,
api_key: Optional[str] = None,
api_base: Optional[str] = None,
locale: Optional[str] = None,
entities: Optional[List[str]] = None,
session_mode: Optional[str] = None,
) -> None:
key = api_key or os.environ.get("PEYEEYE_API_KEY")
if not key:
raise PEyeEyeMissingSecrets(
"Peyeeye API key missing — set the PEYEEYE_API_KEY env var."
)
self.api_key = key
self.api_base = (
api_base or os.environ.get("PEYEEYE_API_BASE") or DEFAULT_API_BASE
).rstrip("/")
self.locale = locale or os.environ.get("PEYEEYE_LOCALE") or "auto"
env_entities = os.environ.get("PEYEEYE_ENTITIES")
if entities is None and env_entities:
entities = [e.strip() for e in env_entities.split(",") if e.strip()]
self.entities = entities or None
mode = session_mode or os.environ.get("PEYEEYE_SESSION_MODE") or "stateful"
if mode not in ("stateful", "stateless"):
raise ValueError(
f"PEYEEYE_SESSION_MODE must be 'stateful' or 'stateless', got {mode!r}"
)
self.session_mode = mode
self._client: Optional[httpx.AsyncClient] = None
async def _async_client(self) -> httpx.AsyncClient:
if self._client is None:
self._client = httpx.AsyncClient(timeout=HTTP_TIMEOUT_SECONDS)
return self._client
def _headers(self) -> Dict[str, str]:
return {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
async def redact_batch(self, texts: List[str]) -> Tuple[List[str], Optional[str]]:
"""Redact a batch of texts. Returns ``(redacted, session_id_or_skey)``.
Raises ``PEyeEyeAPIError`` on any non-2xx, timeout, or unexpected shape.
"""
body: Dict[str, Any] = {"text": texts, "locale": self.locale}
if self.entities:
body["entities"] = list(self.entities)
if self.session_mode == "stateless":
body["session"] = "stateless"
payload = await self._post("/v1/redact", body)
out = payload.get("text")
if isinstance(out, str):
redacted = [out]
elif isinstance(out, list):
redacted = [str(x) for x in out]
else:
raise PEyeEyeAPIError(
"Peyeeye /v1/redact returned an unexpected response shape; "
"refusing to forward unredacted text."
)
if self.session_mode == "stateless":
session_id = payload.get("rehydration_key")
else:
session_id = payload.get("session_id") or payload.get("session")
return redacted, session_id
    async def rehydrate(self, text: str, session_id: str) -> str:
        if not text:
            return text
        try:
            payload = await self._post(
                "/v1/rehydrate", {"text": text, "session": session_id}
            )
        except (PEyeEyeAPIError, PEyeEyeMissingSecrets) as e:
            # Rehydration failures (including a 401 from the API) must not
            # corrupt or drop the LLM response; log and fall back to the
            # (already-redacted) text.
            logger.warning("rehydrate failed: %s", e)
            return text
        return payload.get("text", text)
async def delete_session(self, session_id: str) -> None:
if not session_id.startswith("ses_"):
return
client = await self._async_client()
try:
await client.delete(
f"{self.api_base}/v1/sessions/{session_id}",
headers=self._headers(),
timeout=10.0,
)
except Exception as e: # pragma: no cover - best effort
logger.debug("session cleanup failed: %s", e)
async def _post(self, path: str, body: Dict[str, Any]) -> Dict[str, Any]:
client = await self._async_client()
url = f"{self.api_base}{path}"
try:
resp = await client.post(url, headers=self._headers(), json=body)
except httpx.TimeoutException as e:
raise PEyeEyeAPIError(f"Peyeeye {path} timed out") from e
except httpx.HTTPError as e:
raise PEyeEyeAPIError(f"Peyeeye {path} request failed: {e}") from e
if resp.status_code == 401:
raise PEyeEyeMissingSecrets("Invalid Peyeeye API key") from None
if resp.status_code >= 400:
raise PEyeEyeAPIError(
f"Peyeeye {path} returned HTTP {resp.status_code}: {resp.text[:200]}"
)
try:
return resp.json()
except json.JSONDecodeError as e:
raise PEyeEyeAPIError(f"Peyeeye {path} returned non-JSON body") from e
# ------------------------------------------------------------- request walkers
def iter_request_texts(
    body: Dict[str, Any], endpoint: str
) -> List[Tuple[Tuple[Any, ...], str]]:
    """Return ``(path, text)`` pairs identifying every user-text chunk.

    Mirrors the litellm pre-call hook: walks ``messages[].content`` (string or
    multimodal text-part list) for chat-style endpoints, and ``input`` for the
    OpenAI Responses API. Only ``role: user`` items are collected.

    ``path`` is a tuple of keys and indexes, consumed by ``set_request_text``
    to write the redacted value back.
    """
    parts: List[Tuple[Tuple[Any, ...], str]] = []
    if endpoint == "/v1/responses":
        input_val = body.get("input")
        if isinstance(input_val, str) and input_val:
            parts.append((("input",), input_val))
        elif isinstance(input_val, list):
            for i, item in enumerate(input_val):
                if not isinstance(item, dict):
                    continue
                if item.get("role") != "user":
                    continue
                content = item.get("content")
                if isinstance(content, str) and content:
                    parts.append((("input", i, "content"), content))
                elif isinstance(content, list):
                    for j, sub in enumerate(content):
                        if isinstance(sub, dict) and sub.get("type") == "text":
                            text = sub.get("text", "")
                            if text:
                                parts.append((("input", i, "content", j, "text"), text))
        return parts

    # /v1/chat/completions and /v1/messages both use messages[]
    messages = body.get("messages") or []
    for i, msg in enumerate(messages):
        if not isinstance(msg, dict):
            continue
        if msg.get("role") != "user":
            continue
        content = msg.get("content")
        if isinstance(content, str) and content:
            parts.append((("messages", i, "content"), content))
        elif isinstance(content, list):
            for j, sub in enumerate(content):
                if isinstance(sub, dict) and sub.get("type") == "text":
                    text = sub.get("text", "")
                    if text:
                        parts.append((("messages", i, "content", j, "text"), text))
    return parts
def set_request_text(body: Dict[str, Any], path: Tuple[Any, ...], value: str) -> None:
"""Write ``value`` into ``body`` at the given path."""
cur: Any = body
for key in path[:-1]:
cur = cur[key]
cur[path[-1]] = value
def iter_response_texts(
body: Dict[str, Any], endpoint: str
) -> List[Tuple[Tuple[Any, ...], str]]:
"""Yield ``(path, text)`` for every text chunk in an LLM response body."""
parts: List[Tuple[Tuple[Any, ...], str]] = []
if endpoint == "/v1/messages":
content = body.get("content")
if isinstance(content, list):
for j, sub in enumerate(content):
if isinstance(sub, dict) and sub.get("type") == "text":
text = sub.get("text", "")
if text:
parts.append((("content", j, "text"), text))
return parts
# /v1/chat/completions, /v1/responses (synchronous), and similar
choices = body.get("choices")
if isinstance(choices, list):
for i, choice in enumerate(choices):
if not isinstance(choice, dict):
continue
message = choice.get("message")
if isinstance(message, dict):
content = message.get("content")
if isinstance(content, str) and content:
parts.append((("choices", i, "message", "content"), content))
# OpenAI Responses API: top-level "output" array
output = body.get("output")
if isinstance(output, list):
for i, item in enumerate(output):
if not isinstance(item, dict):
continue
content = item.get("content")
if isinstance(content, list):
for j, sub in enumerate(content):
if isinstance(sub, dict) and sub.get("type") in (
"text",
"output_text",
):
text = sub.get("text", "")
if text:
parts.append((("output", i, "content", j, "text"), text))
return parts
# -------------------------------------------------------------------- FastAPI
def create_app(client: Optional[PEyeEyeClient] = None) -> FastAPI:
"""Build the FastAPI app. ``client`` is overridable for tests."""
app = FastAPI(title="Peyeeye PII filter", version="1.0.0")
cache = _SessionCache()
def _client() -> PEyeEyeClient:
nonlocal client
if client is None:
client = PEyeEyeClient()
return client
@app.post("/redact/{path:path}")
async def redact(path: str, request: Request) -> Response:
endpoint = f"/{path}"
request_id = request.headers.get("x-request-id", "unknown")
try:
body = await request.json()
except json.JSONDecodeError:
raise HTTPException(status_code=400, detail="invalid JSON body")
text_parts = iter_request_texts(body, endpoint)
if not text_parts:
logger.info("request_id=%s no text to redact", request_id)
return JSONResponse(content=body)
texts = [t for _, t in text_parts]
try:
redacted, session_id = await _client().redact_batch(texts)
except PEyeEyeMissingSecrets as e:
logger.error("request_id=%s missing secrets: %s", request_id, e)
raise HTTPException(status_code=500, detail=str(e))
except PEyeEyeAPIError as e:
# Fail-closed: do not pass unredacted text through.
logger.error("request_id=%s redact failed: %s", request_id, e)
raise HTTPException(status_code=502, detail=str(e))
# Length-guard: must match exactly. The API contract is one-to-one.
if len(redacted) != len(text_parts):
logger.error(
"request_id=%s length mismatch: sent=%d got=%d",
request_id,
len(text_parts),
len(redacted),
)
raise HTTPException(
status_code=502,
detail=(
f"Peyeeye /v1/redact returned {len(redacted)} texts for "
f"{len(text_parts)} inputs; refusing to forward partially-"
"redacted data"
),
)
for (path_tuple, _), value in zip(text_parts, redacted):
set_request_text(body, path_tuple, value)
if session_id:
cache.set(request_id, session_id)
logger.info(
"request_id=%s redacted %d chunk(s); cached session",
request_id,
len(text_parts),
)
else:
logger.info(
"request_id=%s redacted %d chunk(s); no session id returned",
request_id,
len(text_parts),
)
return JSONResponse(content=body)
@app.post("/rehydrate/{path:path}")
async def rehydrate(path: str, request: Request) -> Response:
endpoint = f"/{path}"
request_id = request.headers.get("x-request-id", "unknown")
raw = await request.body()
session_id = cache.pop(request_id)
if not session_id:
logger.info(
"request_id=%s no session cached, passing response through",
request_id,
)
return Response(content=raw, media_type="application/json")
# Streaming SSE: not supported for stateful rehydration in this demo.
# Plano sends raw chunks, but rehydration needs the full token to look
# up the original; we pass through and rely on a non-streaming flow.
body_str = raw.decode("utf-8", errors="replace")
if body_str.lstrip().startswith("data:") or "data: " in body_str[:32]:
logger.warning(
"request_id=%s SSE not supported; passing through "
"(use non-streaming for now)",
request_id,
)
# Don't lose the session id on the way back if SSE.
cache.set(request_id, session_id)
return Response(content=raw, media_type="text/event-stream")
try:
body = json.loads(body_str)
except json.JSONDecodeError:
logger.warning(
"request_id=%s response is not JSON; passing through", request_id
)
return Response(content=raw, media_type="application/json")
text_parts = iter_response_texts(body, endpoint)
if not text_parts:
logger.info("request_id=%s no response text to rehydrate", request_id)
await _maybe_delete_session(_client(), session_id)
return JSONResponse(content=body)
# Run rehydrate calls concurrently — each returns the original text,
# falling back to the redacted value on rehydrate error.
tasks = [_client().rehydrate(text, session_id) for _, text in text_parts]
restored = await asyncio.gather(*tasks)
for (path_tuple, _), value in zip(text_parts, restored):
set_request_text(body, path_tuple, value)
await _maybe_delete_session(_client(), session_id)
logger.info("request_id=%s rehydrated %d chunk(s)", request_id, len(text_parts))
return JSONResponse(content=body)
@app.get("/health")
async def health() -> Dict[str, str]:
return {"status": "healthy"}
return app
async def _maybe_delete_session(client: PEyeEyeClient, session_id: str) -> None:
"""Best-effort delete of a stateful session id."""
if client.session_mode == "stateful":
try:
await client.delete_session(session_id)
except Exception: # pragma: no cover - best effort
pass
# Default ASGI app — used by ``uvicorn peyeeye:app``.
# Lazily resolves the client so tests can construct ``create_app(client=...)``
# without requiring PEYEEYE_API_KEY in the environment.
app = create_app()


@ -0,0 +1,29 @@
[project]
name = "peyeeye_filter"
version = "0.1.0"
description = "Peyeeye PII redaction & rehydration filter for Plano filter chains"
readme = "README.md"
requires-python = ">=3.10,<3.14"
dependencies = [
"fastapi>=0.104.1",
"uvicorn>=0.24.0",
"httpx>=0.27.0",
"pydantic>=2.0.0",
]
[dependency-groups]
dev = [
"pytest>=8.0.0",
"pytest-asyncio>=0.23.0",
"respx>=0.21.0",
]
[tool.pytest.ini_options]
asyncio_mode = "auto"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["."]


@ -0,0 +1,44 @@
#!/bin/bash
set -e
start_demo() {
if [ -f ".env" ]; then
echo ".env file already exists. Skipping creation."
else
if [ -z "${PEYEEYE_API_KEY:-}" ]; then
echo "Error: PEYEEYE_API_KEY environment variable is not set for the demo."
exit 1
fi
if [ -z "${OPENAI_API_KEY:-}" ]; then
echo "Error: OPENAI_API_KEY environment variable is not set for the demo."
exit 1
fi
echo "Creating .env file..."
{
echo "PEYEEYE_API_KEY=$PEYEEYE_API_KEY"
echo "OPENAI_API_KEY=$OPENAI_API_KEY"
} > .env
echo ".env file created."
fi
echo "Starting Plano with config.yaml..."
planoai up config.yaml
echo "Starting Peyeeye filter service..."
bash start_agents.sh &
}
stop_demo() {
echo "Stopping Peyeeye filter service..."
pkill -f start_agents.sh 2>/dev/null || true
echo "Stopping Plano..."
planoai down
}
if [ "$1" == "down" ]; then
stop_demo
else
start_demo
fi


@ -0,0 +1,29 @@
#!/bin/bash
set -e
PIDS=()
log() { echo "$(date '+%F %T') - $*"; }
cleanup() {
log "Stopping agents..."
for PID in "${PIDS[@]}"; do
kill $PID 2>/dev/null && log "Stopped process $PID"
done
exit 0
}
trap cleanup EXIT INT TERM
if [ -z "${PEYEEYE_API_KEY:-}" ]; then
log "ERROR: PEYEEYE_API_KEY is not set."
exit 1
fi
log "Starting Peyeeye filter service on port 10502..."
uv run uvicorn peyeeye:app --host 0.0.0.0 --port 10502 &
PIDS+=($!)
for PID in "${PIDS[@]}"; do
wait "$PID"
done


@ -0,0 +1,71 @@
#!/usr/bin/env bash
set -euo pipefail
BASE_URL="http://localhost:12000"
PASS=0
FAIL=0
echo "Waiting for Plano to be ready..."
for i in $(seq 1 30); do
if curl -sf "$BASE_URL/v1/models" > /dev/null 2>&1; then
echo "Plano is ready."
break
fi
if [ "$i" -eq 30 ]; then
echo "ERROR: Plano did not become ready in time."
exit 1
fi
sleep 2
done
run_test() {
local name="$1"
local path="$2"
local expected_code="$3"
local body="$4"
http_code=$(curl -s -o /tmp/peyeeye_test_body -w "%{http_code}" \
-X POST "$BASE_URL$path" \
-H "Content-Type: application/json" \
-d "$body")
if [ "$http_code" -eq "$expected_code" ]; then
echo " PASS $name (HTTP $http_code)"
PASS=$((PASS + 1))
else
echo " FAIL $name -- expected $expected_code, got $http_code"
echo " Body: $(cat /tmp/peyeeye_test_body)"
FAIL=$((FAIL + 1))
fi
}
echo ""
echo "=== /v1/chat/completions ==="
run_test "Non-streaming with PII" /v1/chat/completions 200 '{
"model": "gpt-4o-mini",
"messages": [{"role": "user", "content": "Email me at jane@example.com"}],
"stream": false
}'
run_test "No PII" /v1/chat/completions 200 '{
"model": "gpt-4o-mini",
"messages": [{"role": "user", "content": "What is 2+2?"}],
"stream": false
}'
echo ""
echo "=== /v1/messages (Anthropic) ==="
run_test "Non-streaming with PII (SSN)" /v1/messages 200 '{
"model": "claude-sonnet-4-20250514",
"max_tokens": 256,
"messages": [{"role": "user", "content": "My SSN is 123-45-6789"}]
}'
echo ""
echo "Results: $PASS passed, $FAIL failed"
if [ "$FAIL" -gt 0 ]; then
exit 1
fi


@ -0,0 +1,471 @@
"""Tests for the Peyeeye Plano filter.
The Peyeeye API (`https://api.peyeeye.ai`) is fully mocked with `respx`. Every
new branch in `peyeeye.py` should have at least one test below.
"""
from __future__ import annotations
import json
from typing import Any, Dict
import httpx
import pytest
import respx
from fastapi.testclient import TestClient
from peyeeye import (
PEyeEyeClient,
PEyeEyeMissingSecrets,
create_app,
iter_request_texts,
iter_response_texts,
set_request_text,
)
# ------------------------------------------------------------------- fixtures
@pytest.fixture
def api_base() -> str:
return "https://api.peyeeye.ai"
@pytest.fixture
def client(monkeypatch, api_base) -> PEyeEyeClient:
monkeypatch.setenv("PEYEEYE_API_KEY", "pk_test_123")
monkeypatch.delenv("PEYEEYE_API_BASE", raising=False)
monkeypatch.delenv("PEYEEYE_LOCALE", raising=False)
monkeypatch.delenv("PEYEEYE_ENTITIES", raising=False)
monkeypatch.delenv("PEYEEYE_SESSION_MODE", raising=False)
return PEyeEyeClient()
@pytest.fixture
def app_client(client) -> TestClient:
return TestClient(create_app(client=client))
# --------------------------------------------------------------- client tests
def test_client_missing_api_key(monkeypatch):
monkeypatch.delenv("PEYEEYE_API_KEY", raising=False)
with pytest.raises(PEyeEyeMissingSecrets):
PEyeEyeClient()
def test_client_invalid_session_mode(monkeypatch):
monkeypatch.setenv("PEYEEYE_API_KEY", "pk_test_123")
with pytest.raises(ValueError):
PEyeEyeClient(session_mode="bogus")
def test_client_picks_up_env_entities(monkeypatch):
monkeypatch.setenv("PEYEEYE_API_KEY", "pk_test_123")
monkeypatch.setenv("PEYEEYE_ENTITIES", "EMAIL, SSN ,CREDIT_CARD")
c = PEyeEyeClient()
assert c.entities == ["EMAIL", "SSN", "CREDIT_CARD"]
# -------------------------------------------------------------- iter helpers
def test_iter_request_texts_chat_string():
body = {"messages": [{"role": "user", "content": "hello jane@example.com"}]}
parts = iter_request_texts(body, "/v1/chat/completions")
assert parts == [(("messages", 0, "content"), "hello jane@example.com")]
def test_iter_request_texts_chat_multimodal():
body = {
"messages": [
{
"role": "user",
"content": [
{"type": "text", "text": "hi"},
{"type": "image_url", "image_url": {"url": "..."}},
{"type": "text", "text": "ssn 111-22-3333"},
],
}
]
}
parts = iter_request_texts(body, "/v1/chat/completions")
assert parts == [
(("messages", 0, "content", 0, "text"), "hi"),
(("messages", 0, "content", 2, "text"), "ssn 111-22-3333"),
]
def test_iter_request_texts_skips_non_user_roles():
body = {
"messages": [
{"role": "system", "content": "you are helpful"},
{"role": "user", "content": "hi"},
{"role": "assistant", "content": "hello"},
]
}
parts = iter_request_texts(body, "/v1/chat/completions")
assert len(parts) == 1
assert parts[0][1] == "hi"
def test_iter_request_texts_responses_string():
body = {"input": "hello jane@example.com"}
parts = iter_request_texts(body, "/v1/responses")
assert parts == [(("input",), "hello jane@example.com")]
def test_iter_request_texts_responses_list():
body = {
"input": [
{"role": "system", "content": "ignored"},
{"role": "user", "content": "hi"},
{
"role": "user",
"content": [{"type": "text", "text": "ssn 111-22-3333"}],
},
]
}
parts = iter_request_texts(body, "/v1/responses")
assert parts == [
(("input", 1, "content"), "hi"),
(("input", 2, "content", 0, "text"), "ssn 111-22-3333"),
]
def test_set_request_text_round_trip():
body: Dict[str, Any] = {"messages": [{"role": "user", "content": "raw"}]}
set_request_text(body, ("messages", 0, "content"), "redacted")
assert body["messages"][0]["content"] == "redacted"
def test_iter_response_texts_chat():
body = {"choices": [{"message": {"content": "Email [EMAIL_0] back."}}]}
parts = iter_response_texts(body, "/v1/chat/completions")
assert parts == [(("choices", 0, "message", "content"), "Email [EMAIL_0] back.")]
def test_iter_response_texts_anthropic():
body = {"content": [{"type": "text", "text": "Reach [EMAIL_0]"}]}
parts = iter_response_texts(body, "/v1/messages")
assert parts == [(("content", 0, "text"), "Reach [EMAIL_0]")]
# ------------------------------------------------------------ /redact endpoint
@respx.mock
def test_redact_chat_happy_path(app_client, api_base):
respx.post(f"{api_base}/v1/redact").mock(
return_value=httpx.Response(
200,
json={
"text": ["hello [EMAIL_0]"],
"session_id": "ses_abc",
},
)
)
resp = app_client.post(
"/redact/v1/chat/completions",
json={
"model": "gpt-4o-mini",
"messages": [{"role": "user", "content": "hello jane@example.com"}],
},
headers={"x-request-id": "req-1"},
)
assert resp.status_code == 200
body = resp.json()
assert body["messages"][0]["content"] == "hello [EMAIL_0]"
@respx.mock
def test_redact_no_pii_no_session_cached(app_client, api_base):
    # With no text-bearing chunks, the body is returned untouched.
    resp = app_client.post(
        "/redact/v1/chat/completions",
        json={"model": "gpt-4o-mini", "messages": []},
        headers={"x-request-id": "req-empty"},
    )
    assert resp.status_code == 200
    # No call should have been made to peyeeye. Check the recorded calls:
    # ``respx.routes`` is always empty here because no route is registered.
    assert not respx.calls
@respx.mock
def test_redact_length_guard_fails_closed(app_client, api_base):
"""If the API returns a different number of texts, refuse to forward."""
respx.post(f"{api_base}/v1/redact").mock(
return_value=httpx.Response(
200,
json={"text": ["only one"], "session_id": "ses_x"},
)
)
resp = app_client.post(
"/redact/v1/chat/completions",
json={
"model": "gpt-4o-mini",
"messages": [
{"role": "user", "content": "first jane@example.com"},
{"role": "user", "content": "second 555-123-4567"},
],
},
headers={"x-request-id": "req-bad-len"},
)
assert resp.status_code == 502
assert "refusing to forward" in resp.json()["detail"]
@respx.mock
def test_redact_unexpected_shape_fails_closed(app_client, api_base):
respx.post(f"{api_base}/v1/redact").mock(
return_value=httpx.Response(200, json={"text": 42, "session_id": "ses_x"})
)
resp = app_client.post(
"/redact/v1/chat/completions",
json={
"model": "gpt-4o-mini",
"messages": [{"role": "user", "content": "x"}],
},
headers={"x-request-id": "req-bad-shape"},
)
assert resp.status_code == 502
assert "unexpected response shape" in resp.json()["detail"]
@respx.mock
def test_redact_5xx_fails_closed(app_client, api_base):
respx.post(f"{api_base}/v1/redact").mock(
return_value=httpx.Response(500, text="boom")
)
resp = app_client.post(
"/redact/v1/chat/completions",
json={
"model": "gpt-4o-mini",
"messages": [{"role": "user", "content": "x"}],
},
headers={"x-request-id": "req-5xx"},
)
assert resp.status_code == 502
assert "HTTP 500" in resp.json()["detail"]
@respx.mock
def test_redact_401_fails_closed_with_500(app_client, api_base):
"""Auth errors surface as PEyeEyeMissingSecrets -> HTTP 500."""
respx.post(f"{api_base}/v1/redact").mock(
return_value=httpx.Response(401, json={"error": "bad key"})
)
resp = app_client.post(
"/redact/v1/chat/completions",
json={
"model": "gpt-4o-mini",
"messages": [{"role": "user", "content": "x"}],
},
headers={"x-request-id": "req-401"},
)
assert resp.status_code == 500
assert "Invalid Peyeeye API key" in resp.json()["detail"]
@respx.mock
def test_redact_invalid_json_body(app_client):
resp = app_client.post(
"/redact/v1/chat/completions",
content=b"not json",
headers={
"Content-Type": "application/json",
"x-request-id": "req-bad-json",
},
)
assert resp.status_code == 400


# --------------------------------------------------------- /rehydrate endpoint


@respx.mock
def test_rehydrate_round_trip(app_client, api_base):
    """Redact then rehydrate share the request id; placeholders restored."""
    respx.post(f"{api_base}/v1/redact").mock(
        return_value=httpx.Response(
            200,
            json={
                "text": ["hello [EMAIL_0]"],
                "session_id": "ses_round",
            },
        )
    )
    respx.post(f"{api_base}/v1/rehydrate").mock(
        return_value=httpx.Response(
            200, json={"text": "Reach jane@example.com today.", "replaced": 1}
        )
    )
    respx.delete(f"{api_base}/v1/sessions/ses_round").mock(
        return_value=httpx.Response(204)
    )

    redact_resp = app_client.post(
        "/redact/v1/chat/completions",
        json={
            "model": "gpt-4o-mini",
            "messages": [{"role": "user", "content": "hello jane@example.com"}],
        },
        headers={"x-request-id": "req-round"},
    )
    assert redact_resp.status_code == 200

    rehydrate_resp = app_client.post(
        "/rehydrate/v1/chat/completions",
        json={"choices": [{"message": {"content": "Reach [EMAIL_0] today."}}]},
        headers={"x-request-id": "req-round"},
    )
    assert rehydrate_resp.status_code == 200
    body = rehydrate_resp.json()
    assert body["choices"][0]["message"]["content"] == "Reach jane@example.com today."


@respx.mock
def test_rehydrate_no_session_cached_passthrough(app_client):
    """With no cached session for the request id, the body is returned unchanged."""
    raw = {"choices": [{"message": {"content": "no session here"}}]}
    resp = app_client.post(
        "/rehydrate/v1/chat/completions",
        json=raw,
        headers={"x-request-id": "req-uncached"},
    )
    assert resp.status_code == 200
    assert resp.json() == raw


@respx.mock
def test_rehydrate_anthropic_messages(app_client, api_base):
    """Anthropic-style `content` text blocks are rehydrated on /v1/messages."""
    respx.post(f"{api_base}/v1/redact").mock(
        return_value=httpx.Response(
            200,
            json={"text": ["My ssn is [SSN_0]"], "session_id": "ses_anth"},
        )
    )
    respx.post(f"{api_base}/v1/rehydrate").mock(
        return_value=httpx.Response(
            200, json={"text": "My ssn is 111-22-3333", "replaced": 1}
        )
    )
    respx.delete(f"{api_base}/v1/sessions/ses_anth").mock(
        return_value=httpx.Response(204)
    )

    # Redact first so a session is cached under the shared request id.
    app_client.post(
        "/redact/v1/messages",
        json={
            "model": "claude-sonnet-4",
            "messages": [{"role": "user", "content": "My ssn is 111-22-3333"}],
        },
        headers={"x-request-id": "req-anth"},
    )

    resp = app_client.post(
        "/rehydrate/v1/messages",
        json={"content": [{"type": "text", "text": "My ssn is [SSN_0]"}]},
        headers={"x-request-id": "req-anth"},
    )
    assert resp.status_code == 200
    body = resp.json()
    assert body["content"][0]["text"] == "My ssn is 111-22-3333"


@respx.mock
def test_rehydrate_sse_passthrough(app_client, api_base):
    """SSE bodies are passed through unchanged (and the session id is kept)."""
    respx.post(f"{api_base}/v1/redact").mock(
        return_value=httpx.Response(
            200, json={"text": ["[EMAIL_0]"], "session_id": "ses_sse"}
        )
    )
    app_client.post(
        "/redact/v1/chat/completions",
        json={
            "model": "gpt-4o-mini",
            "messages": [{"role": "user", "content": "jane@example.com"}],
        },
        headers={"x-request-id": "req-sse"},
    )

    sse_body = b'data: {"choices":[{"delta":{"content":"hi"}}]}\n\n'
    resp = app_client.post(
        "/rehydrate/v1/chat/completions",
        content=sse_body,
        headers={
            "Content-Type": "text/event-stream",
            "x-request-id": "req-sse",
        },
    )
    assert resp.status_code == 200
    assert resp.content == sse_body


@respx.mock
def test_redact_responses_input_string(app_client, api_base):
    """A string `input` on /v1/responses is replaced with its redacted text."""
    respx.post(f"{api_base}/v1/redact").mock(
        return_value=httpx.Response(
            200,
            json={
                "text": ["My email is [EMAIL_0]"],
                "session_id": "ses_resp",
            },
        )
    )
    resp = app_client.post(
        "/redact/v1/responses",
        json={"model": "gpt-4o-mini", "input": "My email is jane@example.com"},
        headers={"x-request-id": "req-resp"},
    )
    assert resp.status_code == 200
    assert resp.json()["input"] == "My email is [EMAIL_0]"


@respx.mock
def test_stateless_session_mode(monkeypatch, api_base):
    """Stateless mode caches the rehydration_key and skips DELETE."""
    monkeypatch.setenv("PEYEEYE_API_KEY", "pk_test_123")
    monkeypatch.setenv("PEYEEYE_SESSION_MODE", "stateless")
    client = PEyeEyeClient()
    test_client = TestClient(create_app(client=client))

    redact_route = respx.post(f"{api_base}/v1/redact").mock(
        return_value=httpx.Response(
            200,
            json={
                "text": ["hello [EMAIL_0]"],
                "rehydration_key": "skey_xyz",
            },
        )
    )
    rehydrate_route = respx.post(f"{api_base}/v1/rehydrate").mock(
        return_value=httpx.Response(
            200, json={"text": "hello jane@example.com", "replaced": 1}
        )
    )
    delete_route = respx.delete(f"{api_base}/v1/sessions/skey_xyz")

    test_client.post(
        "/redact/v1/chat/completions",
        json={
            "model": "gpt-4o-mini",
            "messages": [{"role": "user", "content": "hello jane@example.com"}],
        },
        headers={"x-request-id": "req-stateless"},
    )

    # Inspect the redact request body to confirm session=stateless was sent.
    assert redact_route.called
    sent_body = json.loads(redact_route.calls.last.request.content)
    assert sent_body["session"] == "stateless"

    resp = test_client.post(
        "/rehydrate/v1/chat/completions",
        json={"choices": [{"message": {"content": "hello [EMAIL_0]"}}]},
        headers={"x-request-id": "req-stateless"},
    )
    assert resp.status_code == 200
    assert resp.json()["choices"][0]["message"]["content"] == "hello jane@example.com"

    # Stateless: no DELETE call.
    assert not delete_route.called
    assert rehydrate_route.called