From eb7d7be76ce8475faca4b0e092f94625c617677a Mon Sep 17 00:00:00 2001 From: Ahmed Burney Date: Fri, 16 Jan 2026 22:55:13 +0500 Subject: [PATCH] changing pii to http --- .../src/credit_risk_demo/pii_filter.py | 44 ++++++++----------- 1 file changed, 18 insertions(+), 26 deletions(-) diff --git a/demos/use_cases/credit_risk_case_copilot/src/credit_risk_demo/pii_filter.py b/demos/use_cases/credit_risk_case_copilot/src/credit_risk_demo/pii_filter.py index 0fda08c9..797dbd12 100644 --- a/demos/use_cases/credit_risk_case_copilot/src/credit_risk_demo/pii_filter.py +++ b/demos/use_cases/credit_risk_case_copilot/src/credit_risk_demo/pii_filter.py @@ -1,7 +1,7 @@ import json import logging import re -from typing import Any, Dict +from typing import Any, Dict, List, Optional, Union import uvicorn from fastapi import FastAPI, Request @@ -14,7 +14,7 @@ logging.basicConfig( ) logger = logging.getLogger(__name__) -app = FastAPI(title="PII Security Filter (MCP)", version="1.0.0") +app = FastAPI(title="PII Security Filter", version="1.0.0") # PII patterns CNIC_PATTERN = re.compile(r"\b\d{5}-\d{7}-\d{1}\b") @@ -32,9 +32,9 @@ INJECTION_PATTERNS = [ ] -class MCPRequest(BaseModel): - messages: list - model: str = None +class PiiFilterRequest(BaseModel): + messages: List[Dict[str, Any]] + model: Optional[str] = None def redact_pii(text: str) -> tuple[str, list]: @@ -78,50 +78,42 @@ def detect_injection(text: str) -> tuple[bool, list]: @app.post("/v1/tools/pii_security_filter") -async def pii_security_filter(request: MCPRequest): - """MCP filter endpoint for PII redaction and injection detection.""" +async def pii_security_filter(request: Union[PiiFilterRequest, List[Dict[str, Any]]]): try: - messages = request.messages + if isinstance(request, list): + messages = request + else: + messages = request.messages + security_events = [] - # Process each message for msg in messages: if msg.get("role") == "user": content = msg.get("content", "") - # PII redaction redacted_content, pii_findings = redact_pii(content) if pii_findings: security_events.extend(pii_findings) msg["content"] = redacted_content logger.warning(f"PII redacted: {pii_findings}") - # Injection detection is_injection, patterns = detect_injection(content) if is_injection: security_event = f"Prompt injection detected: {patterns}" security_events.append(security_event) logger.warning(security_event) - - # Add warning to content msg["content"] = ( f"[SECURITY WARNING: Potential prompt injection detected]\n\n{msg['content']}" ) - # Return filtered messages - response = { - "messages": messages, - "metadata": { - "security_events": security_events, - "pii_redacted": len([e for e in security_events if "found" in e]) > 0, - "injection_detected": len( - [e for e in security_events if "injection" in e.lower()] - ) - > 0, - }, - } + # Optional: log metadata server-side (but don't return it to Plano) + logger.info( + f"Filter events: {security_events} | pii_redacted={any('found' in e for e in security_events)} " + f"| injection_detected={any('injection' in e.lower() for e in security_events)}" + ) - return JSONResponse(content=response) + # IMPORTANT: return only the messages list (JSON array) + return JSONResponse(content=messages) except Exception as e: logger.error(f"Filter error: {e}", exc_info=True)