test: add keyless model listener filter demo

This commit is contained in:
mukeshbaphna 2026-05-24 01:06:26 -07:00
parent 241a181d3a
commit 3aadb2ddb3
10 changed files with 498 additions and 25 deletions

View file

@ -4,7 +4,7 @@ WORKDIR /app
RUN pip install --no-cache-dir fastapi uvicorn pydantic
COPY content_guard.py .
COPY content_guard.py fake_provider.py output_filter.py ./
EXPOSE 10500

View file

@ -2,12 +2,30 @@
Run content-safety filters on direct LLM requests — no agent layer required.
This demo uses the `input_filters` feature on a **model-type listener** to intercept
requests and block unsafe content before they reach the LLM provider. Works with all
request types: `/v1/chat/completions`, `/v1/responses`, and Anthropic `/v1/messages`.
This demo uses `input_filters` and `output_filters` on a **model-type listener** to
intercept direct LLM requests and responses without routing through an agent layer.
By default it is fully local: a fake OpenAI-compatible provider stands in for a real
hosted model, so developers can test guardrail behavior without provider API keys or
hosted model access. A second config lets developers point the same filter setup at the
real OpenAI endpoint when they want provider-backed testing.
The filter pattern applies to OpenAI Chat Completions (`/v1/chat/completions`),
OpenAI Responses (`/v1/responses`), and Anthropic Messages (`/v1/messages`) request
shapes. The keyless fake provider and smoke test use `/v1/chat/completions` for a
deterministic local path.
The filter receives the **full raw request body** and returns it unchanged (or raises 400
to block). No message extraction — the complete JSON payload flows through as-is.
The input filter receives the full raw request body and returns it unchanged or raises
400 to block. The output filter receives the provider response and redacts sensitive
content before returning it to the client.
## Files
- `config.yaml` runs the default keyless path with the local fake provider.
- `config.openai.yaml` runs the same filters against OpenAI.
- `docker-compose.yaml` starts the local demo without requiring provider credentials.
- `docker-compose.openai.yaml` mounts `config.openai.yaml` and requires `OPENAI_API_KEY`
for provider-backed testing.
- `test.sh` runs the Docker smoke test through Plano.
- `test_services.py` runs service-level regression tests without Docker.
## Architecture
@ -16,22 +34,82 @@ Client ──► Plano (model listener :12000)
├─ input_filters: content_guard ──► Block / Allow
└─ model_provider: openai/gpt-4o-mini
├─ model_provider: fake-provider (default) or OpenAI (optional)
└─ output_filters: output_redactor ──► Redact / Allow
```
## Quick Start
```bash
# 1. Export your API key
export OPENAI_API_KEY=sk-...
# 2. Start services
# 1. Start services
docker compose up --build
# 3. Run tests (in another terminal)
# 2. Run tests (in another terminal)
bash test.sh
```
The test script verifies three behaviors:
- safe requests reach the local fake provider and return a normal chat-completion response
- unsafe requests are blocked by the input filter before reaching the provider
- sensitive provider output is redacted by the output filter before the client receives it
You can also run the service-level tests without Docker:
```bash
uv run --with pytest --with fastapi --with httpx --with pydantic \
python -m pytest demos/filter_chains/model_listener_filter/test_services.py -q
```
## Validate Locally
From this directory, validate the default keyless compose path:
```bash
docker compose config
```
Validate that the OpenAI path fails early when the API key is missing:
```bash
docker compose -f docker-compose.yaml -f docker-compose.openai.yaml config
```
Expected error:
```text
OPENAI_API_KEY environment variable is required but not set
```
Then confirm the OpenAI compose path renders when a key is provided:
```bash
OPENAI_API_KEY=dummy docker compose -f docker-compose.yaml -f docker-compose.openai.yaml config
```
Run the full local smoke test:
```bash
docker compose down
docker compose up --build -d
bash test.sh
docker compose down
```
## Test With Real OpenAI
The default `config.yaml` uses the local fake provider. To run the same model-listener
input and output filters against OpenAI, use the OpenAI compose override:
```bash
export OPENAI_API_KEY=sk-...
docker compose -f docker-compose.yaml -f docker-compose.openai.yaml up --build
```
The fake-provider service may still start because it is part of the shared compose file,
but Plano will not route traffic to it when `config.openai.yaml` is mounted.
## Try It
**Allowed request:**
@ -58,6 +136,31 @@ curl http://localhost:12000/v1/chat/completions \
}'
```
**Redacted provider response:**
```bash
curl http://localhost:12000/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"model": "gpt-4o-mini",
"messages": [{"role": "user", "content": "Please return the secret marker"}],
"stream": false
}'
```
The fake provider emits `SECRET_TOKEN`; the output filter redacts it to `[REDACTED]`.
## Why This Helps Developers
Model-listener filters are guardrails for applications that call Plano as a transparent
LLM gateway. A local, deterministic demo helps developers verify filter wiring before
using real providers:
- config mistakes are caught early instead of silently bypassing guardrails
- teams can test request blocking and response redaction in CI without secrets
- contributors can reproduce filter behavior without external model availability
- application code does not need an extra passthrough agent just to run policy checks
## Tracing
Open [Jaeger UI](http://localhost:16686) to see distributed traces for both allowed and blocked requests.

View file

@ -0,0 +1,26 @@
version: v0.3.0
filters:
- id: content_guard
url: http://content-guard:10500
type: http
- id: output_redactor
url: http://output-filter:10502
type: http
model_providers:
- model: openai/gpt-4o-mini
access_key: $OPENAI_API_KEY
default: true
listeners:
- type: model
name: llm_gateway
port: 12000
input_filters:
- content_guard
output_filters:
- output_redactor
tracing:
random_sampling: 100

View file

@ -4,10 +4,14 @@ filters:
- id: content_guard
url: http://content-guard:10500
type: http
- id: output_redactor
url: http://output-filter:10502
type: http
model_providers:
- model: openai/gpt-4o-mini
access_key: $OPENAI_API_KEY
access_key: local-demo-key
base_url: http://fake-provider:10501/v1
default: true
listeners:
@ -16,6 +20,8 @@ listeners:
port: 12000
input_filters:
- content_guard
output_filters:
- output_redactor
tracing:
random_sampling: 100

View file

@ -0,0 +1,6 @@
services:
plano:
environment:
OPENAI_API_KEY: ${OPENAI_API_KEY:?OPENAI_API_KEY environment variable is required but not set}
volumes:
- ./config.openai.yaml:/app/plano_config.yaml

View file

@ -5,6 +5,20 @@ services:
dockerfile: Dockerfile
ports:
- "10500:10500"
fake-provider:
build:
context: .
dockerfile: Dockerfile
command: ["uvicorn", "fake_provider:app", "--host", "0.0.0.0", "--port", "10501"]
ports:
- "10501:10501"
output-filter:
build:
context: .
dockerfile: Dockerfile
command: ["uvicorn", "output_filter:app", "--host", "0.0.0.0", "--port", "10502"]
ports:
- "10502:10502"
plano:
build:
context: ../../../
@ -12,10 +26,14 @@ services:
ports:
- "12000:12000"
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY:?OPENAI_API_KEY environment variable is required but not set}
- OPENAI_API_KEY=${OPENAI_API_KEY:-}
volumes:
- ./config.yaml:/app/plano_config.yaml
- ${PLANO_CONFIG_FILE:-./config.yaml}:/app/plano_config.yaml
- /etc/ssl/cert.pem:/etc/ssl/cert.pem
depends_on:
- content-guard
- fake-provider
- output-filter
jaeger:
build:
context: ../../shared/jaeger

