Co-Authored-By: Claude <noreply@anthropic.com> Co-Authored-By: XNLLLLH <XNLLLLH@users.noreply.github.com>
214 lines
8.2 KiB
Python
214 lines
8.2 KiB
Python
"""crypto.py AES-256-GCM primitives + file-backed key storage.
|
|
|
|
Originally Plan 02-08; updated in W1 to retire the keyring
|
|
backend (which deadlocked the daemon under launchd via the macOS
|
|
Keychain ACL prompt) in favor of a file-backed primary backend at
|
|
`{IAI_MCP_STORE}/.crypto.key` (32 raw bytes, mode 0o600, uid-validated).
|
|
|
|
Covers:
|
|
- encrypt_field / decrypt_field round-trip (byte-for-byte)
|
|
- Cyrillic / CJK / Arabic round-trip (MEM-01 across languages)
|
|
- Associated data binding (swapped AD -> InvalidTag)
|
|
- Tamper detection (mutated ciphertext -> InvalidTag)
|
|
- is_encrypted prefix check
|
|
- Passphrase fallback when no `.crypto.key` file is present
|
|
(via IAI_MCP_CRYPTO_PASSPHRASE), deterministic across instances
|
|
|
|
File-backend specific behavior (file priority, uid/mode validation,
|
|
atomic write) is exercised in tests/test_crypto_file_backend.py.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import pytest
|
|
|
|
|
|
def test_crypto_module_exports() -> None:
|
|
"""crypto.py exposes encrypt_field / decrypt_field / is_encrypted / CryptoKey."""
|
|
from iai_mcp import crypto
|
|
assert hasattr(crypto, "encrypt_field")
|
|
assert hasattr(crypto, "decrypt_field")
|
|
assert hasattr(crypto, "is_encrypted")
|
|
assert hasattr(crypto, "CryptoKey")
|
|
assert hasattr(crypto, "derive_key_from_passphrase")
|
|
|
|
|
|
def test_crypto_roundtrip_basic() -> None:
|
|
"""encrypt(plaintext) -> decrypt -> byte-for-byte equal."""
|
|
from iai_mcp.crypto import encrypt_field, decrypt_field
|
|
key = b"\x00" * 32
|
|
plaintext = "hello world"
|
|
ciphertext = encrypt_field(plaintext, key)
|
|
assert isinstance(ciphertext, str)
|
|
recovered = decrypt_field(ciphertext, key)
|
|
assert recovered == plaintext
|
|
|
|
|
|
def test_crypto_roundtrip_cyrillic() -> None:
|
|
"""D-08a + Russian text byte-for-byte preserved."""
|
|
from iai_mcp.crypto import encrypt_field, decrypt_field
|
|
key = b"\x01" * 32
|
|
plaintext = "Привет, мир! Это тест шифрования."
|
|
ciphertext = encrypt_field(plaintext, key)
|
|
recovered = decrypt_field(ciphertext, key)
|
|
assert recovered == plaintext
|
|
# Byte-level equality after utf-8 encode+decode cycle.
|
|
assert recovered.encode("utf-8") == plaintext.encode("utf-8")
|
|
|
|
|
|
def test_crypto_roundtrip_cjk() -> None:
|
|
"""D-08a + Japanese / Chinese round-trip."""
|
|
from iai_mcp.crypto import encrypt_field, decrypt_field
|
|
key = b"\x02" * 32
|
|
plaintext = "こんにちは世界。これは暗号化テストです。"
|
|
ciphertext = encrypt_field(plaintext, key)
|
|
assert decrypt_field(ciphertext, key) == plaintext
|
|
|
|
|
|
def test_crypto_roundtrip_arabic() -> None:
|
|
"""D-08a + Arabic round-trip."""
|
|
from iai_mcp.crypto import encrypt_field, decrypt_field
|
|
key = b"\x03" * 32
|
|
plaintext = "مرحبا بالعالم. هذا اختبار تشفير."
|
|
ciphertext = encrypt_field(plaintext, key)
|
|
assert decrypt_field(ciphertext, key) == plaintext
|
|
|
|
|
|
def test_crypto_empty_string_roundtrip() -> None:
|
|
"""Empty plaintext encrypts and decrypts cleanly."""
|
|
from iai_mcp.crypto import encrypt_field, decrypt_field
|
|
key = b"\x04" * 32
|
|
assert decrypt_field(encrypt_field("", key), key) == ""
|
|
|
|
|
|
def test_crypto_associated_data_binding() -> None:
|
|
"""Ciphertext encrypted with AD=A cannot be decrypted with AD=B (InvalidTag)."""
|
|
from cryptography.exceptions import InvalidTag
|
|
from iai_mcp.crypto import encrypt_field, decrypt_field
|
|
key = b"\x05" * 32
|
|
ciphertext = encrypt_field("secret", key, associated_data=b"record_id_A")
|
|
with pytest.raises(InvalidTag):
|
|
decrypt_field(ciphertext, key, associated_data=b"record_id_B")
|
|
|
|
|
|
def test_crypto_associated_data_roundtrip_when_matching() -> None:
|
|
"""With matching AD the round-trip succeeds."""
|
|
from iai_mcp.crypto import encrypt_field, decrypt_field
|
|
key = b"\x06" * 32
|
|
ad = b"record_id_matching"
|
|
ct = encrypt_field("secret", key, associated_data=ad)
|
|
assert decrypt_field(ct, key, associated_data=ad) == "secret"
|
|
|
|
|
|
def test_crypto_tamper_detection() -> None:
|
|
"""A single-bit flip in ciphertext raises InvalidTag on decrypt."""
|
|
import base64
|
|
from cryptography.exceptions import InvalidTag
|
|
from iai_mcp.crypto import encrypt_field, decrypt_field
|
|
key = b"\x07" * 32
|
|
ct = encrypt_field("secret", key)
|
|
# Strip the prefix, flip one byte in the base64 payload, re-wrap.
|
|
prefix = "iai:enc:v1:"
|
|
assert ct.startswith(prefix)
|
|
payload_b64 = ct[len(prefix):]
|
|
raw = bytearray(base64.b64decode(payload_b64))
|
|
# Flip the byte after the nonce (12 bytes) -- tamper the ciphertext itself.
|
|
raw[15] ^= 0x01
|
|
tampered = prefix + base64.b64encode(bytes(raw)).decode("ascii")
|
|
with pytest.raises(InvalidTag):
|
|
decrypt_field(tampered, key)
|
|
|
|
|
|
def test_crypto_wrong_key_fails() -> None:
|
|
"""Decrypt with a different key raises InvalidTag."""
|
|
from cryptography.exceptions import InvalidTag
|
|
from iai_mcp.crypto import encrypt_field, decrypt_field
|
|
key_a = b"\x08" * 32
|
|
key_b = b"\x09" * 32
|
|
ct = encrypt_field("secret", key_a)
|
|
with pytest.raises(InvalidTag):
|
|
decrypt_field(ct, key_b)
|
|
|
|
|
|
def test_is_encrypted_prefix_true() -> None:
|
|
"""is_encrypted returns True for strings that start with iai:enc:v1:"""
|
|
from iai_mcp.crypto import encrypt_field, is_encrypted
|
|
key = b"\x0a" * 32
|
|
ct = encrypt_field("hello", key)
|
|
assert is_encrypted(ct) is True
|
|
|
|
|
|
def test_is_encrypted_prefix_false() -> None:
|
|
"""is_encrypted returns False for plaintext / None / empty / wrong prefix."""
|
|
from iai_mcp.crypto import is_encrypted
|
|
assert is_encrypted("plaintext") is False
|
|
assert is_encrypted("") is False
|
|
assert is_encrypted("iai:enc:v0:abc") is False # Different version
|
|
assert is_encrypted("foo:bar") is False
|
|
|
|
|
|
def test_crypto_unique_nonce_per_encrypt() -> None:
|
|
"""Two encryptions of the same plaintext under the same key produce different ciphertexts."""
|
|
from iai_mcp.crypto import encrypt_field
|
|
key = b"\x0b" * 32
|
|
ct1 = encrypt_field("repeat", key)
|
|
ct2 = encrypt_field("repeat", key)
|
|
assert ct1 != ct2 # Random nonce ensures ciphertext differs
|
|
|
|
|
|
def test_derive_key_from_passphrase_deterministic() -> None:
|
|
"""Same passphrase + same salt -> same derived key (PBKDF2)."""
|
|
from iai_mcp.crypto import derive_key_from_passphrase
|
|
salt = b"saltsaltsaltsalt" # 16 bytes
|
|
k1 = derive_key_from_passphrase("hunter2", salt)
|
|
k2 = derive_key_from_passphrase("hunter2", salt)
|
|
assert k1 == k2
|
|
assert len(k1) == 32 # 256 bits
|
|
|
|
|
|
def test_derive_key_from_passphrase_different_salts() -> None:
|
|
"""Same passphrase, different salts -> different keys."""
|
|
from iai_mcp.crypto import derive_key_from_passphrase
|
|
salt_a = b"A" * 16
|
|
salt_b = b"B" * 16
|
|
assert derive_key_from_passphrase("same", salt_a) != derive_key_from_passphrase("same", salt_b)
|
|
|
|
|
|
def test_derive_key_uses_600k_iterations() -> None:
|
|
"""OWASP 2023: PBKDF2-HMAC-SHA256 recommends 600k iterations minimum."""
|
|
from iai_mcp import crypto
|
|
assert crypto.PBKDF2_ITERATIONS >= 600_000
|
|
|
|
|
|
def test_crypto_key_passphrase_fallback_when_file_missing(
|
|
tmp_path, monkeypatch
|
|
) -> None:
|
|
"""Phase 07.10 W1 RED — file-backed CryptoKey falls back to passphrase
|
|
when no `.crypto.key` file exists in store_root.
|
|
|
|
Priority order under the new backend: file -> passphrase env var
|
|
-> CryptoKeyError. This test exercises the second tier: file is absent,
|
|
IAI_MCP_CRYPTO_PASSPHRASE is set, get_or_create() must return a 32-byte
|
|
derived key that is deterministic across instances (same passphrase +
|
|
same salt -> same key). NO keyring mocking — the keyring backend is
|
|
gone in W2, so this test must not depend on it.
|
|
|
|
RED until W2: CryptoKey does not yet accept store_root kwarg.
|
|
"""
|
|
from iai_mcp import crypto
|
|
|
|
# No `.crypto.key` written to tmp_path -> file backend miss.
|
|
assert not (tmp_path / ".crypto.key").exists()
|
|
|
|
monkeypatch.setenv("IAI_MCP_CRYPTO_PASSPHRASE", "hunter2-fallback")
|
|
|
|
ck = crypto.CryptoKey(user_id="t", store_root=tmp_path)
|
|
key1 = ck.get_or_create()
|
|
assert isinstance(key1, bytes)
|
|
assert len(key1) == 32
|
|
|
|
# Same passphrase + same user_id (salt) -> same derived key on a fresh
|
|
# instance with the same store_root.
|
|
ck2 = crypto.CryptoKey(user_id="t", store_root=tmp_path)
|
|
key2 = ck2.get_or_create()
|
|
assert key1 == key2
|