# Source listing: LibRay/tests/test_iso.py (394 lines, 16 KiB, Python)

# -*- coding: utf8 -*-
# libray - Libre Blu-Ray PS3 ISO Tool
# Copyright © 2018 - 2024 Nichlas Severinsen
#
# This file is part of libray.
#
# libray is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# libray is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with libray. If not, see <https://www.gnu.org/licenses/>.
#
# This file contains tests to validate the key-retrieval logic and the
# decrypt/re-encrypt pipelines of the ISO class.
# This script is not included in the release of libray.
import argparse
import os
import unittest
import unittest.mock as mock
from Crypto.Cipher import AES
from libray import core, iso
class TestISO(unittest.TestCase):
    """Tests for ``libray.iso.ISO``: key retrieval and decrypt/encrypt pipelines.

    The key-retrieval tests create the ``ISO`` object with ``ISO.__new__`` so
    that ``__init__`` does not run and no disc image is needed; only the
    attributes a test assigns exist on the instance.  The pipeline tests run
    the real constructor against a small image written by
    ``_build_fake_iso``.

    In every test ``mock_args`` is the mock that ``mock.patch`` installs in
    place of ``ArgumentParser.parse_args``.  The tests assign the argument
    values directly onto that mock and pass it wherever an argument namespace
    is expected.
    """

    # Size in bytes of one sector of the image built by _build_fake_iso.
    _SECTOR_SIZE = 2048
    # 32 hex digits used as the explicit / database decryption key.
    _HEX_KEY = 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
    # ISO size assigned to the bare ISO objects in most key-retrieval tests.
    _FAKE_SIZE = 512 * 1024 * 1024

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _fake_ird():
        """Return a mock IRD exposing ``region_count`` (3) and ``data1``."""
        ird = mock.Mock()
        ird.region_count = 3
        ird.data1 = b'01010101010101010101010101010101'
        return ird

    @staticmethod
    def _set_pipeline_args(mock_args, iso_path, disc_key, output, threads):
        """Assign the arguments shared by the decrypt/encrypt pipeline tests.

        ``reencrypt`` is deliberately not assigned here; the re-encrypt test
        sets it itself.
        """
        mock_args.iso = iso_path
        mock_args.decryption_key = disc_key.hex()
        mock_args.ird = ''
        mock_args.output = output
        mock_args.verbose = True
        mock_args.quiet = True
        mock_args.checksum = False
        mock_args.checksum_timeout = 0
        mock_args.threads = threads

    @staticmethod
    def _remove_files(*paths):
        """Delete every given path, ignoring files that do not exist."""
        for file_path in paths:
            try:
                os.unlink(file_path)
            except FileNotFoundError:
                # An output file is absent when the test failed before
                # producing it; there is nothing to clean up then.
                pass

    def _build_fake_iso(self, num_sectors=8, disc_key=None):
        """Write a minimal fake ISO image to a temporary file.

        File layout (byte offsets):
            [0:4]        number of unencrypted regions (uint32 BE = 1)
            [4:8]        padding (zeros)
            [8:12]       unencrypted region start sector (uint32 BE = 0)
            [12:16]      unencrypted region end sector (uint32 BE = 2)
            [16:20]      encrypted region end sector (uint32 BE = 6)
            [20:24]      terminator (zeros)
            [24:2048]    zero padding to the end of sector 0
            [2064:2080]  game id b'TEST-GAME0000000' (16 bytes)
            [2068:2072]  PARAM.SFO marker (b'\\x00PSF'), written over bytes
                         4-7 of the game id
            [4096:12288] sectors 2-5: all-zero sectors encrypted with AES-CBC
                         under ``disc_key`` with the IV returned by
                         ``libray.iso._make_iv(sector)``

        Args:
            num_sectors: Currently unused; the image always consists of six
                sectors (12288 bytes).  Kept so existing calls stay valid.
            disc_key: AES key used to encrypt sectors 2-5.  Defaults to
                sixteen ``0xAA`` bytes.

        Returns:
            A ``(path, total)`` tuple: the path of the temporary ``.iso``
            file and its size in bytes.  The caller must delete the file.
        """
        import tempfile
        from libray.iso import _make_iv

        sector_size = self._SECTOR_SIZE
        if disc_key is None:
            disc_key = b'\xaa' * 16

        # Region table at the very start of the image.
        region_info = b''.join((
            b'\x00\x00\x00\x01',  # num_unencrypted_regions (offset 0)
            b'\x00\x00\x00\x00',  # padding (offset 4)
            b'\x00\x00\x00\x00',  # unencrypted start = sector 0 (offset 8)
            b'\x00\x00\x00\x02',  # unencrypted end = sector 2 (offset 12)
            b'\x00\x00\x00\x06',  # encrypted end = sector 6 (offset 16)
            b'\x00\x00\x00\x00',  # terminator (offset 20)
        ))
        sector0 = region_info + b'\x00' * (sector_size - len(region_info))

        # Sector 1 (file offset 2048) carries the game id and the marker.
        sector1 = bytearray(sector_size)
        # Game id at file offset 2048 + 16 = 2064.
        sector1[16:32] = b'TEST-GAME0000000'
        # PARAM.SFO marker at file offset 2048 + 20 = 2068.
        # NOTE(review): this overwrites bytes 4-7 of the game id written just
        # above -- confirm whether the overlap is intended.
        sector1[20:24] = b'\x00PSF'

        # Sectors 2-5: zero sectors, each encrypted with its own IV.
        zero_sector = b'\x00' * sector_size
        encrypted_payload = bytearray()
        for sector in range(2, 6):
            cipher = AES.new(disc_key, AES.MODE_CBC, _make_iv(sector))
            encrypted_payload.extend(cipher.encrypt(zero_sector))

        data = sector0 + sector1 + encrypted_payload

        # Safety net: keep the file at least as large as the last region end
        # (sector 6 = 12288 bytes).  With the layout above the image is
        # already exactly that size, so this normally adds nothing.
        min_size = 6 * sector_size
        while len(data) < min_size:
            data += b'\x00' * sector_size

        total = len(data)
        fd, path = tempfile.mkstemp(suffix='.iso')
        with os.fdopen(fd, 'wb') as f:
            f.write(data)
        return path, total

    # ------------------------------------------------------------------
    # Key retrieval: explicit key and IRD
    # ------------------------------------------------------------------

    @mock.patch('argparse.ArgumentParser.parse_args', return_value=argparse.Namespace())
    def test_explicit_decryption_key(self, mock_args):
        """An explicit key is returned as ``core.to_bytes`` of the argument."""
        mock_args.iso = 'fake.iso'
        mock_args.decryption_key = self._HEX_KEY
        expected_key = core.to_bytes(mock_args.decryption_key)

        fake_iso = iso.ISO.__new__(iso.ISO)
        returned_key = fake_iso.get_key_from_args('AAA', mock_args)

        self.assertEqual(expected_key, returned_key)

    @mock.patch('argparse.ArgumentParser.parse_args', return_value=argparse.Namespace())
    def test_explicit_valid_ird(self, mock_args):
        """With a matching IRD the key is ``data1`` encrypted with the ISO secret."""
        mock_args.iso = 'fake.iso'
        mock_args.ird = 'aaa.ird'
        mock_args.decryption_key = False
        mock_args.verbose = True

        ird = self._fake_ird()
        # Expected key: data1 encrypted with AES-CBC using the library's
        # ISO_SECRET / ISO_IV constants.
        cipher = AES.new(core.ISO_SECRET, AES.MODE_CBC, core.ISO_IV)
        expected_key = cipher.encrypt(ird.data1)

        fake_iso = iso.ISO.__new__(iso.ISO)
        fake_iso.size = 2048
        # Three regions, matching ird.region_count.
        fake_iso.regions = [
            {'start': 0, 'end': 512, 'enc': False},
            {'start': 512, 'end': 1024, 'enc': True},
            {'start': 1024, 'end': 2048, 'enc': False},
        ]

        with mock.patch('libray.iso.ird.IRD', return_value=ird) as mock_ird:
            returned_key = fake_iso.get_key_from_args('AAA', mock_args)

        mock_ird.assert_called_once_with('aaa.ird')
        self.assertEqual(expected_key, returned_key)

    @mock.patch('argparse.ArgumentParser.parse_args', return_value=argparse.Namespace())
    def test_ird_with_region_count_mismatch(self, mock_args):
        """An IRD reporting 3 regions for a 1-region ISO must exit."""
        mock_args.iso = 'fake.iso'
        mock_args.ird = 'aaa.ird'
        mock_args.decryption_key = False
        mock_args.verbose = True

        fake_iso = iso.ISO.__new__(iso.ISO)
        fake_iso.size = 512
        fake_iso.regions = [
            {'start': 0, 'end': 512, 'enc': False},
        ]

        with mock.patch('libray.iso.ird.IRD', return_value=self._fake_ird()):
            with self.assertRaises(SystemExit):
                fake_iso.get_key_from_args('AAA', mock_args)

    @mock.patch('argparse.ArgumentParser.parse_args', return_value=argparse.Namespace())
    def test_ird_with_invalid_start(self, mock_args):
        """Regions that reach past the ISO size must exit.

        The region count matches the IRD (3), but the last region starts at
        2000000000 bytes while the ISO size is only 512 MiB.
        """
        mock_args.iso = 'fake.iso'
        mock_args.ird = 'aaa.ird'
        mock_args.decryption_key = False
        mock_args.verbose = True

        fake_iso = iso.ISO.__new__(iso.ISO)
        fake_iso.size = self._FAKE_SIZE
        fake_iso.regions = [
            {'start': 0, 'end': 3000000, 'enc': False},
            {'start': 3000000, 'end': 2000000000, 'enc': True},
            {'start': 2000000000, 'end': 2000001000, 'enc': False},
        ]

        with mock.patch('libray.iso.ird.IRD', return_value=self._fake_ird()):
            with self.assertRaises(SystemExit):
                fake_iso.get_key_from_args('AAA', mock_args)

    # ------------------------------------------------------------------
    # Key retrieval: keys database (sqlite3 is mocked)
    # ------------------------------------------------------------------

    @mock.patch('argparse.ArgumentParser.parse_args', return_value=argparse.Namespace())
    def test_keys_db_size_match(self, mock_args):
        """A single database row yields that row's key."""
        mock_args.iso = 'fake.iso'
        mock_args.ird = ''
        mock_args.decryption_key = False
        mock_args.verbose = True

        fake_iso = iso.ISO.__new__(iso.ISO)
        fake_iso.size = self._FAKE_SIZE
        fake_iso.game_id = 'TCUS-12345'
        expected_key = core.to_bytes(self._HEX_KEY)

        with mock.patch('libray.iso.sqlite3') as mocksql:
            mocksql.connect().cursor().execute().fetchall.return_value = [['AAA', expected_key]]
            returned_key = fake_iso.get_key_from_args('AAA', mock_args)

        self.assertEqual(expected_key, returned_key)

    @mock.patch('argparse.ArgumentParser.parse_args', return_value=argparse.Namespace())
    def test_keys_db_size_multiple_match_name_lookup(self, mock_args):
        """The key is found by the second query when the first returns no rows."""
        mock_args.iso = 'fake.iso'
        mock_args.ird = ''
        mock_args.decryption_key = False
        mock_args.verbose = True

        fake_iso = iso.ISO.__new__(iso.ISO)
        fake_iso.size = self._FAKE_SIZE
        fake_iso.game_id = 'TCUS-12345'
        expected_key = core.to_bytes(self._HEX_KEY)

        with mock.patch('libray.iso.sqlite3') as mocksql:
            # First fetchall() call returns no rows, the second one row.
            mocksql.connect().cursor().execute().fetchall.side_effect = [[], [['AAA', expected_key]]]
            returned_key = fake_iso.get_key_from_args('AAA', mock_args)

        self.assertEqual(expected_key, returned_key)

    @mock.patch('argparse.ArgumentParser.parse_args', return_value=argparse.Namespace())
    def test_keys_db_size_multiple_match_no_game_id(self, mock_args):
        """Two database rows and ``None`` as first argument must exit."""
        mock_args.iso = 'fake.iso'
        mock_args.ird = ''
        mock_args.decryption_key = False
        mock_args.verbose = True

        fake_iso = iso.ISO.__new__(iso.ISO)
        fake_iso.size = self._FAKE_SIZE
        fake_iso.game_id = 'TCUS-12345'
        key_bytes = core.to_bytes(self._HEX_KEY)

        with mock.patch('libray.iso.sqlite3') as mocksql:
            mocksql.connect().cursor().execute().fetchall.return_value = [
                ['AAA', key_bytes],
                ['BBB', key_bytes],
            ]
            with self.assertRaises(SystemExit):
                fake_iso.get_key_from_args(None, mock_args)

    @mock.patch('argparse.ArgumentParser.parse_args', return_value=argparse.Namespace())
    def test_keys_db_no_match_no_checksum(self, mock_args):
        """No database rows and checksum lookup disabled must exit."""
        mock_args.iso = 'fake.iso'
        mock_args.ird = ''
        mock_args.decryption_key = False
        mock_args.checksum = False
        mock_args.verbose = True

        fake_iso = iso.ISO.__new__(iso.ISO)
        fake_iso.size = self._FAKE_SIZE
        fake_iso.game_id = 'TCUS-12345'

        with mock.patch('libray.iso.sqlite3') as mocksql:
            mocksql.connect().cursor().execute().fetchall.return_value = []
            with self.assertRaises(SystemExit):
                fake_iso.get_key_from_args('AAA', mock_args)

    @mock.patch('argparse.ArgumentParser.parse_args', return_value=argparse.Namespace())
    def test_keys_db_no_match_checksum_fallback(self, mock_args):
        """With checksum enabled, the third query (after two empty ones) yields the key."""
        mock_args.iso = 'fake.iso'
        mock_args.ird = ''
        mock_args.decryption_key = False
        mock_args.checksum = True
        mock_args.checksum_timeout = 15
        mock_args.verbose = True

        fake_iso = iso.ISO.__new__(iso.ISO)
        fake_iso.size = self._FAKE_SIZE
        fake_iso.game_id = 'TCUS-12345'
        expected_key = core.to_bytes(self._HEX_KEY)

        with mock.patch('libray.iso.sqlite3') as mocksql:
            # Two empty results followed by one matching row.
            fakeresults = ([], [], [['AAA', expected_key]])
            mocksql.connect().cursor().execute().fetchall.side_effect = fakeresults
            # crc32 is patched so no file is read for the checksum.
            with mock.patch('libray.iso.core.crc32', return_value='01010101'):
                returned_key = fake_iso.get_key_from_args('AAA', mock_args)

        self.assertEqual(expected_key, returned_key)

    @mock.patch('argparse.ArgumentParser.parse_args', return_value=argparse.Namespace())
    def test_keys_db_no_match_checksum_timeout(self, mock_args):
        """``crc32`` returning ``None`` with no database rows raises ``TimeoutError``."""
        mock_args.iso = 'fake.iso'
        mock_args.ird = ''
        mock_args.decryption_key = False
        mock_args.checksum = True
        mock_args.checksum_timeout = 15
        mock_args.verbose = True

        fake_iso = iso.ISO.__new__(iso.ISO)
        fake_iso.size = self._FAKE_SIZE
        fake_iso.game_id = 'TCUS-12345'

        with mock.patch('libray.iso.sqlite3') as mocksql:
            mocksql.connect().cursor().execute().fetchall.return_value = []
            with mock.patch('libray.iso.core.crc32', return_value=None):
                with self.assertRaises(TimeoutError):
                    fake_iso.get_key_from_args('AAA', mock_args)

    # ------------------------------------------------------------------
    # Decrypt / encrypt pipelines (real constructor, fake image on disk)
    # ------------------------------------------------------------------

    @mock.patch('argparse.ArgumentParser.parse_args')
    def test_decrypt_parallel_pipeline(self, mock_args):
        """Verify the multi-threaded decrypt pipeline processes all sectors."""
        disc_key = b'\xbb' * 16
        path, size = self._build_fake_iso(disc_key=disc_key)
        out_path = path + '.dec'
        # Registered right away so the files are removed even if the test fails.
        self.addCleanup(self._remove_files, path, out_path)

        self._set_pipeline_args(mock_args, path, disc_key, out_path, threads=4)
        s = iso.ISO(mock_args)
        s.decrypt(mock_args)

        self.assertTrue(os.path.exists(out_path))
        with open(out_path, 'rb') as f:
            decrypted = f.read()

        self.assertEqual(len(decrypted), size)
        # Bytes 6144..12288 are sectors 3-5; they must decrypt back to zeros.
        # Encrypted region: start=6144 (region[0].end), end=12288.
        self.assertEqual(decrypted[6144:12288], b'\x00' * 6144)

    @mock.patch('argparse.ArgumentParser.parse_args')
    def test_decrypt_sequential_fallback(self, mock_args):
        """Verify single-thread decrypt still works (sequential path)."""
        disc_key = b'\xcc' * 16
        path, _ = self._build_fake_iso(disc_key=disc_key)
        out_path = path + '.dec'
        self.addCleanup(self._remove_files, path, out_path)

        self._set_pipeline_args(mock_args, path, disc_key, out_path, threads=1)
        s = iso.ISO(mock_args)
        s.decrypt(mock_args)

        self.assertTrue(os.path.exists(out_path))
        with open(out_path, 'rb') as f:
            decrypted = f.read()

        # Same slice as the parallel test: sectors 3-5 must be zeros.
        self.assertEqual(decrypted[6144:12288], b'\x00' * 6144)

    @mock.patch('argparse.ArgumentParser.parse_args')
    def test_encrypt_parallel_pipeline(self, mock_args):
        """Verify the multi-threaded re-encrypt pipeline works.

        The fake image is decrypted and the result re-encrypted; the
        re-encrypted file must be byte-identical to the original image.
        """
        disc_key = b'\xdd' * 16
        path, _ = self._build_fake_iso(disc_key=disc_key)
        dec_path = path + '.dec'
        reenc_path = path + '.reenc'
        self.addCleanup(self._remove_files, path, dec_path, reenc_path)

        # First decrypt to get a plain ISO
        self._set_pipeline_args(mock_args, path, disc_key, dec_path, threads=4)
        s = iso.ISO(mock_args)
        s.decrypt(mock_args)

        # Now re-encrypt with -r
        mock_args.iso = dec_path
        mock_args.decryption_key = disc_key.hex()
        mock_args.reencrypt = True
        mock_args.output = reenc_path
        mock_args.threads = 4
        s2 = iso.ISO(mock_args)
        s2.encrypt(mock_args)

        self.assertTrue(os.path.exists(reenc_path))
        with open(reenc_path, 'rb') as f:
            reencrypted = f.read()
        with open(path, 'rb') as f:
            original = f.read()

        self.assertEqual(reencrypted, original)