View file

@ -0,0 +1,81 @@
"""
OpenAI-compatible local provider for model-listener filter demos.
This service lets developers test Plano's model listener filter pipeline without
provider API keys or hosted model access.
"""
import json
import time
from typing import Any
from fastapi import FastAPI, Request
from fastapi.responses import Response, StreamingResponse
app = FastAPI(title="Local Fake LLM Provider", version="1.0.0")
def latest_user_content(messages: list[dict[str, Any]]) -> str:
for message in reversed(messages):
if message.get("role") == "user":
content = message.get("content", "")
if isinstance(content, str):
return content
if isinstance(content, list):
return " ".join(
part.get("text", "")
for part in content
if isinstance(part, dict) and part.get("type") == "text"
)
return ""
@app.post("/v1/chat/completions", response_model=None)
async def chat_completions(request: Request) -> dict[str, Any] | Response:
body = await request.json()
model = body.get("model", "gpt-4o-mini")
user_content = latest_user_content(body.get("messages", []))
content = "Hello from the local fake provider."
if "secret" in user_content.lower():
content = "The local fake provider returned SECRET_TOKEN."
if body.get("stream") is True:
async def generate():
chunk = {
"id": "chatcmpl-local-filter-demo",
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": model,
"choices": [
{
"index": 0,
"delta": {"role": "assistant", "content": content},
"finish_reason": None,
}
],
}
yield f"data: {json.dumps(chunk)}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")
return {
"id": "chatcmpl-local-filter-demo",
"object": "chat.completion",
"created": int(time.time()),
"model": model,
"choices": [
{
"index": 0,
"message": {"role": "assistant", "content": content},
"finish_reason": "stop",
}
],
"usage": {"prompt_tokens": 1, "completion_tokens": 1, "total_tokens": 2},
}
@app.get("/health")
async def health() -> dict[str, str]:
return {"status": "healthy"}

