2 loop refactor

This commit is contained in:
Ahmed Burney 2026-02-21 07:33:47 +05:00
parent 334f902e4a
commit e58e6dae9b
4 changed files with 175 additions and 1164 deletions

View file

@ -1,575 +1,73 @@
# Credit Risk Case Copilot
This directory contains a demo **credit risk assessment system** built to exercise Plano's orchestration, guardrails, and observability features in a realistic setup.
The goal of this project is not to show "yet another agent demo", but to answer a more practical question:
> How would you actually deploy an agentic AI system in a regulated environment?
To do that, the system includes a real multi-agent workflow, a security boundary in front of it, structured downstream actions, and full tracing across all components.
A small demo that follows the two-loop model: Plano is the **outer loop** (routing, guardrails, tracing), and each credit-risk step is a focused **inner-loop agent**.
---
## Why this demo exists
## What runs
Most agent demos run everything inside the agent:
- Agents call models directly
- There's no security boundary
- Observability is minimal
- Downstream systems are mocked or ignored
That works for experimentation, but it doesn't resemble how these systems would be deployed in production.
This demo flips that around:
- **Plano sits in front** as the control plane
- Requests are sanitized before reaching agents
- Agents are treated as untrusted workloads
- Every LLM call is routed, logged, and traceable
- **Risk Crew Agent (10530)**: four OpenAI-compatible endpoints (intake, risk, policy, memo).
- **PII Filter (10550)**: redacts PII and flags prompt injection.
- **Streamlit UI (8501)**: single-call client.
- **Jaeger (16686)**: tracing backend.
---
## High-level architecture
## Quick start
```
User / Streamlit UI
        │
        ▼
Plano Orchestrator (8001)
  ├─ HTTP Security Filter (PII + injection)
  ├─ Agent routing
  └─ Model routing
        │
        ▼
Risk Crew Agent (CrewAI)
        │
        ▼
Plano LLM Gateway (12000)
        │
        ▼
OpenAI
```
Plano is the only component allowed to talk to models or invoke downstream systems. Everything else goes through it.
---
## What the system actually does
At a high level, the system takes an unstructured loan request and turns it into a structured credit decision.
Internally, this is implemented as a small CrewAI workflow with four agents:
1. Intake & normalization (gpt-4o-mini)
2. Risk scoring & drivers (gpt-4o)
3. Policy & compliance checks (gpt-4o)
4. Decision memo synthesis (gpt-4o)
Each agent builds on the output of the previous one. The workflow is sequential on purpose to make traces easier to follow.
The specific agent framework isn't the focus here — it's mainly used as a realistic payload for Plano to orchestrate.
---
## Plano features exercised in this demo
This demo actively uses several Plano capabilities together:
### Agent listener
- OpenAI-compatible `/v1/chat/completions` endpoint
- Requests are routed to the appropriate agent based on configuration
- Agents remain unaware of routing logic
### HTTP filter chain (security guardrails)
- Requests pass through an HTTP-based security filter before agent execution
- PII (CNIC, phone numbers, emails) is redacted in-place
- Prompt injection attempts are detected and flagged
- The agent receives only sanitized input
The filter is implemented as a simple HTTP service to keep things easy to debug and reason about.
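The redaction step can be sketched as a few regex substitutions. The patterns and function names below are illustrative assumptions, not the filter's actual implementation:

```python
import re

# Illustrative PII patterns; the real filter's regexes may differ.
PATTERNS = {
    "CNIC": re.compile(r"\b\d{5}-\d{7}-\d\b"),
    "PHONE": re.compile(r"\+?\d{10,13}\b"),
    "EMAIL": re.compile(r"[\w.+-]+@[\w-]+\.[\w.]+"),
}


def redact(text: str) -> str:
    """Replace each PII match with a typed placeholder, e.g. [REDACTED_CNIC]."""
    for label, pattern in PATTERNS.items():
        text = pattern.sub(f"[REDACTED_{label}]", text)
    return text
```

Typed placeholders (rather than a generic `[REDACTED]`) keep the sanitized text useful to the downstream agents, which can still reason about *what kind* of field was present.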
### Central LLM gateway
- All LLM calls go through Plano
- Agents never talk to OpenAI directly
- Makes tracing, policy enforcement, and provider switching easier later
### Observability
- End-to-end OpenTelemetry tracing
- One trace per request, spanning:
- Security filter
- Agent execution
- Individual LLM calls
- Downstream API calls
---
## Example request flow
```bash
curl http://localhost:8001/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"model": "openai/gpt-4o",
"messages": [
{
"role": "user",
"content": "CNIC 12345-1234567-1 assess risk for loan amount 300000"
}
]
}'
```
What happens:
1. Plano receives the request
2. The PII filter redacts the CNIC
3. The sanitized request is routed to the risk agent
4. The multi-agent workflow executes
5. The response is returned
6. The entire flow appears as a single trace in Jaeger
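The same flow can be driven from Python instead of curl. This is a minimal sketch assuming only the OpenAI-compatible endpoint shown above; `build_request` and `assess` are hypothetical helper names:

```python
import json
import urllib.request

PLANO_URL = "http://localhost:8001/v1/chat/completions"  # Plano agent listener


def build_request(content: str, model: str = "openai/gpt-4o") -> dict:
    """Shape the OpenAI-compatible payload Plano's agent listener accepts."""
    return {"model": model, "messages": [{"role": "user", "content": content}]}


def assess(content: str) -> dict:
    """Send one assessment request through Plano (requires the stack running)."""
    req = urllib.request.Request(
        PLANO_URL,
        data=json.dumps(build_request(content)).encode(),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)
```

Calling `assess("CNIC 12345-1234567-1 assess risk for loan amount 300000")` should exercise the same redact-route-trace path as the curl example.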
---
## Services in this repo
### Risk Crew Agent (10530)
Implements the CrewAI workflow and exposes four standalone OpenAI-compatible endpoints so Plano can route to each step independently.
The agent is intentionally kept unaware of:
- Security filters
- Model providers
- Routing decisions
### PII Security Filter (10550)
A small FastAPI service that:
- Redacts CNIC, phone numbers, and emails
- Detects common prompt injection patterns
- Mutates messages in-place
- Returns only the updated message list (as expected by Plano's HTTP filter interface)
This runs before the agent is invoked.
### Streamlit UI (8501)
A lightweight UI for interacting with the system:
- Provides example scenarios
- Displays structured outputs
- Useful for demos and manual testing
### Jaeger (16686)
Used for distributed tracing.
All services emit OpenTelemetry spans.
---
## Observability notes
Open Jaeger at: **http://localhost:16686**
A typical trace shows:
- One parent request span
- A security filter span
- Four LLM call spans (one per agent step)
This is intentional — the trace tells the full story of what happened and why.
---
## Running the demo
### Prerequisites
- Docker + Docker Compose
- Plano CLI (`pip install planoai` or `uvx planoai`)
- OpenAI API key
### Environment setup
```bash
cp .env.example .env
# add OPENAI_API_KEY
```
### Start services
```bash
docker compose up --build
```
### Start Plano
In a separate terminal:
```bash
uvx planoai up config.yaml
```
Plano runs on:
- **8001** agent listener
- **12000** LLM gateway
### Access
- **Streamlit UI**: http://localhost:8501
- **Jaeger Traces**: http://localhost:16686
---
## Screenshots
## How it works
### Streamlit UI
The UI provides a simple interface for testing scenarios and viewing risk assessments:
1. The UI sends **one** request to Plano with the application JSON.
2. Plano routes the request across the four agents in order:
intake → risk → policy → memo.
3. Each agent returns JSON with a `step` key.
4. The memo agent returns the final response.
![Streamlit UI](images/ui-demo.png)
### PII Redaction in Action
The security filter automatically redacts sensitive information (CNIC, email, phone) before it reaches the agent:
![PII Redaction](images/pii-redaction.png)
### Prompt Injection Detection
The filter detects and flags malicious prompt injection attempts:
![Prompt Injection Detection](images/prompt-injection.png)
All model calls go through Plano's LLM gateway, and guardrails run before any agent sees input.
---
## Notes on design choices
## Endpoints
- The PII filter is HTTP-based rather than MCP to keep the demo simpler to debug.
- Agents execute sequentially to make traces readable.
- Model aliases are supported by Plano, but the agent uses explicit model IDs to avoid ambiguity during the demo.
- Error handling favors fallback responses over hard failures.
Risk Crew Agent (10530):
- `POST /v1/agents/intake/chat/completions`
- `POST /v1/agents/risk/chat/completions`
- `POST /v1/agents/policy/chat/completions`
- `POST /v1/agents/memo/chat/completions`
- `GET /health`
These are demo choices, not hard requirements.
PII Filter (10550):
- `POST /v1/tools/pii_security_filter`
- `GET /health`
Plano (8001):
- `POST /v1/chat/completions`
---
## What this demo demonstrates
## UI flow
- A real multi-agent workflow running behind a control plane
- Centralized security and routing
- Clear separation between agents and infrastructure
- End-to-end observability
- OpenAI-compatible APIs preserved throughout
1. Paste or select an application JSON.
2. Click **Assess Risk**.
3. Review the decision memo.
This is closer to how agentic systems are likely to be deployed in practice.
## Using the Demo
### Streamlit UI Workflow
1. **Select a Scenario** (or paste your own JSON):
- 🟢 **Scenario A** - Low risk (stable job, good credit, low DTI)
- 🟡 **Scenario B** - Medium risk (thin file, missing verifications)
- 🔴 **Scenario C** - High risk + prompt injection attempt
2. **Click "Assess Risk"** - The UI calls the four agents sequentially through Plano
3. **Review Results** - Memo + key summary fields, with normalized data in an expander
### Direct API Testing
You can also send requests directly to Plano:
```bash
curl http://localhost:8001/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"model": "gpt-4o",
"messages": [
{
"role": "user",
"content": "Assess credit risk for this application: {\"applicant_name\": \"Sarah Ahmed\", \"loan_amount\": 300000, \"credit_score\": 780, \"monthly_income\": 200000, \"total_debt\": 25000, \"delinquencies\": 0, \"kyc_complete\": true, \"income_verified\": true}"
}
]
}'
```
## Example Scenarios
### Scenario A: Low Risk
- Applicant: Sarah Ahmed
- Credit Score: 780 (Excellent)
- DTI: 12.5% (Low)
- Delinquencies: 0
- KYC: Complete
- **Expected**: LOW risk, APPROVE recommendation
### Scenario B: Medium Risk
- Applicant: Hassan Khan
- Credit Score: 620 (Fair)
- DTI: 50% (Elevated)
- Delinquencies: 1
- KYC: Incomplete (missing income/address verification)
- **Expected**: MEDIUM risk, CONDITIONAL_APPROVE or REFER
### Scenario C: High Risk + Injection
- Applicant: Ali Raza
- Credit Score: 520 (Poor)
- DTI: 100% (Critical)
- Delinquencies: 3
- Contains: "Ignore all previous instructions" (prompt injection)
- **Expected**: HIGH risk, REJECT, PII redacted, injection detected
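The banding itself is done by the LLM agents, but the thresholds described in this demo can be mirrored deterministically to show why the three scenarios land where they do. This is an illustrative sketch, not the agents' actual logic:

```python
def factor_severity(credit_score, dti, delinquencies) -> list:
    """Map each factor to a severity using the demo's illustrative thresholds."""
    severities = []
    # Credit score bands
    if credit_score is None:
        severities.append("MEDIUM")      # thin file
    elif credit_score >= 750:
        severities.append("LOW")
    elif credit_score >= 650:
        severities.append("MEDIUM")
    elif credit_score >= 550:
        severities.append("HIGH")
    else:
        severities.append("CRITICAL")
    # Debt-to-income ratio
    if dti is None:
        severities.append("HIGH")
    elif dti < 35:
        severities.append("LOW")
    elif dti <= 50:
        severities.append("MEDIUM")
    else:
        severities.append("CRITICAL")
    # Delinquency history
    if delinquencies == 0:
        severities.append("LOW")
    elif delinquencies <= 2:
        severities.append("MEDIUM")
    else:
        severities.append("CRITICAL")
    return severities


def risk_band(credit_score, dti, delinquencies) -> str:
    """Collapse per-factor severities into a LOW/MEDIUM/HIGH band."""
    s = factor_severity(credit_score, dti, delinquencies)
    if "CRITICAL" in s:
        return "HIGH"
    if "HIGH" in s or s.count("MEDIUM") >= 2:
        return "MEDIUM"
    return "LOW"
```

Under these assumed rules, Scenario A (780 / 12.5% / 0) bands LOW, Scenario B (620 / 50% / 1) bands MEDIUM, and Scenario C (520 / 100% / 3) bands HIGH, matching the expected outcomes above.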
## Service Details
### Risk Crew Agent (Port 10530) - CrewAI Multi-Agent System
Implements four standalone endpoints where each agent is specialized:
1. **Intake & Normalization Agent**
- Model: `risk_fast` (gpt-4o-mini)
- Task: Extract application data, normalize fields, calculate DTI, flag missing data
- Output: Clean structured dataset with validation results
2. **Risk Scoring & Driver Analysis Agent**
- Model: `risk_reasoning` (gpt-4o)
- Task: Analyze credit score, DTI, delinquencies, utilization
- Output: Risk band (LOW/MEDIUM/HIGH) with confidence + top 3 risk drivers with evidence
3. **Policy & Compliance Agent**
- Model: `risk_reasoning` (gpt-4o)
- Task: Verify KYC completion, income/address verification, check policy violations
- Output: Policy checks status + exceptions + required documents list
4. **Decision Memo & Action Agent**
- Model: `risk_reasoning` (gpt-4o)
- Task: Synthesize findings into bank-ready memo
- Output: Executive summary + recommendation (APPROVE/CONDITIONAL_APPROVE/REFER/REJECT)
**Context Passing:** Each call includes the prior outputs as explicit JSON payloads:
- Intake expects the raw application JSON.
- Risk expects `{ application, intake }`.
- Policy expects `{ application, intake, risk }`.
- Memo expects `{ application, intake, risk, policy }`.
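The accumulation above can be sketched as a small driver loop. The `call_agent` callable is a stand-in for the HTTP call to Plano's per-agent endpoints; the helper names are assumptions:

```python
STEPS = ["intake", "risk", "policy", "memo"]


def build_payload(application: dict, outputs: dict) -> dict:
    """Each step receives the raw application plus all prior step outputs."""
    return {"application": application, **outputs}


def run_pipeline(application: dict, call_agent) -> dict:
    """call_agent(step, payload) -> dict stands in for the POST to
    /v1/agents/<step>/chat/completions; intake alone gets the raw JSON."""
    outputs = {}
    for step in STEPS:
        payload = (application if step == "intake"
                   else build_payload(application, outputs))
        outputs[step] = call_agent(step, payload)
    return outputs["memo"]
```

Because `outputs` grows one key per step, the memo agent automatically sees `{ application, intake, risk, policy }` without any step needing to know about the others.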
### PII Security Filter (Port 10550)
HTTP Filter that:
- Redacts CNIC patterns (12345-6789012-3)
- Redacts phone numbers (+923001234567)
- Redacts email addresses
- Detects prompt injections ("ignore policy", "bypass checks", etc.)
- Adds security warnings to flagged content
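The injection check can be sketched as a case-insensitive phrase scan. The phrase list here is illustrative; the real filter's list is presumably longer:

```python
import re

# Illustrative injection phrases; extend as needed.
INJECTION_PATTERNS = [
    r"ignore (all )?(previous|prior) instructions",
    r"ignore policy",
    r"bypass checks",
]


def detect_injection(text: str) -> bool:
    """Return True if any known injection phrase appears (case-insensitive)."""
    return any(re.search(p, text, re.IGNORECASE) for p in INJECTION_PATTERNS)


def annotate(text: str) -> str:
    """Prepend a security warning when an injection attempt is detected."""
    if detect_injection(text):
        return "[SECURITY WARNING: possible prompt injection]\n" + text
    return text
```

Flagging rather than rejecting lets the downstream agents see the (annotated) request, which is what makes Scenario C produce both a HIGH-risk memo and an injection-detected warning.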
## Configuration Files
### config.yaml (Plano Configuration)
- **Agents**: `loan_intake_agent`, `risk_scoring_agent`, `policy_compliance_agent`, `decision_memo_agent`
- **Filters**: `pii_security_filter` in filter chain
- **Model Providers**: OpenAI GPT-4o and GPT-4o-mini
- **Model Aliases**: `risk_fast` (mini), `risk_reasoning` (4o)
- **Listeners**: agent (8001), model (12000)
- **Tracing**: 100% sampling to Jaeger
### docker-compose.yaml
Orchestrates 4 services:
- `risk-crew-agent` - Risk assessment engine
- `pii-filter` - Security filter
- `streamlit-ui` - Web interface
- `jaeger` - Tracing backend
## Observability
### Jaeger Tracing
View distributed traces at http://localhost:16686
**CrewAI Multi-Agent Trace Flow:**
```
chat_completions (risk-crew-agent) - 8500ms
├─ crewai_risk_assessment_workflow - 8200ms
│ ├─ POST /v1/chat/completions (risk_fast) - 800ms
│ │ └─ openai.chat.completions.create (gpt-4o-mini) - 750ms
│ ├─ POST /v1/chat/completions (risk_reasoning) - 2100ms
│ │ └─ openai.chat.completions.create (gpt-4o) - 2000ms
│ ├─ POST /v1/chat/completions (risk_reasoning) - 1800ms
│ │ └─ openai.chat.completions.create (gpt-4o) - 1750ms
│ └─ POST /v1/chat/completions (risk_reasoning) - 2400ms
│ └─ openai.chat.completions.create (gpt-4o) - 2350ms
```
**Complete Request Flow:**
1. UI sends request to Plano orchestrator (8001)
2. Plano applies PII security filter (10550)
3. Plano routes to Risk Crew Agent (10530)
4. CrewAI executes 4 agents sequentially:
- Each agent calls Plano LLM Gateway (12000)
- Plano routes to OpenAI with configured model alias
5. Agent returns synthesized assessment
6. All spans visible in Jaeger (16686)
**Search Tips:**
- Service: `risk-crew-agent`
- Operation: `chat_completions` or `crewai_risk_assessment_workflow`
- Tags: `request_id`, `risk_band`, `recommended_action`, `applicant_name`
- Look for 4-5 LLM call spans per request (indicates CrewAI is working)
## Project Structure
```
credit_risk_case_copilot/
├── config.yaml # Plano orchestrator config
├── docker-compose.yaml # Service orchestration
├── Dockerfile # Multi-purpose container
├── pyproject.toml # Python dependencies
├── .env.example # Environment template
├── README.md # This file
├── test.rest # REST client examples
├── scenarios/ # Test fixtures
│ ├── scenario_a_low_risk.json
│ ├── scenario_b_medium_risk.json
│ └── scenario_c_high_risk_injection.json
└── src/
└── credit_risk_demo/
├── __init__.py
├── risk_crew_agent.py # Multi-agent workflow (FastAPI)
├── pii_filter.py # HTTP security filter (FastAPI)
└── ui_streamlit.py # Web UI (Streamlit)
```
## Development
### Running Services Individually
```bash
# Risk Crew Agent
uv run python src/credit_risk_demo/risk_crew_agent.py
# PII Filter
uv run python src/credit_risk_demo/pii_filter.py
# Streamlit UI
uv run streamlit run src/credit_risk_demo/ui_streamlit.py
```
### Installing Dependencies Locally
```bash
uv sync
# or
pip install -e .
```
---
## Troubleshooting
**Services won't start**
- Check Docker is running: `docker ps`
- Verify ports are available: `lsof -i :8001,10530,10550,8501,16686`
- Check logs: `docker compose logs -f`
**CrewAI Import Errors** (e.g., "No module named 'crewai'")
- Rebuild container with new dependencies:
```bash
docker compose build risk-crew-agent --no-cache
docker compose up risk-crew-agent
```
**Slow Response Times (>20 seconds)**
- **Expected:** 8-15 seconds is normal for CrewAI (4 sequential LLM calls)
- **If slower:** Check OpenAI API status, review Jaeger traces for bottlenecks, check Plano logs
**LLM Gateway Connection Failed**
- Verify Plano is running: `curl http://localhost:12000/health`
- Check environment variable: `docker compose exec risk-crew-agent env | grep LLM_GATEWAY`
- Should show: `LLM_GATEWAY_ENDPOINT=http://host.docker.internal:12000/v1`
**Plano won't start**
- Verify installation: `planoai --version`
- Check config: `planoai validate config.yaml`
- Ensure OPENAI_API_KEY is set
**No response from agents**
- Verify all services are healthy:
- `curl http://localhost:10530/health` (should show `"framework": "CrewAI"`)
- `curl http://localhost:10550/health`
- Check Plano is running on port 8001
**Streamlit can't connect**
- Verify PLANO_ENDPOINT in docker-compose matches Plano port
- Check `host.docker.internal` resolves (should point to host machine)
**Jaeger shows no traces**
- Verify OTLP_ENDPOINT in services points to Jaeger
- Check Jaeger is running: `docker compose ps jaeger`
- Allow a few seconds for traces to appear
- **CrewAI traces:** Look for `crewai_risk_assessment_workflow` span with 4 child LLM calls
**CrewAI Output Parsing Errors**
- Check logs: `docker compose logs risk-crew-agent | grep "Error parsing"`
- System falls back to basic response if parsing fails (check for "REFER" recommendation)
## API Endpoints
### Plano Orchestrator (8001)
- `POST /v1/chat/completions` - Main entry point (OpenAI-compatible)
### Risk Crew Agent (10530)
- `POST /v1/agents/intake/chat/completions` - Intake normalization endpoint
- `POST /v1/agents/risk/chat/completions` - Risk scoring endpoint
- `POST /v1/agents/policy/chat/completions` - Policy compliance endpoint
- `POST /v1/agents/memo/chat/completions` - Decision memo endpoint
- `POST /v1/chat/completions` - Full risk assessment endpoint (legacy)
- `GET /health` - Health check
### PII Filter (10550)
- `POST /v1/tools/pii_security_filter` - PII filter endpoint
- `GET /health` - Health check
## Next Steps & Extensions
### Immediate Enhancements
- Add database persistence for assessment storage (PostgreSQL/MongoDB)
- Implement parallel agent execution where possible (e.g., Risk + Policy agents)
- Add agent tools (credit bureau API integration, fraud detection)
- Enable CrewAI memory for cross-request learning
### Production Readiness
- Implement rate limiting and request throttling
- Add caching layer for repeated assessments
- Set up monitoring/alerting (Prometheus + Grafana)
- Implement user authentication and RBAC
- Add audit log persistence
### Feature Extensions
- Add Fraud Detection Agent to the crew
- Implement Appeals Agent for rejected applications
- Build analytics dashboard for risk metrics
- Add email/SMS notifications for decisions
- Implement batch processing API for multiple applications
- Create PDF export for decision memos
- Add A/B testing framework for different risk models
## What This Demo Demonstrates
This project showcases:
**True Multi-Agent AI System** - 4 specialized CrewAI agents with distinct roles and expertise
**Plano Orchestration** - Central LLM gateway managing all agent calls without config changes
**Model Aliases** - Semantic routing (`risk_fast`, `risk_reasoning`) for cost/quality optimization
**Security Guardrails** - PII redaction and prompt injection detection via HTTP filters
**Full Observability** - OpenTelemetry traces showing every agent execution in Jaeger
**Production Patterns** - Error handling, fallbacks, health checks, structured logging
**Context Passing** - Agents build on each other's work through sequential task dependencies
**Backward Compatibility** - OpenAI-compatible API maintained throughout
### Key Metrics
- **4 LLM calls** per risk assessment (1x gpt-4o-mini + 3x gpt-4o)
- **8-15 second** response time (sequential agent execution)
- **~$0.02-0.05** cost per request
- **Zero config changes** to Plano (everything already supported!)
- **100% trace visibility** across all services
### Documentation
- **This README** - Quick start and API reference
- **CREWAI_INTEGRATION.md** - Deep dive into CrewAI implementation (500+ lines)
- **CREWAI_CHECKLIST.md** - Testing and verification guide
- **IMPLEMENTATION_SUMMARY.md** - What changed and why
## License
This is a demo project for educational purposes.
## Support
For issues with Plano, see: https://docs.planoai.dev
---
**Last Updated:** January 2026
**Version:** 0.2.0 - CrewAI Multi-Agent Integration
**Status:** Production-ready demo with full CrewAI implementation
- **No response**: confirm Plano is running and ports are free (`8001`, `10530`, `10550`, `8501`).
- **LLM gateway errors**: check `LLM_GATEWAY_ENDPOINT=http://host.docker.internal:12000/v1`.
- **No traces**: check Jaeger and `OTLP_ENDPOINT`.

View file

@ -49,23 +49,26 @@ listeners:
agents:
- id: loan_intake_agent
description: |
Loan Intake Agent - Extracts and normalizes loan application data for downstream analysis.
Loan Intake Agent - Step 1 of 4 in the credit risk pipeline. Run first.
CAPABILITIES:
* Normalize applicant data and calculate derived fields (e.g., DTI)
* Identify missing or inconsistent fields
* Produce structured intake JSON for other agents
* Produce structured intake JSON for downstream agents
USE CASES:
* "Normalize this loan application"
* "Extract and validate applicant data"
When requests focus on intake or data normalization for loan applications, route here.
OUTPUT REQUIREMENTS:
* Return JSON with step="intake" and normalized_data/missing_fields
* Do not provide the final decision memo
* This output is used by risk_scoring_agent next
filter_chain:
- pii_security_filter
- id: risk_scoring_agent
description: |
Risk Scoring Agent - Produces risk band, confidence score, and top drivers.
Risk Scoring Agent - Step 2 of 4. Run after intake.
CAPABILITIES:
* Evaluate credit score, DTI, delinquencies, utilization
@ -76,12 +79,15 @@ listeners:
* "Score the risk for this applicant"
* "Provide risk band and drivers"
When requests focus on risk scoring or drivers, route here.
OUTPUT REQUIREMENTS:
* Use intake output from prior assistant message
* Return JSON with step="risk" and risk_band/confidence_score/top_3_risk_drivers
* This output is used by policy_compliance_agent next
filter_chain:
- pii_security_filter
- id: policy_compliance_agent
description: |
Policy Compliance Agent - Checks lending policy rules and required documents.
Policy Compliance Agent - Step 3 of 4. Run after risk scoring.
CAPABILITIES:
* Verify KYC, income, and address checks
@ -92,12 +98,15 @@ listeners:
* "Check policy compliance"
* "List required documents"
When requests focus on policy checks or exceptions, route here.
OUTPUT REQUIREMENTS:
* Use intake + risk outputs from prior assistant messages
* Return JSON with step="policy" and policy_checks/exceptions/required_documents
* This output is used by decision_memo_agent next
filter_chain:
- pii_security_filter
- id: decision_memo_agent
description: |
Decision Memo Agent - Summarizes findings and recommends actions.
Decision Memo Agent - Step 4 of 4. Final response to the user.
CAPABILITIES:
* Create concise decision memos
@ -107,7 +116,10 @@ listeners:
* "Draft a decision memo"
* "Recommend a credit decision"
When requests focus on final memo or recommendation, route here.
OUTPUT REQUIREMENTS:
* Use intake + risk + policy outputs from prior assistant messages
* Return JSON with step="memo", recommended_action, decision_memo
* Provide the user-facing memo as the final response
filter_chain:
- pii_security_filter

View file

@ -3,7 +3,7 @@ import logging
import os
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional
from typing import Any, Dict, Optional
import uvicorn
from crewai import Agent, Crew, Task, Process
@ -13,11 +13,9 @@ from langchain_openai import ChatOpenAI
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.propagate import extract
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from pydantic import BaseModel
# Logging configuration
logging.basicConfig(
@ -62,21 +60,6 @@ llm_reasoning = ChatOpenAI(
)
class RiskAssessmentResult(BaseModel):
request_id: str
normalized_application: Dict[str, Any]
risk_band: str
confidence: float
drivers: List[Dict[str, Any]]
policy_checks: List[Dict[str, str]]
exceptions: List[str]
required_documents: List[str]
recommended_action: str
decision_memo: str
audit_trail: Dict[str, Any]
human_response: str
def build_intake_agent() -> Agent:
"""Build the intake & normalization agent."""
return Agent(
@ -321,480 +304,32 @@ def extract_json_block(output_text: str) -> Optional[Dict[str, Any]]:
return None
def create_risk_crew(application_data: Dict[str, Any]) -> Crew:
"""Create a CrewAI crew for risk assessment with 4 specialized agents."""
intake_agent = build_intake_agent()
risk_scoring_agent = build_risk_agent()
policy_agent = build_policy_agent()
memo_agent = build_memo_agent()
# Task 1: Intake and normalization
intake_task = Task(
description=f"""Analyze this loan application and extract all relevant information:
{json.dumps(application_data, indent=2)}
Extract and normalize:
1. Applicant name and loan amount
2. Monthly income and employment status
3. Credit score and existing loans
4. Total debt and delinquencies
5. Credit utilization rate
6. KYC, income verification, and address verification status
7. Calculate DTI if income is available: (total_debt / monthly_income) * 100
8. Flag any missing critical fields
Output a JSON structure with normalized values and a missing_fields list.""",
agent=intake_agent,
expected_output="Normalized application data with missing field analysis in JSON format",
)
# Task 2: Risk scoring
risk_task = Task(
description="""Based on the normalized data from the Intake Specialist, perform risk assessment:
**Risk Scoring Criteria:**
1. **Credit Score Assessment:**
- Excellent (≥750): Low risk
- Good (650-749): Medium risk
- Fair (550-649): High risk
- Poor (<550): Critical risk
- Missing: Medium risk (thin file)
2. **Debt-to-Income Ratio:**
- <35%: Low risk
- 35-50%: Medium risk
- >50%: Critical risk
- Missing: High risk
3. **Delinquency History:**
- 0: Low risk
- 1-2: Medium risk
- >2: Critical risk
4. **Credit Utilization:**
- <30%: Low risk
- 30-70%: Medium risk
- >70%: High risk
**Output:**
- Risk band: LOW (excellent profile), MEDIUM (some concerns), HIGH (significant issues)
- Confidence score (0.0-1.0)
- Top 3 risk drivers with: factor name, impact level (CRITICAL/HIGH/MEDIUM/LOW), evidence
Provide your analysis in JSON format.""",
agent=risk_scoring_agent,
expected_output="Risk band classification with confidence score and top 3 drivers in JSON format",
context=[intake_task],
)
# Task 3: Policy checks
policy_task = Task(
description="""Verify policy compliance using the normalized data and risk assessment:
**Policy Checks:**
1. KYC Completion: Check if CNIC, phone, and address are provided
2. Income Verification: Check if income is verified
3. Address Verification: Check if address is verified
4. DTI Limit: Flag if DTI >60% (automatic reject threshold)
5. Credit Score: Flag if <500 (minimum acceptable)
6. Delinquencies: Flag if >2 in recent history
**Required Documents by Risk Band:**
- LOW: Valid CNIC, Credit Report, Employment Letter, Bank Statements (3 months)
- MEDIUM: + Income proof (6 months), Address proof, Tax Returns (2 years)
- HIGH: + Guarantor Documents, Collateral Valuation, Detailed Financials
**Output JSON:**
- policy_checks: [{check, status (PASS/FAIL/WARNING), details}]
- exceptions: [list of exception codes like "KYC_INCOMPLETE", "INCOME_NOT_VERIFIED"]
- required_documents: [list of document names]""",
agent=policy_agent,
expected_output="Policy compliance status with exceptions and required documents in JSON format",
context=[intake_task, risk_task],
)
# Task 4: Decision memo
memo_task = Task(
description="""Generate a bank-ready decision memo synthesizing all findings:
**Memo Structure:**
1. **Executive Summary** (2-3 sentences)
- Loan amount, applicant, risk band, recommendation
2. **Applicant Profile**
- Name, loan amount, credit score, monthly income
3. **Risk Assessment**
- Risk band and confidence
- Top risk drivers with evidence
4. **Policy Compliance**
- Number of checks passed
- Outstanding issues or exceptions
5. **Required Documents**
- List key documents needed
6. **Recommendation**
- APPROVE: LOW risk + all checks passed
- CONDITIONAL_APPROVE: LOW/MEDIUM risk + minor issues (collect docs)
- REFER: MEDIUM/HIGH risk + exceptions (manual review)
- REJECT: HIGH risk OR critical policy violations (>60% DTI, <500 credit score)
7. **Next Steps**
- Action items based on recommendation
Keep professional, concise, and actionable. Max 300 words.
**Also provide:**
- recommended_action: One of APPROVE/CONDITIONAL_APPROVE/REFER/REJECT
- decision_memo: Full memo text""",
agent=memo_agent,
expected_output="Professional decision memo with clear recommendation",
context=[intake_task, risk_task, policy_task],
)
# Create crew with sequential process
crew = Crew(
agents=[intake_agent, risk_scoring_agent, policy_agent, memo_agent],
tasks=[intake_task, risk_task, policy_task, memo_task],
process=Process.sequential,
verbose=True,
)
return crew
def parse_crew_output(crew_output: str, application_data: Dict) -> Dict[str, Any]:
"""Parse CrewAI output and extract structured data."""
# Initialize result structure
result = {
"normalized_application": {},
"risk_band": "MEDIUM",
"confidence": 0.75,
"drivers": [],
"policy_checks": [],
"exceptions": [],
"required_documents": [],
"recommended_action": "REFER",
"decision_memo": "",
}
try:
# CrewAI returns the final task output as a string
output_text = str(crew_output)
# Try to extract JSON blocks from the output
json_blocks = []
lines = output_text.split("\n")
in_json = False
current_json = []
for line in lines:
if "```json" in line or line.strip().startswith("{"):
in_json = True
if not line.strip().startswith("```"):
current_json.append(line)
elif "```" in line and in_json:
in_json = False
if current_json:
try:
json_obj = json.loads("\n".join(current_json))
json_blocks.append(json_obj)
except json.JSONDecodeError:  # skip malformed JSON blocks
pass
current_json = []
elif in_json:
current_json.append(line)
# Extract from JSON blocks if available
for block in json_blocks:
if "risk_band" in block:
result["risk_band"] = block.get("risk_band", result["risk_band"])
if "confidence" in block:
result["confidence"] = float(
block.get("confidence", result["confidence"])
)
if "drivers" in block:
result["drivers"] = block.get("drivers", result["drivers"])
if "policy_checks" in block:
result["policy_checks"] = block.get(
"policy_checks", result["policy_checks"]
)
if "exceptions" in block:
result["exceptions"] = block.get("exceptions", result["exceptions"])
if "required_documents" in block:
result["required_documents"] = block.get(
"required_documents", result["required_documents"]
)
if "recommended_action" in block:
result["recommended_action"] = block.get(
"recommended_action", result["recommended_action"]
)
# Extract decision memo from text
if (
"**CREDIT RISK DECISION MEMO**" in output_text
or "Executive Summary" in output_text
):
memo_start = output_text.find("**CREDIT RISK DECISION MEMO**")
if memo_start == -1:
memo_start = output_text.find("Executive Summary")
if memo_start != -1:
result["decision_memo"] = output_text[memo_start:].strip()
else:
result["decision_memo"] = output_text
# Normalize application data
result["normalized_application"] = {
"applicant_name": application_data.get("applicant_name", "Unknown"),
"loan_amount": application_data.get("loan_amount", 0),
"monthly_income": application_data.get("monthly_income"),
"credit_score": application_data.get("credit_score"),
"employment_status": application_data.get("employment_status"),
"total_debt": application_data.get("total_debt", 0),
"delinquencies": application_data.get("delinquencies", 0),
"utilization_rate": application_data.get("utilization_rate"),
}
except Exception as e:
logger.error(f"Error parsing crew output: {e}")
# Fall back to basic extraction
result["decision_memo"] = str(crew_output)
return result
async def run_risk_assessment_with_crew(
application_data: Dict[str, Any], request_id: str, trace_context: dict
) -> RiskAssessmentResult:
"""Run CrewAI workflow for risk assessment."""
with tracer.start_as_current_span("crewai_risk_assessment_workflow") as span:
span.set_attribute("request_id", request_id)
span.set_attribute(
"applicant_name", application_data.get("applicant_name", "Unknown")
)
logger.info(f"Starting CrewAI risk assessment for request {request_id}")
try:
# Create and execute crew
crew = create_risk_crew(application_data)
# Run the crew - this will execute all tasks sequentially
crew_result = crew.kickoff()
logger.info(f"CrewAI workflow completed for request {request_id}")
# Parse the crew output
parsed_result = parse_crew_output(crew_result, application_data)
# Build human-friendly response
human_response = f"""**Credit Risk Assessment Complete** (Powered by CrewAI)
**Applicant:** {parsed_result['normalized_application']['applicant_name']}
**Loan Amount:** ${parsed_result['normalized_application']['loan_amount']:,.2f}
**Risk Band:** {parsed_result['risk_band']} (Confidence: {parsed_result['confidence']:.1%})
**Top Risk Drivers:**
{format_drivers(parsed_result['drivers'])}
**Policy Status:** {len(parsed_result['exceptions'])} exception(s) identified
**Required Documents:** {len(parsed_result['required_documents'])} document(s)
**Recommendation:** {parsed_result['recommended_action']}
*Assessment performed by 4-agent CrewAI workflow: Intake → Risk Scoring → Policy → Decision Memo*"""
return RiskAssessmentResult(
request_id=request_id,
normalized_application=parsed_result["normalized_application"],
risk_band=parsed_result["risk_band"],
confidence=parsed_result["confidence"],
drivers=(
parsed_result["drivers"]
if parsed_result["drivers"]
else [
{
"factor": "Analysis in Progress",
"impact": "MEDIUM",
"evidence": "CrewAI assessment completed",
}
]
),
policy_checks=(
parsed_result["policy_checks"]
if parsed_result["policy_checks"]
else [
{
"check": "Comprehensive Review",
"status": "COMPLETED",
"details": "Multi-agent analysis performed",
}
]
),
exceptions=parsed_result["exceptions"],
required_documents=(
parsed_result["required_documents"]
if parsed_result["required_documents"]
else ["Standard loan documentation required"]
),
recommended_action=parsed_result["recommended_action"],
decision_memo=parsed_result["decision_memo"],
audit_trail={
"models_used": [
"risk_fast (gpt-4o-mini)",
"risk_reasoning (gpt-4o)",
],
"agents_executed": [
"intake_agent",
"risk_scoring_agent",
"policy_agent",
"memo_agent",
],
"framework": "CrewAI",
"timestamp": datetime.utcnow().isoformat(),
"request_id": request_id,
},
human_response=human_response,
)
except Exception as e:
logger.error(f"CrewAI workflow error: {e}", exc_info=True)
span.record_exception(e)
# Fallback to basic response
return RiskAssessmentResult(
request_id=request_id,
normalized_application={
"applicant_name": application_data.get("applicant_name", "Unknown"),
"loan_amount": application_data.get("loan_amount", 0),
},
risk_band="MEDIUM",
confidence=0.50,
drivers=[
{"factor": "Assessment Error", "impact": "HIGH", "evidence": str(e)}
],
policy_checks=[
{
"check": "System Check",
"status": "ERROR",
"details": "CrewAI workflow encountered an error",
}
],
exceptions=["SYSTEM_ERROR"],
required_documents=["Manual review required"],
recommended_action="REFER",
decision_memo=f"System encountered an error during assessment. Manual review required. Error: {str(e)}",
audit_trail={
"error": str(e),
"timestamp": datetime.utcnow().isoformat(),
"request_id": request_id,
},
human_response=f"Assessment error occurred. Manual review required. Request ID: {request_id}",
)
def format_drivers(drivers: List[Dict]) -> str:
"""Format drivers for display."""
if not drivers:
return "- Analysis in progress"
lines = []
for driver in drivers:
lines.append(
f"- **{driver.get('factor', 'Unknown')}** ({driver.get('impact', 'UNKNOWN')}): {driver.get('evidence', 'N/A')}"
)
return "\n".join(lines)
@app.post("/v1/chat/completions")
async def chat_completions(request: Request):
"""OpenAI-compatible chat completions endpoint powered by CrewAI."""
with tracer.start_as_current_span("chat_completions") as span:
try:
body = await request.json()
messages = body.get("messages", [])
request_id = str(uuid.uuid4())
span.set_attribute("request_id", request_id)
# Extract loan application from last user message
last_user_msg = next(
(m for m in reversed(messages) if m.get("role") == "user"), None
)
if not last_user_msg:
return JSONResponse(
status_code=400, content={"error": "No user message found"}
)
content = last_user_msg.get("content", "")
logger.info(f"Processing CrewAI request {request_id}: {content[:100]}")
# Try to parse JSON from content
application_data = extract_json_from_content(content)
if not application_data:
application_data = {
"applicant_name": "Sample Applicant",
"loan_amount": 100000,
}
# Extract trace context
trace_context = extract(request.headers)
# Run CrewAI risk assessment
result = await run_risk_assessment_with_crew(
application_data, request_id, trace_context
)
# Format response
response_content = result.human_response
# Add machine-readable data as JSON
response_content += (
f"\n\n```json\n{json.dumps(result.dict(), indent=2)}\n```"
)
# Return OpenAI-compatible response
return JSONResponse(
content={
"id": f"chatcmpl-{request_id}",
"object": "chat.completion",
"created": int(datetime.utcnow().timestamp()),
"model": "risk_crew_agent",
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": response_content,
},
"finish_reason": "stop",
}
],
"usage": {
"prompt_tokens": 0,
"completion_tokens": 0,
"total_tokens": 0,
},
"metadata": {
"framework": "CrewAI",
"agents_used": 4,
"request_id": request_id,
},
}
)
except Exception as e:
logger.error(f"Error processing CrewAI request: {e}", exc_info=True)
span.record_exception(e)
return JSONResponse(
status_code=500, content={"error": str(e), "framework": "CrewAI"}
)
def extract_step_outputs(messages: list) -> Dict[str, Dict[str, Any]]:
"""Extract step outputs from assistant messages."""
step_outputs: Dict[str, Dict[str, Any]] = {}
for message in messages:
content = message.get("content", "")
if not content:
continue
json_block = extract_json_block(content)
if isinstance(json_block, dict) and json_block.get("step"):
step_outputs[json_block["step"]] = json_block
return step_outputs
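How step outputs accumulate across assistant turns can be sketched self-contained; `simple_json_block` here is a simplified stand-in for `extract_json_block` (the real helper also parses fenced JSON blocks):

```python
import json

# Simplified stand-in for extract_json_block: the real helper also handles
# JSON embedded in fenced code blocks, not just bare JSON content.
def simple_json_block(content: str):
    try:
        return json.loads(content)
    except json.JSONDecodeError:
        return None

# Mirror of extract_step_outputs: index each assistant JSON block by its "step" tag.
def collect_steps(messages: list) -> dict:
    outputs = {}
    for message in messages:
        block = simple_json_block(message.get("content", ""))
        if isinstance(block, dict) and block.get("step"):
            outputs[block["step"]] = block
    return outputs

history = [
    {"role": "assistant", "content": json.dumps({"step": "intake", "normalized_data": {}})},
    {"role": "assistant", "content": json.dumps({"step": "risk", "risk_band": "LOW"})},
    {"role": "user", "content": "proceed"},
]
steps = collect_steps(history)
```

Later steps (risk, policy, memo) look earlier outputs up in this dict by their `step` tag, which is what makes the stateless per-step endpoints composable.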
def extract_application_from_messages(messages: list) -> Optional[Dict[str, Any]]:
"""Extract the raw application JSON from the latest user message."""
for message in reversed(messages):
if message.get("role") != "user":
continue
content = message.get("content", "")
json_block = extract_json_from_content(content)
if isinstance(json_block, dict):
if "application" in json_block and isinstance(json_block["application"], dict):
return json_block["application"]
if "step" not in json_block:
return json_block
return None
async def handle_single_agent_step(request: Request, step: str) -> JSONResponse:
span.set_attribute("request_id", request_id)
span.set_attribute("step", step)
last_user_msg = next(
(m for m in reversed(messages) if m.get("role") == "user"), None
)
if not last_user_msg:
return JSONResponse(
status_code=400, content={"error": "No user message found"}
)
application_data = extract_application_from_messages(messages)
step_outputs = extract_step_outputs(messages)
logger.info(f"Processing {step} request {request_id}")
if step == "intake" and not application_data:
return JSONResponse(
status_code=400,
content={"error": "No application JSON found in user messages"},
)
if step == "intake":
agent = build_intake_agent()
task = make_intake_task(application_data, agent)
model_name = "loan_intake_agent"
human_response = "Intake normalization complete."
human_response = "Intake normalization complete. Passing to the next agent."
elif step == "risk":
intake_output = step_outputs.get("intake")
if not intake_output:
return JSONResponse(
status_code=400,
content={"error": "Missing intake output for risk step"},
)
payload = {
"application": application_data or {},
"intake": intake_output,
}
agent = build_risk_agent()
task = make_risk_task(payload, agent)
model_name = "risk_scoring_agent"
human_response = "Risk scoring complete."
human_response = "Risk scoring complete. Passing to the next agent."
elif step == "policy":
intake_output = step_outputs.get("intake")
risk_output = step_outputs.get("risk")
if not intake_output or not risk_output:
return JSONResponse(
status_code=400,
content={"error": "Missing intake or risk output for policy step"},
)
payload = {
"application": application_data or {},
"intake": intake_output,
"risk": risk_output,
}
agent = build_policy_agent()
task = make_policy_task(payload, agent)
model_name = "policy_compliance_agent"
human_response = "Policy compliance review complete."
human_response = "Policy compliance review complete. Passing to the next agent."
elif step == "memo":
intake_output = step_outputs.get("intake")
risk_output = step_outputs.get("risk")
policy_output = step_outputs.get("policy")
if not intake_output or not risk_output or not policy_output:
return JSONResponse(
status_code=400,
content={"error": "Missing prior outputs for memo step"},
)
payload = {
"application": application_data or {},
"intake": intake_output,
"risk": risk_output,
"policy": policy_output,
}
agent = build_memo_agent()
task = make_memo_task(payload, agent)
model_name = "decision_memo_agent"
crew_output = run_single_step(agent, task)
json_payload = extract_json_block(str(crew_output)) or {"step": step}
if step == "memo":
decision_memo = json_payload.get("decision_memo")
if decision_memo:
human_response = decision_memo
response_content = (
f"{human_response}\n\n```json\n{json.dumps(json_payload, indent=2)}\n```"
)
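The reply contract built above (human-readable text followed by a fenced JSON block) can be sketched round-trip; `wrap_reply`/`unwrap_reply` are illustrative names, and the fence string is constructed dynamically only so this example can itself live inside a fenced block:

```python
import json
import re

FENCE = "`" * 3  # built dynamically so this example can sit inside a fenced block

# Sketch of the reply contract: human-readable text followed by a fenced JSON
# block that the next step (or the UI) recovers with a regex. The regex is
# naive and assumes the JSON block contains no nested objects.
def wrap_reply(human: str, payload: dict) -> str:
    return f"{human}\n\n{FENCE}json\n{json.dumps(payload, indent=2)}\n{FENCE}"

def unwrap_reply(content: str):
    match = re.search(FENCE + r"json\s*(\{.*?\})\s*" + FENCE, content, re.DOTALL)
    return json.loads(match.group(1)) if match else None

reply = wrap_reply("Risk scoring complete.", {"step": "risk", "risk_band": "LOW"})
```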
return None
def call_plano(application_data: dict):
"""Call Plano once and return the assistant content and parsed JSON block."""
response = httpx.post(
f"{PLANO_ENDPOINT}/chat/completions",
json={
{
"role": "user",
"content": (
f"Run the {step_label} step only. Return JSON.\n\n"
f"{json.dumps(payload, indent=2)}"
"Run the full credit risk pipeline: intake -> risk -> policy -> memo. "
"Return the final decision memo for the applicant and include JSON.\n\n"
f"{json.dumps(application_data, indent=2)}"
),
}
],
},
timeout=90.0,
)
if response.status_code != 200:
return None, None, {
"status_code": response.status_code,
"text": response.text,
}
raw = response.json()
content = raw["choices"][0]["message"]["content"]
parsed = extract_json_block(content)
return content, parsed, raw
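The single-call request body that `call_plano` posts can be sketched on its own; `build_request` and the sample applicant are hypothetical, and only the payload shape is shown (the real code sends it via `httpx.post`):

```python
import json

# Sketch of the single-call request body: one user message that asks for the
# full pipeline and embeds the raw application JSON. build_request and the
# applicant data are illustrative only.
def build_request(application_data: dict) -> dict:
    return {
        "messages": [
            {
                "role": "user",
                "content": (
                    "Run the full credit risk pipeline: intake -> risk -> policy -> memo. "
                    "Return the final decision memo for the applicant and include JSON.\n\n"
                    f"{json.dumps(application_data, indent=2)}"
                ),
            }
        ],
    }

req = build_request({"applicant_name": "Jane Doe", "loan_amount": 250000})
```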
# Initialize session state
if "workflow_result" not in st.session_state:
st.session_state.workflow_result = None
if "raw_results" not in st.session_state:
st.session_state.raw_results = {}
if "assistant_content" not in st.session_state:
st.session_state.assistant_content = None
if "parsed_result" not in st.session_state:
st.session_state.parsed_result = None
if "raw_response" not in st.session_state:
st.session_state.raw_response = None
if "application_json" not in st.session_state:
st.session_state.application_json = "{}"
if st.button("🔍 Assess Risk", type="primary", use_container_width=True):
try:
application_data = json.loads(application_json)
with st.spinner("Running credit risk assessment..."):
content, parsed, raw = call_plano(application_data)
with st.spinner("Running intake..."):
intake, intake_raw = call_plano("loan intake normalization", application_data)
if not intake:
st.session_state.workflow_result = None
st.session_state.raw_results = {"intake": intake_raw}
st.error("Intake step failed.")
st.stop()
with st.spinner("Running risk scoring..."):
risk_payload = {"application": application_data, "intake": intake}
risk, risk_raw = call_plano("risk scoring", risk_payload)
if not risk:
st.session_state.workflow_result = None
st.session_state.raw_results = {
"intake": intake_raw,
"risk": risk_raw,
}
st.error("Risk scoring step failed.")
st.stop()
with st.spinner("Running policy compliance..."):
policy_payload = {
"application": application_data,
"intake": intake,
"risk": risk,
}
policy, policy_raw = call_plano("policy compliance", policy_payload)
if not policy:
st.session_state.workflow_result = None
st.session_state.raw_results = {
"intake": intake_raw,
"risk": risk_raw,
"policy": policy_raw,
}
st.error("Policy compliance step failed.")
st.stop()
with st.spinner("Running decision memo..."):
memo_payload = {
"application": application_data,
"intake": intake,
"risk": risk,
"policy": policy,
}
memo, memo_raw = call_plano("decision memo", memo_payload)
if not memo:
st.session_state.workflow_result = None
st.session_state.raw_results = {
"intake": intake_raw,
"risk": risk_raw,
"policy": policy_raw,
"memo": memo_raw,
}
st.error("Decision memo step failed.")
st.stop()
st.session_state.workflow_result = {
"application": application_data,
"intake": intake,
"risk": risk,
"policy": policy,
"memo": memo,
}
st.session_state.raw_results = {
"intake": intake_raw,
"risk": risk_raw,
"policy": policy_raw,
"memo": memo_raw,
}
st.success("✅ Risk assessment complete!")
if content is None:
st.session_state.assistant_content = None
st.session_state.parsed_result = None
st.session_state.raw_response = raw
st.error("Request failed. See raw response for details.")
else:
st.session_state.assistant_content = content
st.session_state.parsed_result = parsed
st.session_state.raw_response = raw
st.success("✅ Risk assessment complete!")
except json.JSONDecodeError:
st.error("Invalid JSON format")
with col_b:
if st.button("🧹 Clear", use_container_width=True):
st.session_state.assistant_content = None
st.session_state.parsed_result = None
st.session_state.raw_response = None
st.session_state.application_json = "{}"
st.rerun()
# Main content area
if st.session_state.assistant_content or st.session_state.parsed_result:
parsed = st.session_state.parsed_result or {}
st.header("Decision")
col1, col2 = st.columns(2)
with col1:
st.metric(
"Recommended Action",
result.get("memo", {}).get("recommended_action", "REVIEW"),
parsed.get("recommended_action", "REVIEW"),
)
with col2:
st.metric("Step", parsed.get("step", "memo"))
st.divider()
st.subheader("Decision Memo")
memo = result.get("memo", {}).get("decision_memo", "")
memo = parsed.get("decision_memo") or st.session_state.assistant_content
if memo:
st.markdown(memo)
else:
st.info("No decision memo available.")
st.divider()
with st.expander("Normalized Application"):
st.json(result.get("intake", {}).get("normalized_data", {}))
with st.expander("Step Outputs (debug)"):
st.json(st.session_state.raw_results or {})
with st.expander("Raw Response"):
st.json(st.session_state.raw_response or {})
else:
st.info(