From 19504d73084415bf732bcac5c1a1424a20520a2d Mon Sep 17 00:00:00 2001 From: alpha-nerd-nomyo Date: Sat, 17 Jan 2026 10:59:16 +0100 Subject: [PATCH] feature: - adding security enhancements to meet server side practices - best effort RSA Key protection for ephemeral keys - AES in memory protection --- README.md | 88 +++++++++++-- nomyo/SecureCompletionClient.py | 213 ++++++++++++++++++++++---------- 2 files changed, 222 insertions(+), 79 deletions(-) diff --git a/README.md b/README.md index 74e7a5e..bb110df 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,7 @@ from nomyo import SecureChatCompletion async def main(): # Initialize client (defaults to http://api.nomyo.ai:12434) - client = SecureChatCompletion(base_url="http://api.nomyo.ai:12434") + client = SecureChatCompletion(base_url="https://api.nomyo.ai:12434") # Simple chat completion response = await client.create( @@ -56,10 +56,19 @@ python3 test.py ### Key Management -- Automatic key generation and management -- Keys stored with restricted permissions (600 for private key) -- Optional password protection for private keys -- Key persistence across sessions +- **Automatic key generation**: Keys are automatically generated on first use +- **Automatic key loading**: Existing keys are loaded automatically from `client_keys/` directory +- **No manual intervention required**: The library handles key management automatically +- **Keys kept in memory**: Active session keys are stored in memory for performance +- **Optional persistence**: Keys can be saved to `client_keys/` directory for reuse across sessions +- **Password protection**: Optional password encryption for private keys (recommended for production) +- **Secure permissions**: Private keys stored with restricted permissions (600 - owner-only access) +- **Secure memory protection**: Plaintext payloads protected from disk swapping and memory lingering### Secure Memory Protection +- **Automatic protection**: Plaintext payloads are automatically protected during encryption +- **Prevents memory swapping**: Sensitive data cannot be swapped to disk +- **Guaranteed zeroing**: Memory is zeroed after encryption completes +- **Fallback mechanism**: Graceful degradation if SecureMemory module unavailable +- **Configurable**: Can be disabled with `secure_memory=False` parameter (not recommended) ## 🔄 OpenAI Compatibility @@ -131,7 +140,7 @@ import asyncio from nomyo import SecureChatCompletion async def main(): - client = SecureChatCompletion(base_url="http://api.nomyo.ai:12434") + client = SecureChatCompletion(base_url="https://api.nomyo.ai:12434") response = await client.create( model="Qwen/Qwen3-0.6B", @@ -154,7 +163,7 @@ import asyncio from nomyo import SecureChatCompletion async def main(): - client = SecureChatCompletion(base_url="http://api.nomyo.ai:12434") + client = SecureChatCompletion(base_url="https://api.nomyo.ai:12434") response = await client.create( model="Qwen/Qwen3-0.6B", @@ -192,7 +201,7 @@ import asyncio from nomyo import SecureChatCompletion async def main(): - client = SecureChatCompletion(base_url="http://api.nomyo.ai:12434") + client = SecureChatCompletion(base_url="https://api.nomyo.ai:12434") response = await client.acreate( model="Qwen/Qwen3-0.6B", @@ -224,14 +233,59 @@ import asyncio from nomyo import SecureChatCompletion async def main(): - client = SecureChatCompletion(base_url="http://NOMYO-Pro-Router:12434") + client = SecureChatCompletion(base_url="https://NOMYO-Pro-Router:12434") # ... rest of your code asyncio.run(main()) +```### API Key Authentication + +```python +import asyncio +from nomyo import SecureChatCompletion + +async def main(): + # Initialize with API key (recommended for production) + client = SecureChatCompletion( + base_url="https://api.nomyo.ai:12434", + api_key="your-api-key-here" + ) + + # Or pass API key in the create() method + response = await client.create( + model="Qwen/Qwen3-0.6B", + messages=[ + {"role": "user", "content": "Hello!"} + ], + api_key="your-api-key-here" # Overrides instance API key + ) + +asyncio.run(main()) +``` + +### Secure Memory Configuration + +```python +import asyncio +from nomyo import SecureChatCompletion + +async def main(): + # Enable secure memory protection (default, recommended) + client = SecureChatCompletion( + base_url="https://api.nomyo.ai:12434", + secure_memory=True # Default + ) + + # Disable secure memory (not recommended, for testing only) + client = SecureChatCompletion( + base_url="https://api.nomyo.ai:12434", + secure_memory=False + ) + +asyncio.run(main()) ``` ### Key Management -Keys are automatically generated on first use and stored in `client_keys/` directory. +Keys are automatically generated on first use. #### Generate Keys Manually @@ -283,9 +337,21 @@ Tests verify: #### Constructor ```python -SecureChatCompletion(base_url: str = "http://api.nomyo.ai:12434") +SecureChatCompletion( + base_url: str = "https://api.nomyo.ai:12434", + allow_http: bool = False, + api_key: Optional[str] = None, + secure_memory: bool = True +) ``` +**Parameters:** + +- `base_url`: Base URL of the NOMYO Router (must use HTTPS for production) +- `allow_http`: Allow HTTP connections (ONLY for local development, never in production) +- `api_key`: Optional API key for bearer authentication +- `secure_memory`: Enable secure memory protection (default: True) + #### Methods - `create(model, messages, **kwargs)`: Create a chat completion diff --git a/nomyo/SecureCompletionClient.py b/nomyo/SecureCompletionClient.py index d0819b6..8f2be54 100644 --- a/nomyo/SecureCompletionClient.py +++ b/nomyo/SecureCompletionClient.py @@ -94,6 +94,38 @@ class SecureCompletionClient: else: logger.warning("HTTP mode enabled for local development (INSECURE)") + def _protect_private_key(self) -> None: + """ + Attempt to lock private key in memory (best effort). + + Note: Due to Python's memory management and the cryptography library's + internal handling of key material, this provides limited protection. + The main benefit is defense-in-depth and signaling security intent. + + For maximum security: + - Use password-protected key files + - Rotate keys regularly + - Store keys outside the project directory in production + """ + if not _SECURE_MEMORY_AVAILABLE or not self.private_key: + return + + try: + # Attempt to lock the key object in memory + # Note: This is best-effort as the cryptography library + # maintains its own internal key material + import pickle + key_data = pickle.dumps(self.private_key) + from .SecureMemory import _secure_memory + locked = _secure_memory.lock_memory(key_data) + if locked: + logger.debug("Private key locked in memory (best effort)") + else: + logger.debug("Could not lock private key in memory") + except Exception as e: + logger.debug(f"Private key protection unavailable: {e}") + + async def generate_keys(self, save_to_file: bool = False, key_dir: str = "client_keys", password: Optional[str] = None) -> None: """ Generate RSA key pair for secure communication. @@ -123,6 +155,9 @@ class SecureCompletionClient: logger.debug("Generated %d-bit RSA key pair", self.key_size) + # Attempt to protect private key in memory (best effort) + self._protect_private_key() + if save_to_file: os.makedirs(key_dir, exist_ok=True) @@ -219,6 +254,9 @@ class SecureCompletionClient: # Validate loaded key self._validate_rsa_key(self.private_key, "private") + # Attempt to protect private key in memory (best effort) + self._protect_private_key() + logger.debug("Keys loaded successfully") async def fetch_server_public_key(self) -> str: @@ -335,6 +373,67 @@ class SecureCompletionClient: # Generate cryptographically secure random AES key aes_key = secrets.token_bytes(32) # 256-bit key + try: + # Protect AES key in memory + with secure_bytes(bytearray(aes_key)) as protected_aes_key: + # Encrypt payload with AES-GCM using Cipher API + nonce = secrets.token_bytes(12) # 96-bit nonce for GCM + cipher = Cipher( + algorithms.AES(bytes(protected_aes_key)), + modes.GCM(nonce), + backend=default_backend() + ) + encryptor = cipher.encryptor() + ciphertext = encryptor.update(protected_payload) + encryptor.finalize() + tag = encryptor.tag + + # Fetch server's public key for encrypting the AES key + server_public_key_pem = await self.fetch_server_public_key() + + # Encrypt AES key with server's RSA-OAEP + server_public_key = serialization.load_pem_public_key( + server_public_key_pem.encode('utf-8'), + backend=default_backend() + ) + encrypted_aes_key = server_public_key.encrypt( + bytes(protected_aes_key), + padding.OAEP( + mgf=padding.MGF1(algorithm=hashes.SHA256()), + algorithm=hashes.SHA256(), + label=None + ) + ) + + # Create encrypted package + encrypted_package = { + "version": "1.0", + "algorithm": "hybrid-aes256-rsa4096", + "encrypted_payload": { + "ciphertext": base64.b64encode(ciphertext).decode('utf-8'), + "nonce": base64.b64encode(nonce).decode('utf-8'), + "tag": base64.b64encode(tag).decode('utf-8') + }, + "encrypted_aes_key": base64.b64encode(encrypted_aes_key).decode('utf-8'), + "key_algorithm": "RSA-OAEP-SHA256", + "payload_algorithm": "AES-256-GCM" + } + + # Serialize package to JSON and return as bytes + package_json = json.dumps(encrypted_package).encode('utf-8') + logger.debug("Encrypted package size: %d bytes", len(package_json)) + + return package_json + finally: + # Explicitly clear the AES key reference + del aes_key + else: + # Fallback to standard encryption if secure memory not available + logger.warning("Secure memory not available, using standard encryption") + + # Generate cryptographically secure random AES key + aes_key = secrets.token_bytes(32) # 256-bit key + + try: # Encrypt payload with AES-GCM using Cipher API (matching server implementation) nonce = secrets.token_bytes(12) # 96-bit nonce for GCM cipher = Cipher( @@ -343,7 +442,7 @@ class SecureCompletionClient: backend=default_backend() ) encryptor = cipher.encryptor() - ciphertext = encryptor.update(protected_payload) + encryptor.finalize() + ciphertext = encryptor.update(payload_json) + encryptor.finalize() tag = encryptor.tag # Fetch server's public key for encrypting the AES key @@ -382,60 +481,9 @@ class SecureCompletionClient: logger.debug("Encrypted package size: %d bytes", len(package_json)) return package_json - else: - # Fallback to standard encryption if secure memory not available - logger.warning("Secure memory not available, using standard encryption") - - # Generate cryptographically secure random AES key - aes_key = secrets.token_bytes(32) # 256-bit key - - # Encrypt payload with AES-GCM using Cipher API (matching server implementation) - nonce = secrets.token_bytes(12) # 96-bit nonce for GCM - cipher = Cipher( - algorithms.AES(aes_key), - modes.GCM(nonce), - backend=default_backend() - ) - encryptor = cipher.encryptor() - ciphertext = encryptor.update(payload_json) + encryptor.finalize() - tag = encryptor.tag - - # Fetch server's public key for encrypting the AES key - server_public_key_pem = await self.fetch_server_public_key() - - # Encrypt AES key with server's RSA-OAEP - server_public_key = serialization.load_pem_public_key( - server_public_key_pem.encode('utf-8'), - backend=default_backend() - ) - encrypted_aes_key = server_public_key.encrypt( - aes_key, - padding.OAEP( - mgf=padding.MGF1(algorithm=hashes.SHA256()), - algorithm=hashes.SHA256(), - label=None - ) - ) - - # Create encrypted package - encrypted_package = { - "version": "1.0", - "algorithm": "hybrid-aes256-rsa4096", - "encrypted_payload": { - "ciphertext": base64.b64encode(ciphertext).decode('utf-8'), - "nonce": base64.b64encode(nonce).decode('utf-8'), - "tag": base64.b64encode(tag).decode('utf-8') - }, - "encrypted_aes_key": base64.b64encode(encrypted_aes_key).decode('utf-8'), - "key_algorithm": "RSA-OAEP-SHA256", - "payload_algorithm": "AES-256-GCM" - } - - # Serialize package to JSON and return as bytes - package_json = json.dumps(encrypted_package).encode('utf-8') - logger.debug("Encrypted package size: %d bytes", len(package_json)) - - return package_json + finally: + # Explicitly clear the AES key reference + del aes_key except ValueError: raise # Re-raise validation errors @@ -505,21 +553,50 @@ class SecureCompletionClient: ) ) - # Decrypt payload with AES-GCM using Cipher API (matching server implementation) - ciphertext = base64.b64decode(package["encrypted_payload"]["ciphertext"]) - nonce = base64.b64decode(package["encrypted_payload"]["nonce"]) - tag = base64.b64decode(package["encrypted_payload"]["tag"]) + # Use secure memory to protect AES key and decrypted plaintext + if _SECURE_MEMORY_AVAILABLE: + # Protect AES key in memory + with secure_bytes(bytearray(aes_key)) as protected_aes_key: + # Decrypt payload with AES-GCM using Cipher API + ciphertext = base64.b64decode(package["encrypted_payload"]["ciphertext"]) + nonce = base64.b64decode(package["encrypted_payload"]["nonce"]) + tag = base64.b64decode(package["encrypted_payload"]["tag"]) - cipher = Cipher( - algorithms.AES(aes_key), - modes.GCM(nonce, tag), - backend=default_backend() - ) - decryptor = cipher.decryptor() - plaintext = decryptor.update(ciphertext) + decryptor.finalize() + cipher = Cipher( + algorithms.AES(bytes(protected_aes_key)), + modes.GCM(nonce, tag), + backend=default_backend() + ) + decryptor = cipher.decryptor() + plaintext = decryptor.update(ciphertext) + decryptor.finalize() + + # Protect decrypted plaintext in memory + with secure_bytes(bytearray(plaintext)) as protected_plaintext: + # Parse decrypted response + response = json.loads(bytes(protected_plaintext).decode('utf-8')) + # Plaintext automatically zeroed here + + # AES key automatically zeroed here + else: + # Fallback if secure memory not available + logger.warning("Secure memory not available, using standard decryption") + + # Decrypt payload with AES-GCM using Cipher API + ciphertext = base64.b64decode(package["encrypted_payload"]["ciphertext"]) + nonce = base64.b64decode(package["encrypted_payload"]["nonce"]) + tag = base64.b64decode(package["encrypted_payload"]["tag"]) + + cipher = Cipher( + algorithms.AES(aes_key), + modes.GCM(nonce, tag), + backend=default_backend() + ) + decryptor = cipher.decryptor() + plaintext = decryptor.update(ciphertext) + decryptor.finalize() + + # Parse decrypted response + response = json.loads(plaintext.decode('utf-8')) - # Parse decrypted response - response = json.loads(plaintext.decode('utf-8')) except Exception: # Don't leak specific decryption errors (timing attacks) raise SecurityError("Decryption failed: integrity check or authentication failed")