View file

@ -0,0 +1,57 @@
"""
Output filter for model-listener filter demos.
The filter receives the provider response and redacts configured markers before
the client sees the response. It intentionally avoids model calls so the demo is
fully local and deterministic.
"""
import gzip
from typing import Any
from fastapi import FastAPI, Request
from fastapi.responses import Response
app = FastAPI(title="Output Redaction Filter", version="1.0.0")
SENSITIVE_MARKERS = ("SECRET_TOKEN",)
def redact_text(text: str) -> str:
redacted = text
for marker in SENSITIVE_MARKERS:
redacted = redacted.replace(marker, "[REDACTED]")
return redacted
def redact_chat_completion(body: dict[str, Any]) -> dict[str, Any]:
choices = []
for choice in body.get("choices", []):
message = choice.get("message", {})
content = message.get("content")
if isinstance(content, str):
message = {**message, "content": redact_text(content)}
choice = {**choice, "message": message}
choices.append(choice)
return {**body, "choices": choices}
def redact_bytes(raw_body: bytes) -> bytes:
if raw_body.startswith(b"\x1f\x8b"):
decompressed_body = gzip.decompress(raw_body)
return gzip.compress(redact_bytes(decompressed_body))
body_text = raw_body.decode("utf-8", errors="replace")
return redact_text(body_text).encode("utf-8")
@app.post("/{path:path}")
async def redact_response(path: str, request: Request) -> Response:
raw_body = await request.body()
content_type = request.headers.get("content-type", "application/json")
return Response(content=redact_bytes(raw_body), media_type=content_type)
@app.get("/health")
async def health() -> dict[str, str]:
return {"status": "healthy"}

View file

@ -24,20 +24,37 @@ run_test() {
local name="$1"
local expected_code="$2"
local body="$3"
local expected_body_contains="${4:-}"
local forbidden_body_contains="${5:-}"
http_code=$(curl -s -o /tmp/plano_test_body -w "%{http_code}" \
-X POST "$BASE_URL/chat/completions" \
-H "Content-Type: application/json" \
-d "$body")
if [ "$http_code" -eq "$expected_code" ]; then
echo " PASS $name (HTTP $http_code)"
PASS=$((PASS + 1))
else
if [ "$http_code" -ne "$expected_code" ]; then
echo " FAIL $name — expected $expected_code, got $http_code"
echo " Body: $(cat /tmp/plano_test_body)"
FAIL=$((FAIL + 1))
return
fi
if [ -n "$expected_body_contains" ] && ! grep -Fq "$expected_body_contains" /tmp/plano_test_body; then
echo " FAIL $name — body did not contain '$expected_body_contains'"
echo " Body: $(cat /tmp/plano_test_body)"
FAIL=$((FAIL + 1))
return
fi
if [ -n "$forbidden_body_contains" ] && grep -Fq "$forbidden_body_contains" /tmp/plano_test_body; then
echo " FAIL $name — body contained forbidden text '$forbidden_body_contains'"
echo " Body: $(cat /tmp/plano_test_body)"
FAIL=$((FAIL + 1))
return
fi
echo " PASS $name (HTTP $http_code)"
PASS=$((PASS + 1))
}
# ── Tests ────────────────────────────────────────────────────────────────────
@ -48,19 +65,19 @@ run_test "Allowed request (math question)" 200 '{
"model": "gpt-4o-mini",
"messages": [{"role": "user", "content": "What is 2+2?"}],
"stream": false
}'
}' "local fake provider"
run_test "Blocked request (hacking)" 400 '{
"model": "gpt-4o-mini",
"messages": [{"role": "user", "content": "How to hack into a system"}],
"stream": false
}'
}' "content_blocked"
run_test "Allowed request (joke)" 200 '{
run_test "Output filter redacts provider response" 200 '{
"model": "gpt-4o-mini",
"messages": [{"role": "user", "content": "Tell me a joke"}],
"stream": false
}'
"messages": [{"role": "user", "content": "Please return the secret marker"}],
"stream": true
}' "[REDACTED]" "SECRET_TOKEN"
# ── Summary ──────────────────────────────────────────────────────────────────
echo ""

