diff --git a/README.md b/README.md index 6448f23..1839bdd 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,7 @@ async def main(): messages=[ {"role": "user", "content": "Hello! How are you today?"} ], + security_tier="standard", #optional: standard, high or maximum temperature=0.7 ) @@ -156,6 +157,7 @@ async def main(): {"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": "What is the capital of France?"} ], + security_tier="standard", #optional: standard, high or maximum temperature=0.7 ) @@ -167,6 +169,7 @@ asyncio.run(main()) ### With Tools ```python + import asyncio from nomyo import SecureChatCompletion @@ -194,6 +197,7 @@ async def main(): } } ], + security_tier="standard", #optional: standard, high or maximum temperature=0.7 ) diff --git a/nomyo/SecureCompletionClient.py b/nomyo/SecureCompletionClient.py index 27d3863..e157482 100644 --- a/nomyo/SecureCompletionClient.py +++ b/nomyo/SecureCompletionClient.py @@ -11,7 +11,7 @@ logger = logging.getLogger(__name__) # Import secure memory module try: - from .SecureMemory import secure_bytes, get_memory_protection_info + from .SecureMemory import secure_bytearray, get_memory_protection_info, _get_secure_memory _SECURE_MEMORY_AVAILABLE = True except ImportError: _SECURE_MEMORY_AVAILABLE = False @@ -115,9 +115,9 @@ class SecureCompletionClient: # Note: This is best-effort as the cryptography library # maintains its own internal key material import pickle - key_data = pickle.dumps(self.private_key) - from .SecureMemory import _secure_memory - locked = _secure_memory.lock_memory(key_data) + key_data = bytearray(pickle.dumps(self.private_key)) + secure_memory = _get_secure_memory() + locked = secure_memory.lock_memory(key_data) if locked: logger.debug("Private key locked in memory (best effort)") else: @@ -369,22 +369,22 @@ class SecureCompletionClient: # Use secure memory context to protect plaintext payload if _SECURE_MEMORY_AVAILABLE: - with secure_bytes(payload_json) as protected_payload: + with secure_bytearray(payload_json) as protected_payload: # Generate cryptographically secure random AES key aes_key = secrets.token_bytes(32) # 256-bit key try: # Protect AES key in memory - with secure_bytes(bytearray(aes_key)) as protected_aes_key: + with secure_bytearray(aes_key) as protected_aes_key: # Encrypt payload with AES-GCM using Cipher API nonce = secrets.token_bytes(12) # 96-bit nonce for GCM cipher = Cipher( - algorithms.AES(bytes(protected_aes_key)), + algorithms.AES(bytes(protected_aes_key.data)), modes.GCM(nonce), backend=default_backend() ) encryptor = cipher.encryptor() - ciphertext = encryptor.update(protected_payload) + encryptor.finalize() + ciphertext = encryptor.update(bytes(protected_payload.data)) + encryptor.finalize() tag = encryptor.tag # Fetch server's public key for encrypting the AES key @@ -396,7 +396,7 @@ class SecureCompletionClient: backend=default_backend() ) encrypted_aes_key = server_public_key.encrypt( - bytes(protected_aes_key), + bytes(protected_aes_key.data), padding.OAEP( mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), @@ -556,14 +556,14 @@ class SecureCompletionClient: # Use secure memory to protect AES key and decrypted plaintext if _SECURE_MEMORY_AVAILABLE: # Protect AES key in memory - with secure_bytes(bytearray(aes_key)) as protected_aes_key: + with secure_bytearray(aes_key) as protected_aes_key: # Decrypt payload with AES-GCM using Cipher API ciphertext = base64.b64decode(package["encrypted_payload"]["ciphertext"]) nonce = base64.b64decode(package["encrypted_payload"]["nonce"]) tag = base64.b64decode(package["encrypted_payload"]["tag"]) cipher = Cipher( - algorithms.AES(bytes(protected_aes_key)), + algorithms.AES(bytes(protected_aes_key.data)), modes.GCM(nonce, tag), backend=default_backend() ) @@ -571,9 +571,9 @@ class SecureCompletionClient: plaintext = decryptor.update(ciphertext) + decryptor.finalize() # Protect decrypted plaintext in memory - with secure_bytes(bytearray(plaintext)) as protected_plaintext: + with secure_bytearray(plaintext) as protected_plaintext: # Parse decrypted response - response = json.loads(bytes(protected_plaintext).decode('utf-8')) + response = json.loads(bytes(protected_plaintext.data).decode('utf-8')) # Plaintext automatically zeroed here # AES key automatically zeroed here diff --git a/nomyo/SecureMemory.py b/nomyo/SecureMemory.py index cb625a3..22718ea 100644 --- a/nomyo/SecureMemory.py +++ b/nomyo/SecureMemory.py @@ -7,7 +7,7 @@ usage with Python's memory management characteristics. Supports: - Linux: mlock() + memset() -- Windows: VirtualLock() + RtlSecureZeroMemory() +- Windows: VirtualLock() + RtlZeroMemory() - macOS: mlock() + memset() - Fallback: ctypes-based zeroing for unsupported platforms @@ -16,34 +16,137 @@ Security Features: - Guarantees memory is zeroed before deallocation - Context managers for automatic cleanup - No root privileges required (uses capabilities on Linux) + +IMPORTANT: This module works with mutable bytearray objects for true security. +Python's immutable bytes objects cannot be securely zeroed in place. """ -import os -import sys import ctypes import logging -from typing import Optional, Any +import sys from contextlib import contextmanager from enum import Enum +from typing import Optional, Union # Configure logging logger = logging.getLogger(__name__) + class MemoryProtectionLevel(Enum): """Memory protection levels available""" NONE = "none" # No protection (fallback only) ZEROING_ONLY = "zeroing_only" # Memory zeroing without locking FULL = "full" # Memory locking + zeroing + +class SecureBuffer: + """ + A secure buffer that wraps a bytearray with proper memory protection. + + This class provides: + - Correct memory address calculation using ctypes + - Platform-specific memory locking + - Guaranteed secure zeroing on cleanup + - Context manager support for automatic cleanup + + Usage: + with SecureBuffer(secret_data) as buf: + # Use buf.data (bytearray) for operations + process(buf.data) + # Memory is securely zeroed here + """ + + def __init__(self, data: Union[bytes, bytearray], secure_memory: 'SecureMemory'): + """ + Initialize a secure buffer. + + Args: + data: Initial data (will be copied into a mutable bytearray) + secure_memory: SecureMemory instance for platform operations + """ + self._secure_memory = secure_memory + self._locked = False + self._size = len(data) + + # Create mutable bytearray and ctypes buffer for proper address handling + self._data = bytearray(data) + self._ctypes_buffer = (ctypes.c_char * self._size).from_buffer(self._data) + self._address = ctypes.addressof(self._ctypes_buffer) + + # Zero the original if it was a bytearray (caller's copy) + if isinstance(data, bytearray): + for i in range(len(data)): + data[i] = 0 + + @property + def data(self) -> bytearray: + """Get the underlying bytearray data.""" + return self._data + + @property + def address(self) -> int: + """Get the memory address of the buffer.""" + return self._address + + @property + def size(self) -> int: + """Get the size of the buffer.""" + return self._size + + def lock(self) -> bool: + """Lock the buffer memory to prevent swapping.""" + if self._locked or not self._secure_memory.enabled: + return self._locked + + self._locked = self._secure_memory._lock_memory_at(self._address, self._size) + return self._locked + + def unlock(self) -> bool: + """Unlock the buffer memory.""" + if not self._locked: + return True + + result = self._secure_memory._unlock_memory_at(self._address, self._size) + if result: + self._locked = False + return result + + def zero(self) -> None: + """Securely zero the buffer contents.""" + self._secure_memory._zero_memory_at(self._address, self._size) + # Also zero the Python bytearray for defense in depth + for i in range(len(self._data)): + self._data[i] = 0 + + def __enter__(self) -> 'SecureBuffer': + self.lock() + return self + + def __exit__(self, exc_type, exc_val, exc_tb) -> None: + self.zero() + self.unlock() + + def __len__(self) -> int: + return self._size + + def __bytes__(self) -> bytes: + """Convert to bytes (creates a copy - use with caution).""" + return bytes(self._data) + + class SecureMemory: """ Cross-platform secure memory handler for client-side. Automatically detects platform and provides best-available security: - Linux: mlock + memset - - Windows: VirtualLock + RtlSecureZeroMemory + - Windows: VirtualLock + RtlZeroMemory - macOS: mlock + memset - Others: Fallback zeroing + + IMPORTANT: For true security, use SecureBuffer or secure_bytearray() context + manager with bytearray objects. Python's immutable bytes cannot be securely + zeroed in place. """ def __init__(self, enable: bool = True): @@ -59,6 +162,13 @@ class SecureMemory: self.has_mlock = False self.has_secure_zero = False self.protection_level = MemoryProtectionLevel.NONE + self._page_size = 4096 # Default, will be updated per platform + + # Platform-specific function references + self._mlock_func = None + self._munlock_func = None + self._memset_func = None + self._libc = None if self.enabled: self._init_platform_specific() @@ -84,14 +194,28 @@ class SecureMemory: def _init_linux(self): """Initialize Linux-specific functions (mlock + memset)""" try: - self.libc = ctypes.CDLL('libc.so.6') - self.mlock = self.libc.mlock - self.munlock = self.libc.munlock - self.memset = self.libc.memset + # Use use_errno=True for proper errno handling + self._libc = ctypes.CDLL('libc.so.6', use_errno=True) - # Set return types - self.mlock.restype = ctypes.c_int - self.munlock.restype = ctypes.c_int + # Get page size + try: + self._page_size = self._libc.getpagesize() + except Exception: + self._page_size = 4096 + + # Setup mlock/munlock + self._mlock_func = self._libc.mlock + self._mlock_func.argtypes = [ctypes.c_void_p, ctypes.c_size_t] + self._mlock_func.restype = ctypes.c_int + + self._munlock_func = self._libc.munlock + self._munlock_func.argtypes = [ctypes.c_void_p, ctypes.c_size_t] + self._munlock_func.restype = ctypes.c_int + + # Setup memset for secure zeroing + self._memset_func = self._libc.memset + self._memset_func.argtypes = [ctypes.c_void_p, ctypes.c_int, ctypes.c_size_t] + self._memset_func.restype = ctypes.c_void_p self.has_mlock = True self.has_secure_zero = True @@ -104,24 +228,69 @@ class SecureMemory: self._init_fallback() def _init_windows(self): - """Initialize Windows-specific functions (VirtualLock + RtlSecureZeroMemory)""" + """Initialize Windows-specific functions (VirtualLock + RtlZeroMemory)""" try: kernel32 = ctypes.windll.kernel32 - # VirtualLock for memory locking - self.virtual_lock = kernel32.VirtualLock - self.virtual_unlock = kernel32.VirtualUnlock - self.virtual_lock.restype = ctypes.c_bool - self.virtual_unlock.restype = ctypes.c_bool + # Get page size + class SYSTEM_INFO(ctypes.Structure): + _fields_ = [ + ("wProcessorArchitecture", ctypes.c_ushort), + ("wReserved", ctypes.c_ushort), + ("dwPageSize", ctypes.c_ulong), + ("lpMinimumApplicationAddress", ctypes.c_void_p), + ("lpMaximumApplicationAddress", ctypes.c_void_p), + ("dwActiveProcessorMask", ctypes.c_void_p), + ("dwNumberOfProcessors", ctypes.c_ulong), + ("dwProcessorType", ctypes.c_ulong), + ("dwAllocationGranularity", ctypes.c_ulong), + ("wProcessorLevel", ctypes.c_ushort), + ("wProcessorRevision", ctypes.c_ushort), + ] - # RtlSecureZeroMemory for guaranteed zeroing - self.secure_zero_memory = kernel32.RtlSecureZeroMemory + try: + sysinfo = SYSTEM_INFO() + kernel32.GetSystemInfo(ctypes.byref(sysinfo)) + self._page_size = sysinfo.dwPageSize + except Exception: + self._page_size = 4096 + + # VirtualLock for memory locking + self._mlock_func = kernel32.VirtualLock + self._mlock_func.argtypes = [ctypes.c_void_p, ctypes.c_size_t] + self._mlock_func.restype = ctypes.c_bool + + self._munlock_func = kernel32.VirtualUnlock + self._munlock_func.argtypes = [ctypes.c_void_p, ctypes.c_size_t] + self._munlock_func.restype = ctypes.c_bool + + # RtlZeroMemory from ntdll (not RtlSecureZeroMemory which is a macro) + # Note: RtlZeroMemory may be optimized away by compiler, but it's the + # best we can do from Python. For true secure zeroing, we also + # implement a Python-level volatile write pattern. + try: + ntdll = ctypes.windll.ntdll + self._memset_func = ntdll.RtlZeroMemory + self._memset_func.argtypes = [ctypes.c_void_p, ctypes.c_size_t] + self._memset_func.restype = None + self._windows_zero_is_rtlzero = True + except Exception: + # Fallback to kernel32.RtlZeroMemory if available + try: + self._memset_func = kernel32.RtlZeroMemory + self._memset_func.argtypes = [ctypes.c_void_p, ctypes.c_size_t] + self._memset_func.restype = None + self._windows_zero_is_rtlzero = True + except Exception: + self._memset_func = None + self._windows_zero_is_rtlzero = False + logger.warning("RtlZeroMemory not available, using Python zeroing") self.has_mlock = True - self.has_secure_zero = True + self.has_secure_zero = True # We have fallback even if RtlZeroMemory fails self.protection_level = MemoryProtectionLevel.FULL - logger.info("Windows secure memory initialized (VirtualLock + RtlSecureZeroMemory)") + logger.info("Windows secure memory initialized (VirtualLock + RtlZeroMemory)") except Exception as e: logger.warning(f"Could not initialize Windows VirtualLock: {e}. Using fallback.") @@ -130,14 +299,28 @@ class SecureMemory: def _init_macos(self): """Initialize macOS-specific functions (mlock + memset)""" try: - self.libc = ctypes.CDLL('libc.dylib') - self.mlock = self.libc.mlock - self.munlock = self.libc.munlock - self.memset = self.libc.memset + # Use use_errno=True for proper errno handling + self._libc = ctypes.CDLL('libc.dylib', use_errno=True) - # Set return types - self.mlock.restype = ctypes.c_int - self.munlock.restype = ctypes.c_int + # Get page size + try: + self._page_size = self._libc.getpagesize() + except Exception: + self._page_size = 4096 + + # Setup mlock/munlock + self._mlock_func = self._libc.mlock + self._mlock_func.argtypes = [ctypes.c_void_p, ctypes.c_size_t] + self._mlock_func.restype = ctypes.c_int + + self._munlock_func = self._libc.munlock + self._munlock_func.argtypes = [ctypes.c_void_p, ctypes.c_size_t] + self._munlock_func.restype = ctypes.c_int + + # Setup memset for secure zeroing + self._memset_func = self._libc.memset + self._memset_func.argtypes = [ctypes.c_void_p, ctypes.c_int, ctypes.c_size_t] + self._memset_func.restype = ctypes.c_void_p self.has_mlock = True self.has_secure_zero = True @@ -152,8 +335,11 @@ class SecureMemory: def _init_fallback(self): """Initialize fallback memory zeroing (no locking)""" self.has_mlock = False - self.has_secure_zero = False + self.has_secure_zero = True # We can still zero memory at Python level self.protection_level = MemoryProtectionLevel.ZEROING_ONLY + self._mlock_func = None + self._munlock_func = None + self._memset_func = None logger.info("Using fallback memory protection (zeroing only, no locking)") def _log_capabilities(self): @@ -163,61 +349,80 @@ class SecureMemory: f"Platform: {self.platform}, " f"Protection Level: {self.protection_level.value}, " f"Memory Locking: {self.has_mlock}, " - f"Secure Zeroing: {self.has_secure_zero}" + f"Secure Zeroing: {self.has_secure_zero}, " + f"Page Size: {self._page_size}" ) - def lock_memory(self, data: bytes) -> bool: + def _get_page_aligned_range(self, addr: int, size: int) -> tuple: """ - Lock memory pages containing data to prevent swapping to disk. + Calculate page-aligned address and size for mlock operations. Args: - data: Bytes object to lock in memory + addr: Memory address + size: Size in bytes + + Returns: + Tuple of (aligned_addr, aligned_size) + """ + page_mask = self._page_size - 1 + aligned_addr = addr & ~page_mask + end_addr = addr + size + aligned_end = (end_addr + page_mask) & ~page_mask + aligned_size = aligned_end - aligned_addr + return aligned_addr, aligned_size + + def _lock_memory_at(self, addr: int, size: int) -> bool: + """ + Lock memory at a specific address. + + Args: + addr: Memory address (will be page-aligned) + size: Size in bytes Returns: True if successfully locked, False otherwise """ - if not self.enabled or not self.has_mlock or not data: + if not self.enabled or not self.has_mlock or not self._mlock_func: return False try: - # Get memory address and size - addr = id(data) - size = len(data) + # Page-align the address and size + aligned_addr, aligned_size = self._get_page_aligned_range(addr, size) if self.platform.startswith('linux') or self.platform == 'darwin': - # POSIX mlock - result = self.mlock( - ctypes.c_void_p(addr), - ctypes.c_size_t(size) + result = self._mlock_func( + ctypes.c_void_p(aligned_addr), + ctypes.c_size_t(aligned_size) ) if result != 0: errno = ctypes.get_errno() - # ENOMEM (12) or EPERM (1) are common errors - if errno == 1: + if errno == 1: # EPERM logger.debug( "mlock permission denied. " "Grant CAP_IPC_LOCK or increase ulimit -l" ) - elif errno == 12: + elif errno == 12: # ENOMEM logger.debug("mlock failed: insufficient memory or limit exceeded") else: logger.debug(f"mlock failed with errno {errno}") return False + logger.debug(f"Memory locked: {size} bytes at 0x{addr:x}") return True elif self.platform == 'win32': - # Windows VirtualLock - result = self.virtual_lock( - ctypes.c_void_p(addr), - ctypes.c_size_t(size) + result = self._mlock_func( + ctypes.c_void_p(aligned_addr), + ctypes.c_size_t(aligned_size) ) if not result: - logger.debug("VirtualLock failed") + error = ctypes.get_last_error() + logger.debug(f"VirtualLock failed with error {error}") return False + logger.debug(f"Memory locked: {size} bytes at 0x{addr:x}") return True except Exception as e: @@ -226,37 +431,41 @@ class SecureMemory: return False - def unlock_memory(self, data: bytes) -> bool: + def _unlock_memory_at(self, addr: int, size: int) -> bool: """ - Unlock previously locked memory pages. + Unlock memory at a specific address. Args: - data: Bytes object to unlock + addr: Memory address (will be page-aligned) + size: Size in bytes Returns: True if successfully unlocked, False otherwise """ - if not self.enabled or not self.has_mlock or not data: + if not self.enabled or not self.has_mlock or not self._munlock_func: return False try: - addr = id(data) - size = len(data) + # Page-align the address and size + aligned_addr, aligned_size = self._get_page_aligned_range(addr, size) if self.platform.startswith('linux') or self.platform == 'darwin': - # POSIX munlock - result = self.munlock( - ctypes.c_void_p(addr), - ctypes.c_size_t(size) + result = self._munlock_func( + ctypes.c_void_p(aligned_addr), + ctypes.c_size_t(aligned_size) ) - return result == 0 + success = result == 0 + if success: + logger.debug(f"Memory unlocked: {size} bytes at 0x{addr:x}") + return success elif self.platform == 'win32': - # Windows VirtualUnlock - result = self.virtual_unlock( - ctypes.c_void_p(addr), - ctypes.c_size_t(size) + result = self._munlock_func( + ctypes.c_void_p(aligned_addr), + ctypes.c_size_t(aligned_size) ) + if result: + logger.debug(f"Memory unlocked: {size} bytes at 0x{addr:x}") return bool(result) except Exception as e: @@ -265,39 +474,166 @@ class SecureMemory: return False - def zero_memory(self, data: bytes) -> None: + def _zero_memory_at(self, addr: int, size: int) -> None: """ - Securely zero memory contents. + Securely zero memory at a specific address. - Note: Due to Python's memory management, we cannot directly zero - immutable bytes objects. This function is a best-effort approach - that works better with mutable bytearray objects. - - For maximum security, use this with bytearray instead of bytes, - or rely on memory locking to prevent swapping. + Uses platform-specific functions when available, with Python fallback. Args: - data: Bytes object to zero (best effort for bytes, effective for bytearray) + addr: Memory address + size: Size in bytes + """ + if not self.enabled or size == 0: + return + + try: + if self._memset_func is not None: + if self.platform.startswith('linux') or self.platform == 'darwin': + # memset(addr, 0, size) + self._memset_func( + ctypes.c_void_p(addr), + ctypes.c_int(0), + ctypes.c_size_t(size) + ) + elif self.platform == 'win32' and hasattr(self, '_windows_zero_is_rtlzero'): + # RtlZeroMemory(addr, size) + self._memset_func( + ctypes.c_void_p(addr), + ctypes.c_size_t(size) + ) + logger.debug(f"Memory zeroed (native): {size} bytes at 0x{addr:x}") + else: + # Fallback: zero via ctypes byte-by-byte + # This is slower but works everywhere + char_array = (ctypes.c_char * size).from_address(addr) + for i in range(size): + char_array[i] = b'\x00' + logger.debug(f"Memory zeroed (fallback): {size} bytes at 0x{addr:x}") + + except Exception as e: + logger.warning(f"Memory zeroing failed: {e}") + # Last resort: try to zero via ctypes + try: + char_array = (ctypes.c_char * size).from_address(addr) + for i in range(size): + char_array[i] = b'\x00' + except Exception: + pass + + def create_secure_buffer(self, data: Union[bytes, bytearray]) -> SecureBuffer: + """ + Create a SecureBuffer from data. + + This is the recommended way to handle sensitive data. + + Args: + data: Data to protect (will be copied, original should be discarded) + + Returns: + SecureBuffer instance + """ + return SecureBuffer(data, self) + + def lock_memory(self, data: bytearray) -> bool: + """ + Lock memory containing a bytearray to prevent swapping. + + IMPORTANT: Only works reliably with bytearray objects. + Use create_secure_buffer() for better security guarantees. + + Args: + data: Bytearray to lock in memory + + Returns: + True if successfully locked, False otherwise + """ + if not isinstance(data, bytearray): + logger.warning( + "lock_memory() called with non-bytearray. " + "Use create_secure_buffer() for bytes objects." + ) + return False + + if not self.enabled or not self.has_mlock or not data: + return False + + try: + # Create ctypes buffer to get correct address + ctypes_buffer = (ctypes.c_char * len(data)).from_buffer(data) + addr = ctypes.addressof(ctypes_buffer) + return self._lock_memory_at(addr, len(data)) + except Exception as e: + logger.debug(f"Memory lock failed: {e}") + return False + + def unlock_memory(self, data: bytearray) -> bool: + """ + Unlock previously locked bytearray memory. + + Args: + data: Bytearray to unlock + + Returns: + True if successfully unlocked, False otherwise + """ + if not isinstance(data, bytearray): + return False + + if not self.enabled or not self.has_mlock or not data: + return False + + try: + ctypes_buffer = (ctypes.c_char * len(data)).from_buffer(data) + addr = ctypes.addressof(ctypes_buffer) + return self._unlock_memory_at(addr, len(data)) + except Exception as e: + logger.debug(f"Memory unlock failed: {e}") + return False + + def zero_memory(self, data: bytearray) -> None: + """ + Securely zero a bytearray's memory contents. + + IMPORTANT: Only works with mutable bytearray objects. + Python's immutable bytes cannot be securely zeroed. + + Args: + data: Bytearray to zero """ if not self.enabled or not data: return + if not isinstance(data, bytearray): + logger.warning( + "zero_memory() called with non-bytearray. " + "Python bytes are immutable and cannot be securely zeroed. " + "Use bytearray or create_secure_buffer() instead." + ) + return + try: - # For bytearray (mutable), we can zero it - if isinstance(data, bytearray): - # Zero the bytearray in place - for i in range(len(data)): - data[i] = 0 - logger.debug(f"Zeroed bytearray: {len(data)} bytes") - else: - # For bytes (immutable), we can't actually zero the memory - # Python's bytes are immutable and reference counted - # The best we can do is ensure no lingering references - # and let Python's GC handle it - logger.debug(f"Bytes object is immutable, relying on GC: {len(data)} bytes") + # Get correct address via ctypes + ctypes_buffer = (ctypes.c_char * len(data)).from_buffer(data) + addr = ctypes.addressof(ctypes_buffer) + + # Use platform-specific zeroing + self._zero_memory_at(addr, len(data)) + + # Also zero at Python level for defense in depth + for i in range(len(data)): + data[i] = 0 + + logger.debug(f"Zeroed bytearray: {len(data)} bytes") except Exception as e: - logger.debug(f"Memory zeroing note: {e}") + logger.warning(f"Memory zeroing error: {e}") + # Fallback: zero at Python level only + try: + for i in range(len(data)): + data[i] = 0 + except Exception: + pass def get_protection_info(self) -> dict: """ @@ -312,34 +648,45 @@ class SecureMemory: "protection_level": self.protection_level.value, "has_memory_locking": self.has_mlock, "has_secure_zeroing": self.has_secure_zero, - "supports_full_protection": self.protection_level == MemoryProtectionLevel.FULL + "supports_full_protection": self.protection_level == MemoryProtectionLevel.FULL, + "page_size": self._page_size } + # Global secure memory instance -_secure_memory = SecureMemory() +_secure_memory: Optional[SecureMemory] = None + + +def _get_secure_memory() -> SecureMemory: + """Get or create the global SecureMemory instance.""" + global _secure_memory + if _secure_memory is None: + _secure_memory = SecureMemory() + return _secure_memory + @contextmanager -def secure_bytes(data: bytes, lock: bool = True): +def secure_bytearray(data: Union[bytes, bytearray], lock: bool = True): """ - Context manager for secure byte handling with automatic cleanup. + Context manager for secure bytearray handling with automatic cleanup. Provides: + - Proper memory address handling via ctypes - Optional memory locking to prevent swapping - - Guaranteed memory zeroing on exit + - Guaranteed memory zeroing on exit (both native and Python level) - Automatic cleanup even if exceptions occur Args: - data: Bytes to protect + data: Data to protect (bytes or bytearray, will be converted to bytearray) lock: Whether to attempt memory locking (default: True) - Set to False to skip locking but still zero on exit Yields: - The protected bytes object + SecureBuffer containing the protected data Example: payload = json.dumps({"secret": "data"}).encode('utf-8') - with secure_bytes(payload) as protected_data: - encrypted = encrypt(protected_data) + with secure_bytearray(payload) as buf: + encrypted = encrypt(buf.data) # Use encrypted data... # Memory automatically zeroed here @@ -347,32 +694,57 @@ def secure_bytes(data: bytes, lock: bool = True): Memory locking may fail without appropriate privileges. In that case, only zeroing is performed (still provides value). """ - locked = False + sm = _get_secure_memory() + secure_buf = sm.create_secure_buffer(data) try: - # Try to lock memory if requested - if lock and _secure_memory.enabled: - locked = _secure_memory.lock_memory(data) + if lock: + locked = secure_buf.lock() if locked: - logger.debug(f"Memory locked: {len(data)} bytes") + logger.debug(f"Memory locked: {len(secure_buf)} bytes") else: logger.debug( - f"Memory locking not available for {len(data)} bytes, " + f"Memory locking not available for {len(secure_buf)} bytes, " "using zeroing only" ) - # Yield data for use - yield data + yield secure_buf finally: # Always zero memory - _secure_memory.zero_memory(data) - logger.debug(f"Memory zeroed: {len(data)} bytes") + secure_buf.zero() + logger.debug(f"Memory zeroed: {len(secure_buf)} bytes") + + # Unlock if we attempted locking + if lock: + secure_buf.unlock() + logger.debug(f"Memory unlocked: {len(secure_buf)} bytes") + + +# Legacy API - maintained for backwards compatibility +@contextmanager +def secure_bytes(data: bytes, lock: bool = True): + """ + DEPRECATED: Use secure_bytearray() instead. + + This function is maintained for backwards compatibility but provides + weaker security guarantees. Python's immutable bytes cannot be securely + zeroed in place. + + Args: + data: Bytes to protect + lock: Whether to attempt memory locking + + Yields: + SecureBuffer (use .data attribute for the bytearray) + """ + logger.warning( + "secure_bytes() is deprecated. Use secure_bytearray() instead. " + "The original bytes object cannot be securely zeroed." + ) + with secure_bytearray(data, lock) as buf: + yield buf - # Unlock if locked - if locked: - _secure_memory.unlock_memory(data) - logger.debug(f"Memory unlocked: {len(data)} bytes") def get_memory_protection_info() -> dict: """ @@ -381,7 +753,8 @@ def get_memory_protection_info() -> dict: Returns: Dictionary with platform and capability information """ - return _secure_memory.get_protection_info() + return _get_secure_memory().get_protection_info() + def disable_secure_memory() -> None: """ @@ -393,6 +766,7 @@ def disable_secure_memory() -> None: _secure_memory = SecureMemory(enable=False) logger.info("Secure memory operations disabled globally") + def enable_secure_memory() -> None: """ Re-enable secure memory operations globally. diff --git a/nomyo/__init__.py b/nomyo/__init__.py index fb23f9b..427930d 100644 --- a/nomyo/__init__.py +++ b/nomyo/__init__.py @@ -20,7 +20,9 @@ try: get_memory_protection_info, disable_secure_memory, enable_secure_memory, - secure_bytes + secure_bytearray, + secure_bytes, # Deprecated, use secure_bytearray instead + SecureBuffer ) except ImportError: pass @@ -36,7 +38,9 @@ __all__ = [ 'get_memory_protection_info', 'disable_secure_memory', 'enable_secure_memory', - 'secure_bytes' + 'secure_bytearray', + 'secure_bytes', # Deprecated, use secure_bytearray instead + 'SecureBuffer' ] __version__ = "0.1.0"