This commit is contained in:
Alpha Nerd 2025-12-17 16:24:28 +01:00 committed by GitHub
parent 5c27f51941
commit 91ad5afe81
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 276 additions and 41 deletions

86
SECURITY.md Normal file
View file

@ -0,0 +1,86 @@
# Security Policy
## Overview
The NOMYO Client implements end-to-end encryption for secure communication with the NOMYO Router.
## Security Best Practices
### 1. Always Use HTTPS in Production
The client MUST connect using HTTPS in production environments:
```python
# ✅ SECURE (Production)
client = SecureChatCompletion(base_url="https://api.nomyo.ai:12434")
# ⚠️ INSECURE (Local development only)
client = SecureChatCompletion(base_url="http://localhost:12434", allow_http=True)
```
**Important:** The `allow_http=True` parameter must be explicitly set to enable HTTP connections for local development. Without this parameter, HTTP connections will raise a `SecurityError` to prevent accidental use of insecure connections.
HTTP connections are vulnerable to man-in-the-middle attacks where an attacker could intercept and substitute the server's public key.
### 2. Protect Private Keys
**Password Protection:**
```python
# Generate keys with password protection
await client.generate_keys(save_to_file=True, password="strong_password_here")
# Load password-protected keys
await client.load_keys("client_keys/private_key.pem", password="strong_password_here")
```
**File Permissions:**
- Private keys are automatically saved with 0600 permissions (owner read/write only)
- Never commit private keys to version control
- Add `client_keys/` to your `.gitignore`
### 3. Key Management
**Key Rotation:**
- Regularly rotate RSA key pairs (recommended: every 90 days)
- Generate new keys when changing environments (dev → staging → production)
**Key Storage:**
- Store keys outside the project directory in production
- Use environment variables or secrets management systems
- Never hardcode keys in source code
### 4. Validate Server Certificates
The client enforces HTTPS certificate verification by default. Do not disable this in production.
## Security Features
- RSA-4096 with OAEP padding (SHA-256)
- AES-256-GCM for payload encryption
- Cryptographically secure random number generation (using `secrets` module)
- HTTPS with certificate verification
- Input validation and size limits (10MB max payload)
- Secure error handling (no information leakage)
- Key validation (minimum 2048-bit RSA keys)
## Local Development
For local development with HTTP servers:
```python
# Explicitly allow HTTP for local development
client = SecureChatCompletion(
base_url="http://localhost:12434",
allow_http=True # Required for HTTP
)
```
This will display warnings but allow the connection to proceed.
## Reporting Security Issues
Report security vulnerabilities responsibly:
- Do NOT create public GitHub issues
- Contact: security@nomyo.ai
- Include detailed vulnerability information
- Allow time for remediation before disclosure

View file

