refactor: replace print statements with structured logging

Replaced all print statements with appropriate logging levels (info, warning, debug) to improve maintainability and allow for better log management. Added module-level logger configuration for consistent logging across the application.
This commit is contained in:
Alpha Nerd 2025-12-17 16:41:23 +01:00
parent 3c5e1b7f18
commit ed8efb3a96

View file

@ -1,4 +1,4 @@
import json, base64, urllib.parse, httpx, os, secrets, warnings import json, base64, urllib.parse, httpx, os, secrets, warnings, logging
from typing import Dict, Any, Optional from typing import Dict, Any, Optional
from cryptography.hazmat.primitives import serialization, hashes from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import rsa, padding from cryptography.hazmat.primitives.asymmetric import rsa, padding
@ -6,6 +6,9 @@ from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.hkdf import HKDF from cryptography.hazmat.primitives.kdf.hkdf import HKDF
# Setup module logger
logger = logging.getLogger(__name__)
class SecurityError(Exception): class SecurityError(Exception):
"""Raised when a security violation is detected.""" """Raised when a security violation is detected."""
@ -48,7 +51,7 @@ class SecureCompletionClient:
stacklevel=2 stacklevel=2
) )
else: else:
print("⚠️ HTTP mode enabled for local development (INSECURE)") logger.warning("HTTP mode enabled for local development (INSECURE)")
async def generate_keys(self, save_to_file: bool = False, key_dir: str = "client_keys", password: Optional[str] = None) -> None: async def generate_keys(self, save_to_file: bool = False, key_dir: str = "client_keys", password: Optional[str] = None) -> None:
""" """
@ -59,7 +62,7 @@ class SecureCompletionClient:
key_dir: Directory to save keys (if save_to_file is True) key_dir: Directory to save keys (if save_to_file is True)
password: Optional password to encrypt private key (recommended for production) password: Optional password to encrypt private key (recommended for production)
""" """
print("🔑 Generating RSA key pair...") logger.info("Generating RSA key pair...")
# Generate private key # Generate private key
self.private_key = rsa.generate_private_key( self.private_key = rsa.generate_private_key(
@ -77,7 +80,7 @@ class SecureCompletionClient:
format=serialization.PublicFormat.SubjectPublicKeyInfo format=serialization.PublicFormat.SubjectPublicKeyInfo
).decode('utf-8') ).decode('utf-8')
print(f" ✓ Generated {self.key_size}-bit RSA key pair") logger.debug("Generated %d-bit RSA key pair", self.key_size)
if save_to_file: if save_to_file:
os.makedirs(key_dir, exist_ok=True) os.makedirs(key_dir, exist_ok=True)
@ -90,7 +93,7 @@ class SecureCompletionClient:
format=serialization.PrivateFormat.PKCS8, format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.BestAvailableEncryption(password.encode('utf-8')) encryption_algorithm=serialization.BestAvailableEncryption(password.encode('utf-8'))
) )
print(f"Private key encrypted with password") logger.debug("Private key encrypted with password")
else: else:
# Save unencrypted for convenience (not recommended for production) # Save unencrypted for convenience (not recommended for production)
private_pem = self.private_key.private_bytes( private_pem = self.private_key.private_bytes(
@ -98,7 +101,7 @@ class SecureCompletionClient:
format=serialization.PrivateFormat.PKCS8, format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption() encryption_algorithm=serialization.NoEncryption()
) )
print(f" ⚠️ Private key saved UNENCRYPTED (not recommended for production)") logger.warning("Private key saved UNENCRYPTED (not recommended for production)")
# Write private key with restricted permissions (readable only by owner) # Write private key with restricted permissions (readable only by owner)
private_key_path = os.path.join(key_dir, "private_key.pem") private_key_path = os.path.join(key_dir, "private_key.pem")
@ -106,9 +109,9 @@ class SecureCompletionClient:
f.write(private_pem) f.write(private_pem)
try: try:
os.chmod(private_key_path, 0o600) # Only owner can read/write os.chmod(private_key_path, 0o600) # Only owner can read/write
print(f"Private key permissions set to 600 (owner-only access)") logger.debug("Private key permissions set to 600 (owner-only access)")
except Exception as e: except Exception as e:
print(f" ⚠️ Could not set private key permissions: {e}") logger.warning("Could not set private key permissions: %s", e)
# Save public key (always unencrypted, but with restricted permissions) # Save public key (always unencrypted, but with restricted permissions)
public_key_path = os.path.join(key_dir, "public_key.pem") public_key_path = os.path.join(key_dir, "public_key.pem")
@ -116,11 +119,11 @@ class SecureCompletionClient:
f.write(self.public_key_pem) f.write(self.public_key_pem)
try: try:
os.chmod(public_key_path, 0o644) # Owner read/write, group/others read os.chmod(public_key_path, 0o644) # Owner read/write, group/others read
print(f"Public key permissions set to 644") logger.debug("Public key permissions set to 644")
except Exception as e: except Exception as e:
print(f" ⚠️ Could not set public key permissions: {e}") logger.warning("Could not set public key permissions: %s", e)
print(f" ✓ Keys saved to {key_dir}/") logger.debug("Keys saved to %s/", key_dir)
async def load_keys(self, private_key_path: str, public_key_path: Optional[str] = None, password: Optional[str] = None) -> None: async def load_keys(self, private_key_path: str, public_key_path: Optional[str] = None, password: Optional[str] = None) -> None:
""" """
@ -131,7 +134,7 @@ class SecureCompletionClient:
public_key_path: Path to public key file (optional, derived from private key if not provided) public_key_path: Path to public key file (optional, derived from private key if not provided)
password: Optional password for encrypted private key password: Optional password for encrypted private key
""" """
print(f"🔑 Loading keys from files...") logger.info("Loading keys from files...")
# Load private key # Load private key
with open(private_key_path, "rb") as f: with open(private_key_path, "rb") as f:
@ -151,7 +154,7 @@ class SecureCompletionClient:
password=pwd, password=pwd,
backend=default_backend() backend=default_backend()
) )
print(f" ✓ Private key loaded {'with password' if pwd else 'without password'}") logger.debug("Private key loaded %s", 'with password' if pwd else 'without password')
break break
except Exception as e: except Exception as e:
last_error = e last_error = e
@ -175,7 +178,7 @@ class SecureCompletionClient:
# Validate loaded key # Validate loaded key
self._validate_rsa_key(self.private_key, "private") self._validate_rsa_key(self.private_key, "private")
print("Keys loaded successfully") logger.debug("Keys loaded successfully")
async def fetch_server_public_key(self) -> str: async def fetch_server_public_key(self) -> str:
""" """
@ -192,7 +195,7 @@ class SecureCompletionClient:
ConnectionError: If connection fails ConnectionError: If connection fails
ValueError: If response is invalid ValueError: If response is invalid
""" """
print("🔑 Fetching server's public key...") logger.info("Fetching server's public key...")
# Security check: Ensure HTTPS is used unless HTTP explicitly allowed # Security check: Ensure HTTPS is used unless HTTP explicitly allowed
if not self.router_url.startswith("https://"): if not self.router_url.startswith("https://"):
@ -203,7 +206,7 @@ class SecureCompletionClient:
"SecureChatCompletion(base_url='http://localhost:12434', allow_http=True)" "SecureChatCompletion(base_url='http://localhost:12434', allow_http=True)"
) )
else: else:
print(" ⚠️ Fetching key over HTTP (local development mode)") logger.warning("Fetching key over HTTP (local development mode)")
url = f"{self.router_url}/pki/public_key" url = f"{self.router_url}/pki/public_key"
@ -230,9 +233,9 @@ class SecureCompletionClient:
raise ValueError("Server returned invalid public key format") raise ValueError("Server returned invalid public key format")
if verify_ssl: if verify_ssl:
print("Server's public key fetched securely over HTTPS") logger.debug("Server's public key fetched securely over HTTPS")
else: else:
print(" ⚠️ Server's public key fetched over HTTP (INSECURE)") logger.warning("Server's public key fetched over HTTP (INSECURE)")
return server_public_key return server_public_key
else: else:
raise ValueError(f"Failed to fetch server's public key: HTTP {response.status_code}") raise ValueError(f"Failed to fetch server's public key: HTTP {response.status_code}")
@ -262,7 +265,7 @@ class SecureCompletionClient:
ValueError: If payload is invalid or too large ValueError: If payload is invalid or too large
SecurityError: If encryption fails SecurityError: If encryption fails
""" """
print("🔒 Encrypting payload...") logger.info("Encrypting payload...")
# Validate payload # Validate payload
if not isinstance(payload, dict): if not isinstance(payload, dict):
@ -280,7 +283,7 @@ class SecureCompletionClient:
if len(payload_json) > MAX_PAYLOAD_SIZE: if len(payload_json) > MAX_PAYLOAD_SIZE:
raise ValueError(f"Payload too large: {len(payload_json)} bytes (max: {MAX_PAYLOAD_SIZE})") raise ValueError(f"Payload too large: {len(payload_json)} bytes (max: {MAX_PAYLOAD_SIZE})")
print(f" Payload size: {len(payload_json)} bytes") logger.debug("Payload size: %d bytes", len(payload_json))
# Generate cryptographically secure random AES key # Generate cryptographically secure random AES key
aes_key = secrets.token_bytes(32) # 256-bit key aes_key = secrets.token_bytes(32) # 256-bit key
@ -329,7 +332,7 @@ class SecureCompletionClient:
# Serialize package to JSON and return as bytes # Serialize package to JSON and return as bytes
package_json = json.dumps(encrypted_package).encode('utf-8') package_json = json.dumps(encrypted_package).encode('utf-8')
print(f" ✓ Encrypted package size: {len(package_json)} bytes") logger.debug("Encrypted package size: %d bytes", len(package_json))
return package_json return package_json
@ -356,7 +359,7 @@ class SecureCompletionClient:
ValueError: If response format is invalid ValueError: If response format is invalid
SecurityError: If decryption fails or integrity check fails SecurityError: If decryption fails or integrity check fails
""" """
print("🔓 Decrypting response...") logger.info("Decrypting response...")
# Validate input # Validate input
if not encrypted_response: if not encrypted_response:
@ -430,8 +433,8 @@ class SecureCompletionClient:
"encryption_algorithm": package["algorithm"] "encryption_algorithm": package["algorithm"]
}) })
print(f"Response decrypted successfully") logger.debug("Response decrypted successfully")
print(f" Response size: {len(plaintext)} bytes") logger.debug("Response size: %d bytes", len(plaintext))
return response return response
@ -446,7 +449,7 @@ class SecureCompletionClient:
Returns: Returns:
Decrypted response from the LLM Decrypted response from the LLM
""" """
print("\n📤 Sending secure chat completion request...") logger.info("Sending secure chat completion request...")
# Step 1: Encrypt the payload # Step 1: Encrypt the payload
encrypted_payload = await self.encrypt_payload(payload) encrypted_payload = await self.encrypt_payload(payload)
@ -460,7 +463,7 @@ class SecureCompletionClient:
# Step 3: Send request to router # Step 3: Send request to router
url = f"{self.router_url}/v1/chat/secure_completion" url = f"{self.router_url}/v1/chat/secure_completion"
print(f" Target URL: {url}") logger.debug("Target URL: %s", url)
try: try:
async with httpx.AsyncClient(timeout=60.0) as client: async with httpx.AsyncClient(timeout=60.0) as client:
@ -470,7 +473,7 @@ class SecureCompletionClient:
content=encrypted_payload content=encrypted_payload
) )
print(f" HTTP Status: {response.status_code}") logger.debug("HTTP Status: %d", response.status_code)
if response.status_code == 200: if response.status_code == 200:
# Step 4: Decrypt the response # Step 4: Decrypt the response
@ -527,4 +530,4 @@ class SecureCompletionClient:
f"Minimum recommended size is {MIN_KEY_SIZE} bits." f"Minimum recommended size is {MIN_KEY_SIZE} bits."
) )
print(f" ✓ Valid {key_size}-bit RSA {key_type} key") logger.debug("Valid %d-bit RSA %s key", key_size, key_type)