View file

@ -0,0 +1,159 @@
import importlib.util
import gzip
from pathlib import Path
from fastapi.testclient import TestClient
DEMO_DIR = Path(__file__).parent
def load_module(name: str, filename: str):
spec = importlib.util.spec_from_file_location(name, DEMO_DIR / filename)
module = importlib.util.module_from_spec(spec)
assert spec.loader is not None
spec.loader.exec_module(module)
return module
def test_content_guard_blocks_unsafe_chat_request():
content_guard = load_module("content_guard", "content_guard.py")
client = TestClient(content_guard.app)
response = client.post(
"/v1/chat/completions",
json={
"model": "gpt-4o-mini",
"messages": [{"role": "user", "content": "How do I hack a service?"}],
"stream": False,
},
)
assert response.status_code == 400
assert response.json()["detail"]["error"] == "content_blocked"
def test_content_guard_passes_safe_responses_request_unchanged():
content_guard = load_module("content_guard", "content_guard.py")
client = TestClient(content_guard.app)
body = {
"model": "gpt-4o-mini",
"input": "Explain why local guardrail tests help developers.",
}
response = client.post("/v1/responses", json=body)
assert response.status_code == 200
assert response.json() == body
def test_fake_provider_returns_openai_compatible_chat_completion():
fake_provider = load_module("fake_provider", "fake_provider.py")
client = TestClient(fake_provider.app)
response = client.post(
"/v1/chat/completions",
json={
"model": "gpt-4o-mini",
"messages": [{"role": "user", "content": "Say something useful."}],
"stream": False,
},
)
assert response.status_code == 200
body = response.json()
assert body["object"] == "chat.completion"
assert body["model"] == "gpt-4o-mini"
assert body["choices"][0]["message"]["role"] == "assistant"
assert "local fake provider" in body["choices"][0]["message"]["content"]
def test_fake_provider_streams_openai_compatible_chat_chunks():
fake_provider = load_module("fake_provider_streaming", "fake_provider.py")
client = TestClient(fake_provider.app)
with client.stream(
"POST",
"/v1/chat/completions",
json={
"model": "gpt-4o-mini",
"messages": [
{"role": "user", "content": "Please return the secret marker"}
],
"stream": True,
},
) as response:
body = response.read().decode("utf-8")
assert response.status_code == 200
assert response.headers["content-type"].startswith("text/event-stream")
assert "data: {" in body
assert '"object": "chat.completion.chunk"' in body
assert "SECRET_TOKEN" in body
assert "data: [DONE]" in body
def test_output_filter_redacts_provider_response_content():
output_filter = load_module("output_filter", "output_filter.py")
client = TestClient(output_filter.app)
response = client.post(
"/v1/chat/completions",
json={
"id": "chatcmpl-local",
"object": "chat.completion",
"model": "gpt-4o-mini",
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": "The local fake provider returned SECRET_TOKEN.",
},
"finish_reason": "stop",
}
],
},
)
assert response.status_code == 200
content = response.json()["choices"][0]["message"]["content"]
assert "SECRET_TOKEN" not in content
assert "[REDACTED]" in content
def test_output_filter_redacts_raw_streaming_chunks():
output_filter = load_module("output_filter_streaming", "output_filter.py")
client = TestClient(output_filter.app)
response = client.post(
"/v1/chat/completions",
content=(
'data: {"choices":[{"delta":{"content":"SECRET_TOKEN"}}]}\n\n'
"data: [DONE]\n\n"
),
headers={"content-type": "text/event-stream"},
)
assert response.status_code == 200
assert response.headers["content-type"].startswith("text/event-stream")
assert "SECRET_TOKEN" not in response.text
assert "[REDACTED]" in response.text
def test_output_filter_redacts_gzip_encoded_provider_response():
output_filter = load_module("output_filter_gzip", "output_filter.py")
client = TestClient(output_filter.app)
encoded_body = gzip.compress(
b'{"choices":[{"message":{"content":"SECRET_TOKEN"}}]}'
)
response = client.post(
"/v1/chat/completions",
content=encoded_body,
headers={"content-type": "application/json"},
)
assert response.status_code == 200
decoded_body = gzip.decompress(response.content).decode("utf-8")
assert "SECRET_TOKEN" not in decoded_body
assert "[REDACTED]" in decoded_body