@ -1,4 +1,4 @@
import json, base64, urllib.parse, httpx, os
import json, base64, urllib.parse, httpx, os, secrets, warnings
from typing import Dict, Any, Optional
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import rsa, padding
@ -7,6 +7,11 @@ from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
class SecurityError(Exception):
"""Raised when a security violation is detected."""
pass
class SecureCompletionClient:
"""
Client for the /v1/chat/secure_completion endpoint.
@ -18,17 +23,32 @@ class SecureCompletionClient:
- Response parsing
"""
def __init__(self, router_url: str = "http://api.nomyo.ai:12434"):
def __init__(self, router_url: str = "https://api.nomyo.ai:12434", allow_http: bool = False):
"""
Initialize the secure completion client.
Args:
router_url: Base URL of the NOMYO Router (e.g., "http://api.nomyo.ai:12434")
router_url: Base URL of the NOMYO Router (must use HTTPS for production)
allow_http: Allow HTTP connections (ONLY for local development, never in production)
"""
self.router_url = router_url.rstrip('/')
self.private_key = None
self.public_key_pem = None
self.key_size = 4096 # RSA key size
self.allow_http = allow_http # Store for use in fetch_server_public_key
# Validate HTTPS for security
if not self.router_url.startswith("https://"):
if not allow_http:
warnings.warn(
"⚠️ WARNING: Using HTTP instead of HTTPS. "
"This is INSECURE and should only be used for local development. "
"Man-in-the-middle attacks are possible!",
UserWarning,
stacklevel=2
)
else:
print("⚠️ HTTP mode enabled for local development (INSECURE)")
async def generate_keys(self, save_to_file: bool = False, key_dir: str = "client_keys", password: Optional[str] = None) -> None:
"""
@ -152,28 +172,79 @@ class SecureCompletionClient:
format=serialization.PublicFormat.SubjectPublicKeyInfo
).decode('utf-8')
# Validate loaded key
self._validate_rsa_key(self.private_key, "private")
print(" ✓ Keys loaded successfully")
async def fetch_server_public_key(self) -> str:
"""
Fetch the server's public key from the /pki/public_key endpoint.
Uses HTTPS with certificate verification to prevent MITM attacks.
HTTP is only allowed if explicitly enabled via allow_http parameter.
Returns:
Server's public key as PEM string
Raises:
SecurityError: If HTTPS is not used and HTTP is not explicitly allowed
ConnectionError: If connection fails
ValueError: If response is invalid
"""
print("🔑 Fetching server's public key...")
# Security check: Ensure HTTPS is used unless HTTP explicitly allowed
if not self.router_url.startswith("https://"):
if not self.allow_http:
raise SecurityError(
"Server public key must be fetched over HTTPS to prevent MITM attacks. "
"For local development, initialize with allow_http=True: "
"SecureChatCompletion(base_url='http://localhost:12434', allow_http=True)"
)
else:
print(" ⚠️ Fetching key over HTTP (local development mode)")
url = f"{self.router_url}/pki/public_key"
try:
async with httpx.AsyncClient(timeout=60.0) as client:
# Use HTTPS verification only for HTTPS URLs
verify_ssl = self.router_url.startswith("https://")
async with httpx.AsyncClient(
timeout=60.0,
verify=verify_ssl, # Verify SSL/TLS certificates for HTTPS
) as client:
response = await client.get(url)
if response.status_code == 200:
server_public_key = response.text
print(" ✓ Server's public key fetched successfully")
# Validate it's a valid PEM key
try:
serialization.load_pem_public_key(
server_public_key.encode('utf-8'),
backend=default_backend()
)
except Exception:
raise ValueError("Server returned invalid public key format")
if verify_ssl:
print(" ✓ Server's public key fetched securely over HTTPS")
else:
print(" ⚠️ Server's public key fetched over HTTP (INSECURE)")
return server_public_key
else:
raise ValueError(f"Failed to fetch server's public key: HTTP {response.status_code}")
except httpx.ConnectError as e:
raise ConnectionError(f"Failed to connect to server: {e}")
except httpx.TimeoutException:
raise ConnectionError("Connection to server timed out")
except SecurityError:
raise # Re-raise security errors
except ValueError:
raise # Re-raise validation errors
except Exception as e:
raise ValueError(f"Failed to fetch server's public key: {e}")
@ -188,20 +259,34 @@ class SecureCompletionClient:
Encrypted payload as bytes
Raises:
Exception: If encryption fails
ValueError: If payload is invalid or too large
SecurityError: If encryption fails
"""
print("🔒 Encrypting payload...")
# Validate payload
if not isinstance(payload, dict):
raise ValueError("Payload must be a dictionary")
if not payload:
raise ValueError("Payload cannot be empty")
try:
# Serialize payload to JSON
payload_json = json.dumps(payload).encode('utf-8')
# Validate payload size (prevent DoS)
MAX_PAYLOAD_SIZE = 10 * 1024 * 1024 # 10MB limit
if len(payload_json) > MAX_PAYLOAD_SIZE:
raise ValueError(f"Payload too large: {len(payload_json)} bytes (max: {MAX_PAYLOAD_SIZE})")
print(f" Payload size: {len(payload_json)} bytes")
# Generate random AES key
aes_key = os.urandom(32) # 256-bit key
# Generate cryptographically secure random AES key
aes_key = secrets.token_bytes(32) # 256-bit key
# Encrypt payload with AES-GCM using Cipher API (matching server implementation)
nonce = os.urandom(12) # 96-bit nonce for GCM
nonce = secrets.token_bytes(12) # 96-bit nonce for GCM
cipher = Cipher(
algorithms.AES(aes_key),
modes.GCM(nonce),
@ -248,8 +333,13 @@ class SecureCompletionClient:
return package_json
except ValueError:
raise # Re-raise validation errors
except SecurityError:
raise # Re-raise security errors
except Exception as e:
raise Exception(f"Encryption failed: {str(e)}")
# Don't leak internal details
raise SecurityError("Encryption operation failed")
async def decrypt_response(self, encrypted_response: bytes, payload_id: str) -> Dict[str, Any]:
"""
@ -261,47 +351,74 @@ class SecureCompletionClient:
Returns:
Decrypted response dictionary
Raises:
ValueError: If response format is invalid
SecurityError: If decryption fails or integrity check fails
"""
print("🔓 Decrypting response...")
# Validate input
if not encrypted_response:
raise ValueError("Empty encrypted response")
if not isinstance(encrypted_response, bytes):
raise ValueError("Encrypted response must be bytes")
# Parse encrypted package
try:
package = json.loads(encrypted_response.decode('utf-8'))
except json.JSONDecodeError as e:
raise ValueError(f"Invalid encrypted package format: {e}")
except json.JSONDecodeError:
raise ValueError("Invalid encrypted package format: malformed JSON")
except UnicodeDecodeError:
raise ValueError("Invalid encrypted package format: not valid UTF-8")
# Validate package structure
required_fields = ["version", "algorithm", "encrypted_payload", "encrypted_aes_key"]
for field in required_fields:
if field not in package:
raise ValueError(f"Missing required field in encrypted package: {field}")
missing_fields = [f for f in required_fields if f not in package]
if missing_fields:
raise ValueError(f"Missing required fields in encrypted package: {', '.join(missing_fields)}")
# Decrypt AES key with private key
encrypted_aes_key = base64.b64decode(package["encrypted_aes_key"])
aes_key = self.private_key.decrypt(
encrypted_aes_key,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
# Validate encrypted_payload structure
if not isinstance(package["encrypted_payload"], dict):
raise ValueError("Invalid encrypted_payload: must be a dictionary")
payload_required = ["ciphertext", "nonce", "tag"]
missing_payload_fields = [f for f in payload_required if f not in package["encrypted_payload"]]
if missing_payload_fields:
raise ValueError(f"Missing fields in encrypted_payload: {', '.join(missing_payload_fields)}")
# Decrypt with proper error handling
try:
# Decrypt AES key with private key
encrypted_aes_key = base64.b64decode(package["encrypted_aes_key"])
aes_key = self.private_key.decrypt(
encrypted_aes_key,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
)
)
# Decrypt payload with AES-GCM using Cipher API (matching server implementation)
ciphertext = base64.b64decode(package["encrypted_payload"]["ciphertext"])
nonce = base64.b64decode(package["encrypted_payload"]["nonce"])
tag = base64.b64decode(package["encrypted_payload"]["tag"])
# Decrypt payload with AES-GCM using Cipher API (matching server implementation)
ciphertext = base64.b64decode(package["encrypted_payload"]["ciphertext"])
nonce = base64.b64decode(package["encrypted_payload"]["nonce"])
tag = base64.b64decode(package["encrypted_payload"]["tag"])
cipher = Cipher(
algorithms.AES(aes_key),
modes.GCM(nonce, tag),
backend=default_backend()
)
decryptor = cipher.decryptor()
plaintext = decryptor.update(ciphertext) + decryptor.finalize()
cipher = Cipher(
algorithms.AES(aes_key),
modes.GCM(nonce, tag),
backend=default_backend()
)
decryptor = cipher.decryptor()
plaintext = decryptor.update(ciphertext) + decryptor.finalize()
# Parse decrypted response
response = json.loads(plaintext.decode('utf-8'))
# Parse decrypted response
response = json.loads(plaintext.decode('utf-8'))
except Exception:
# Don't leak specific decryption errors (timing attacks)
raise SecurityError("Decryption failed: integrity check or authentication failed")
# Add metadata for debugging
if "_metadata" not in response:
@ -378,5 +495,36 @@ class SecureCompletionClient:
except httpx.NetworkError as e:
raise ConnectionError(f"Failed to connect to router: {e}")
except (ValueError, SecurityError, ConnectionError):
raise # Re-raise known exceptions
except Exception as e:
raise Exception(f"Request failed: {e}")
def _validate_rsa_key(self, key, key_type: str = "private") -> None:
"""
Validate that a key is a valid RSA key with appropriate size.
Args:
key: The key to validate
key_type: "private" or "public"
Raises:
ValueError: If key is invalid
"""
if key_type == "private":
if not isinstance(key, rsa.RSAPrivateKey):
raise ValueError("Invalid private key: not an RSA private key")
key_size = key.key_size
else:
if not isinstance(key, rsa.RSAPublicKey):
raise ValueError("Invalid public key: not an RSA public key")
key_size = key.key_size
MIN_KEY_SIZE = 2048
if key_size < MIN_KEY_SIZE:
raise ValueError(
f"Key size {key_size} is too small. "
f"Minimum recommended size is {MIN_KEY_SIZE} bits."
)
print(f" ✓ Valid {key_size}-bit RSA {key_type} key")

View file

@ -37,16 +37,17 @@ class SecureChatCompletion:
```
"""
def __init__(self, base_url: str = "http://api.nomyo.ai:12434"):
def __init__(self, base_url: str = "https://api.nomyo.ai:12434", allow_http: bool = False):
"""
Initialize the secure chat completion client.
Args:
base_url: Base URL of the NOMYO Router (e.g., "http://api.nomyo.ai:12434")
base_url: Base URL of the NOMYO Router (must use HTTPS for production)
This parameter is named 'base_url' for OpenAI compatibility.
allow_http: Allow HTTP connections (ONLY for local development, never in production)
"""
self.client = SecureCompletionClient(router_url=base_url)
self.client = SecureCompletionClient(router_url=base_url, allow_http=allow_http)
self._keys_initialized = False
async def _ensure_keys(self):

View file

@ -9,7 +9,7 @@ the same interface as OpenAI's ChatCompletion.create() method.
import asyncio
from nomyo import SecureChatCompletion
client = SecureChatCompletion(base_url="http://localhost:12434")
client = SecureChatCompletion(base_url="http://localhost:12434", allow_http=True)
async def test_basic_chat():
"""Test basic chat completion with OpenAI-style API."""