changing pii to http

This commit is contained in:
Ahmed Burney 2026-01-16 22:55:13 +05:00
parent 1968db0840
commit eb7d7be76c

View file

@ -1,7 +1,7 @@
import json import json
import logging import logging
import re import re
from typing import Any, Dict from typing import Any, Dict, List, Optional, Union
import uvicorn import uvicorn
from fastapi import FastAPI, Request from fastapi import FastAPI, Request
@ -14,7 +14,7 @@ logging.basicConfig(
) )
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
app = FastAPI(title="PII Security Filter (MCP)", version="1.0.0") app = FastAPI(title="PII Security Filter", version="1.0.0")
# PII patterns # PII patterns
CNIC_PATTERN = re.compile(r"\b\d{5}-\d{7}-\d{1}\b") CNIC_PATTERN = re.compile(r"\b\d{5}-\d{7}-\d{1}\b")
@ -32,9 +32,9 @@ INJECTION_PATTERNS = [
] ]
class MCPRequest(BaseModel): class PiiFilterRequest(BaseModel):
messages: list messages: List[Dict[str, Any]]
model: str = None model: Optional[str] = None
def redact_pii(text: str) -> tuple[str, list]: def redact_pii(text: str) -> tuple[str, list]:
@ -78,50 +78,42 @@ def detect_injection(text: str) -> tuple[bool, list]:
@app.post("/v1/tools/pii_security_filter") @app.post("/v1/tools/pii_security_filter")
async def pii_security_filter(request: MCPRequest): async def pii_security_filter(request: Union[PiiFilterRequest, List[Dict[str, Any]]]):
"""MCP filter endpoint for PII redaction and injection detection."""
try: try:
messages = request.messages if isinstance(request, list):
messages = request
else:
messages = request.messages
security_events = [] security_events = []
# Process each message
for msg in messages: for msg in messages:
if msg.get("role") == "user": if msg.get("role") == "user":
content = msg.get("content", "") content = msg.get("content", "")
# PII redaction
redacted_content, pii_findings = redact_pii(content) redacted_content, pii_findings = redact_pii(content)
if pii_findings: if pii_findings:
security_events.extend(pii_findings) security_events.extend(pii_findings)
msg["content"] = redacted_content msg["content"] = redacted_content
logger.warning(f"PII redacted: {pii_findings}") logger.warning(f"PII redacted: {pii_findings}")
# Injection detection
is_injection, patterns = detect_injection(content) is_injection, patterns = detect_injection(content)
if is_injection: if is_injection:
security_event = f"Prompt injection detected: {patterns}" security_event = f"Prompt injection detected: {patterns}"
security_events.append(security_event) security_events.append(security_event)
logger.warning(security_event) logger.warning(security_event)
# Add warning to content
msg["content"] = ( msg["content"] = (
f"[SECURITY WARNING: Potential prompt injection detected]\n\n{msg['content']}" f"[SECURITY WARNING: Potential prompt injection detected]\n\n{msg['content']}"
) )
# Return filtered messages # Optional: log metadata server-side (but don't return it to Plano)
response = { logger.info(
"messages": messages, f"Filter events: {security_events} | pii_redacted={any('found' in e for e in security_events)} "
"metadata": { f"| injection_detected={any('injection' in e.lower() for e in security_events)}"
"security_events": security_events, )
"pii_redacted": len([e for e in security_events if "found" in e]) > 0,
"injection_detected": len(
[e for e in security_events if "injection" in e.lower()]
)
> 0,
},
}
return JSONResponse(content=response) # IMPORTANT: return only the messages list (JSON array)
return JSONResponse(content=messages)
except Exception as e: except Exception as e:
logger.error(f"Filter error: {e}", exc_info=True) logger.error(f"Filter error: {e}", exc_info=True)