- adding security enhancements to meet server side practices
- best effort RSA Key protection for ephemeral keys
- AES in memory protection
This commit is contained in:
Alpha Nerd 2026-01-17 10:59:16 +01:00
parent 197d498ea2
commit 19504d7308
2 changed files with 222 additions and 79 deletions

View file

@ -22,7 +22,7 @@ from nomyo import SecureChatCompletion
async def main():
# Initialize client (defaults to http://api.nomyo.ai:12434)
client = SecureChatCompletion(base_url="http://api.nomyo.ai:12434")
client = SecureChatCompletion(base_url="https://api.nomyo.ai:12434")
# Simple chat completion
response = await client.create(
@ -56,10 +56,19 @@ python3 test.py
### Key Management
- Automatic key generation and management
- Keys stored with restricted permissions (600 for private key)
- Optional password protection for private keys
- Key persistence across sessions
- **Automatic key generation**: Keys are automatically generated on first use
- **Automatic key loading**: Existing keys are loaded automatically from `client_keys/` directory
- **No manual intervention required**: The library handles key management automatically
- **Keys kept in memory**: Active session keys are stored in memory for performance
- **Optional persistence**: Keys can be saved to `client_keys/` directory for reuse across sessions
- **Password protection**: Optional password encryption for private keys (recommended for production)
- **Secure permissions**: Private keys stored with restricted permissions (600 - owner-only access)
- **Secure memory protection**: Plaintext payloads protected from disk swapping and memory lingering### Secure Memory Protection
- **Automatic protection**: Plaintext payloads are automatically protected during encryption
- **Prevents memory swapping**: Sensitive data cannot be swapped to disk
- **Guaranteed zeroing**: Memory is zeroed after encryption completes
- **Fallback mechanism**: Graceful degradation if SecureMemory module unavailable
- **Configurable**: Can be disabled with `secure_memory=False` parameter (not recommended)
## 🔄 OpenAI Compatibility
@ -131,7 +140,7 @@ import asyncio
from nomyo import SecureChatCompletion
async def main():
client = SecureChatCompletion(base_url="http://api.nomyo.ai:12434")
client = SecureChatCompletion(base_url="https://api.nomyo.ai:12434")
response = await client.create(
model="Qwen/Qwen3-0.6B",
@ -154,7 +163,7 @@ import asyncio
from nomyo import SecureChatCompletion
async def main():
client = SecureChatCompletion(base_url="http://api.nomyo.ai:12434")
client = SecureChatCompletion(base_url="https://api.nomyo.ai:12434")
response = await client.create(
model="Qwen/Qwen3-0.6B",
@ -192,7 +201,7 @@ import asyncio
from nomyo import SecureChatCompletion
async def main():
client = SecureChatCompletion(base_url="http://api.nomyo.ai:12434")
client = SecureChatCompletion(base_url="https://api.nomyo.ai:12434")
response = await client.acreate(
model="Qwen/Qwen3-0.6B",
@ -224,14 +233,59 @@ import asyncio
from nomyo import SecureChatCompletion
async def main():
client = SecureChatCompletion(base_url="http://NOMYO-Pro-Router:12434")
client = SecureChatCompletion(base_url="https://NOMYO-Pro-Router:12434")
# ... rest of your code
asyncio.run(main())
```### API Key Authentication
```python
import asyncio
from nomyo import SecureChatCompletion
async def main():
# Initialize with API key (recommended for production)
client = SecureChatCompletion(
base_url="https://api.nomyo.ai:12434",
api_key="your-api-key-here"
)
# Or pass API key in the create() method
response = await client.create(
model="Qwen/Qwen3-0.6B",
messages=[
{"role": "user", "content": "Hello!"}
],
api_key="your-api-key-here" # Overrides instance API key
)
asyncio.run(main())
```
### Secure Memory Configuration
```python
import asyncio
from nomyo import SecureChatCompletion
async def main():
# Enable secure memory protection (default, recommended)
client = SecureChatCompletion(
base_url="https://api.nomyo.ai:12434",
secure_memory=True # Default
)
# Disable secure memory (not recommended, for testing only)
client = SecureChatCompletion(
base_url="https://api.nomyo.ai:12434",
secure_memory=False
)
asyncio.run(main())
```
### Key Management
Keys are automatically generated on first use and stored in `client_keys/` directory.
Keys are automatically generated on first use.
#### Generate Keys Manually
@ -283,9 +337,21 @@ Tests verify:
#### Constructor
```python
SecureChatCompletion(base_url: str = "http://api.nomyo.ai:12434")
SecureChatCompletion(
base_url: str = "https://api.nomyo.ai:12434",
allow_http: bool = False,
api_key: Optional[str] = None,
secure_memory: bool = True
)
```
**Parameters:**
- `base_url`: Base URL of the NOMYO Router (must use HTTPS for production)
- `allow_http`: Allow HTTP connections (ONLY for local development, never in production)
- `api_key`: Optional API key for bearer authentication
- `secure_memory`: Enable secure memory protection (default: True)
#### Methods
- `create(model, messages, **kwargs)`: Create a chat completion

View file

@ -94,6 +94,38 @@ class SecureCompletionClient:
else:
logger.warning("HTTP mode enabled for local development (INSECURE)")
def _protect_private_key(self) -> None:
"""
Attempt to lock private key in memory (best effort).
Note: Due to Python's memory management and the cryptography library's
internal handling of key material, this provides limited protection.
The main benefit is defense-in-depth and signaling security intent.
For maximum security:
- Use password-protected key files
- Rotate keys regularly
- Store keys outside the project directory in production
"""
if not _SECURE_MEMORY_AVAILABLE or not self.private_key:
return
try:
# Attempt to lock the key object in memory
# Note: This is best-effort as the cryptography library
# maintains its own internal key material
import pickle
key_data = pickle.dumps(self.private_key)
from .SecureMemory import _secure_memory
locked = _secure_memory.lock_memory(key_data)
if locked:
logger.debug("Private key locked in memory (best effort)")
else:
logger.debug("Could not lock private key in memory")
except Exception as e:
logger.debug(f"Private key protection unavailable: {e}")
async def generate_keys(self, save_to_file: bool = False, key_dir: str = "client_keys", password: Optional[str] = None) -> None:
"""
Generate RSA key pair for secure communication.
@ -123,6 +155,9 @@ class SecureCompletionClient:
logger.debug("Generated %d-bit RSA key pair", self.key_size)
# Attempt to protect private key in memory (best effort)
self._protect_private_key()
if save_to_file:
os.makedirs(key_dir, exist_ok=True)
@ -219,6 +254,9 @@ class SecureCompletionClient:
# Validate loaded key
self._validate_rsa_key(self.private_key, "private")
# Attempt to protect private key in memory (best effort)
self._protect_private_key()
logger.debug("Keys loaded successfully")
async def fetch_server_public_key(self) -> str:
@ -335,10 +373,13 @@ class SecureCompletionClient:
# Generate cryptographically secure random AES key
aes_key = secrets.token_bytes(32) # 256-bit key
# Encrypt payload with AES-GCM using Cipher API (matching server implementation)
try:
# Protect AES key in memory
with secure_bytes(bytearray(aes_key)) as protected_aes_key:
# Encrypt payload with AES-GCM using Cipher API
nonce = secrets.token_bytes(12) # 96-bit nonce for GCM
cipher = Cipher(
algorithms.AES(aes_key),
algorithms.AES(bytes(protected_aes_key)),
modes.GCM(nonce),
backend=default_backend()
)
@ -355,7 +396,7 @@ class SecureCompletionClient:
backend=default_backend()
)
encrypted_aes_key = server_public_key.encrypt(
aes_key,
bytes(protected_aes_key),
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
@ -382,6 +423,9 @@ class SecureCompletionClient:
logger.debug("Encrypted package size: %d bytes", len(package_json))
return package_json
finally:
# Explicitly clear the AES key reference
del aes_key
else:
# Fallback to standard encryption if secure memory not available
logger.warning("Secure memory not available, using standard encryption")
@ -389,6 +433,7 @@ class SecureCompletionClient:
# Generate cryptographically secure random AES key
aes_key = secrets.token_bytes(32) # 256-bit key
try:
# Encrypt payload with AES-GCM using Cipher API (matching server implementation)
nonce = secrets.token_bytes(12) # 96-bit nonce for GCM
cipher = Cipher(
@ -436,6 +481,9 @@ class SecureCompletionClient:
logger.debug("Encrypted package size: %d bytes", len(package_json))
return package_json
finally:
# Explicitly clear the AES key reference
del aes_key
except ValueError:
raise # Re-raise validation errors
@ -505,7 +553,35 @@ class SecureCompletionClient:
)
)
# Decrypt payload with AES-GCM using Cipher API (matching server implementation)
# Use secure memory to protect AES key and decrypted plaintext
if _SECURE_MEMORY_AVAILABLE:
# Protect AES key in memory
with secure_bytes(bytearray(aes_key)) as protected_aes_key:
# Decrypt payload with AES-GCM using Cipher API
ciphertext = base64.b64decode(package["encrypted_payload"]["ciphertext"])
nonce = base64.b64decode(package["encrypted_payload"]["nonce"])
tag = base64.b64decode(package["encrypted_payload"]["tag"])
cipher = Cipher(
algorithms.AES(bytes(protected_aes_key)),
modes.GCM(nonce, tag),
backend=default_backend()
)
decryptor = cipher.decryptor()
plaintext = decryptor.update(ciphertext) + decryptor.finalize()
# Protect decrypted plaintext in memory
with secure_bytes(bytearray(plaintext)) as protected_plaintext:
# Parse decrypted response
response = json.loads(bytes(protected_plaintext).decode('utf-8'))
# Plaintext automatically zeroed here
# AES key automatically zeroed here
else:
# Fallback if secure memory not available
logger.warning("Secure memory not available, using standard decryption")
# Decrypt payload with AES-GCM using Cipher API
ciphertext = base64.b64decode(package["encrypted_payload"]["ciphertext"])
nonce = base64.b64decode(package["encrypted_payload"]["nonce"])
tag = base64.b64decode(package["encrypted_payload"]["tag"])
@ -520,6 +596,7 @@ class SecureCompletionClient:
# Parse decrypted response
response = json.loads(plaintext.decode('utf-8'))
except Exception:
# Don't leak specific decryption errors (timing attacks)
raise SecurityError("Decryption failed: integrity check or authentication failed")