# -*- coding: utf8 -*-

# libray - Libre Blu-Ray PS3 ISO Tool
# Copyright © 2018 - 2024 Nichlas Severinsen
#
# This file is part of libray.
#
# libray is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# libray is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with libray. If not, see <https://www.gnu.org/licenses/>.

# This file contains tests to validate the key-retrieval logic for the
# ISO class, plus end-to-end checks of the decrypt / re-encrypt pipelines.

# This script is not included in the release of libray.

import argparse
import os
import tempfile
import unittest
import unittest.mock as mock

from Crypto.Cipher import AES

from libray import core, iso


# Size in bytes of one sector in the fake ISO images built by these tests.
_SECTOR_SIZE = 2048

# Hex-encoded 16-byte key shared by the key-retrieval tests.
_HEX_KEY = 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'

# Value assigned to the mocked IRD object's ``data1`` attribute.
_IRD_DATA1 = b'01010101010101010101010101010101'


def _unlink_if_exists(path):
    """Delete *path* if it is present; do nothing otherwise."""
    if os.path.exists(path):
        os.unlink(path)


class TestISO(unittest.TestCase):
    """Tests for ``iso.ISO.get_key_from_args`` and the decrypt/encrypt paths.

    The key-retrieval tests never run ``ISO.__init__``: they create a bare
    instance with ``__new__`` and assign only the attributes each test sets
    explicitly.  ``mock_args`` is the mock that replaces
    ``argparse.ArgumentParser.parse_args``; the tests set attributes on it and
    pass it where the code under test expects the parsed arguments.
    """

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _set_args(mock_args, **attrs):
        """Assign each keyword in *attrs* as an attribute on *mock_args*.

        Only the given names are touched, so any attribute a test does not
        mention keeps the mock's auto-generated value.
        """
        for name, value in attrs.items():
            setattr(mock_args, name, value)

    @staticmethod
    def _bare_iso(**attrs):
        """Return an ``iso.ISO`` created without ``__init__``, with *attrs* set."""
        fake_iso = iso.ISO.__new__(iso.ISO)
        for name, value in attrs.items():
            setattr(fake_iso, name, value)
        return fake_iso

    @staticmethod
    def _fake_ird():
        """Return a mock IRD exposing ``region_count`` and ``data1``."""
        return mock.Mock(region_count=3, data1=_IRD_DATA1)

    def _remove_after_test(self, *paths):
        """Schedule every path in *paths* for deletion when the test ends."""
        for path in paths:
            self.addCleanup(_unlink_if_exists, path)

    def _configure_pipeline_args(self, mock_args, iso_path, disc_key, output, threads):
        """Set the arguments used by the decrypt/encrypt pipeline tests."""
        self._set_args(
            mock_args,
            iso=iso_path,
            decryption_key=disc_key.hex(),
            ird='',
            output=output,
            verbose=True,
            quiet=True,
            checksum=False,
            checksum_timeout=0,
            threads=threads,
        )

    def _build_fake_iso(self, num_sectors=8, disc_key=None):
        """Build a minimal fake ISO binary in a temp file.

        Layout actually written (file offsets in bytes):

            [0:4]        number of unencrypted regions (uint32 BE = 1)
            [4:8]        padding (zeros)
            [8:12]       unencrypted region start sector (uint32 BE = 0)
            [12:16]      unencrypted region end sector (uint32 BE = 2)
            [16:20]      encrypted region end sector (uint32 BE = 6)
            [20:24]      terminator (0)
            [2064:2080]  16-byte game id field
            [2068:2072]  marker: a NUL byte followed by ``PSF``

        Sectors 0-1 are plaintext header data and sectors 2-5 hold 2048 zero
        bytes each, AES-CBC encrypted with *disc_key* and a per-sector IV.
        The file ends at sector 6 (12288 bytes).

        NOTE(review): the marker is written after the game id and overlaps
        bytes 4-7 of that field, so the stored id is not the literal
        ``TEST-GAME0000000``.  Confirm against what ``iso.ISO`` reads.

        NOTE(review): *num_sectors* is accepted but currently unused; the
        file size is fixed by the layout above.

        :param num_sectors: unused (kept for interface compatibility)
        :param disc_key: 16-byte AES key; defaults to sixteen ``0xaa`` bytes
        :return: tuple ``(path, total_size_in_bytes)``
        """
        if disc_key is None:
            disc_key = b'\xaa' * 16

        # Region table.  The original author's notes say the constructor
        # reads bytes 0-3, skips 4-7, and read_regions() continues from 8.
        region_info = b''.join((
            b'\x00\x00\x00\x01',  # num_unencrypted_regions (offset 0)
            b'\x00\x00\x00\x00',  # padding (offset 4)
            b'\x00\x00\x00\x00',  # unencrypted start = sector 0 (offset 8)
            b'\x00\x00\x00\x02',  # unencrypted end = sector 2 (offset 12)
            b'\x00\x00\x00\x06',  # encrypted end = sector 6 (offset 16)
            b'\x00\x00\x00\x00',  # terminator (offset 20)
        ))

        # Sector 1 (file offset 2048): plaintext header sector.
        sector1 = bytearray(_SECTOR_SIZE)
        sector1[16:32] = b'TEST-GAME0000000'  # file offset 2064
        sector1[20:24] = b'\x00PSF'           # file offset 2068 (overlaps id)

        # Sectors 2-5: zeros encrypted with AES-CBC using a per-sector IV.
        # _make_iv is looked up here, not at import time, so only the
        # pipeline tests depend on it.
        make_iv = iso._make_iv
        encrypted_payload = bytearray()
        for sector in range(2, 6):
            cipher = AES.new(disc_key, AES.MODE_CBC, make_iv(sector))
            encrypted_payload.extend(cipher.encrypt(b'\x00' * _SECTOR_SIZE))

        # Pad the region table out to a full sector so sector 1 starts at 2048.
        header_padding = b'\x00' * (_SECTOR_SIZE - len(region_info))
        data = region_info + header_padding + sector1 + encrypted_payload

        # The file must reach at least the last region end (sector 6).
        minimum_size = 6 * _SECTOR_SIZE
        while len(data) < minimum_size:
            data += b'\x00' * _SECTOR_SIZE

        total = len(data)
        fd, path = tempfile.mkstemp(suffix='.iso')
        with os.fdopen(fd, 'wb') as f:
            f.write(data)
        return path, total

    # ------------------------------------------------------------------
    # Key retrieval: explicit key and IRD
    # ------------------------------------------------------------------

    @mock.patch('argparse.ArgumentParser.parse_args', return_value=argparse.Namespace())
    def test_explicit_decryption_key(self, mock_args):
        """An explicit decryption key is returned as bytes."""
        self._set_args(mock_args, iso='fake.iso', decryption_key=_HEX_KEY)
        decryptkey_bytes = core.to_bytes(mock_args.decryption_key)

        fake_iso = self._bare_iso()
        returned_key = fake_iso.get_key_from_args('AAA', mock_args)

        self.assertEqual(decryptkey_bytes, returned_key)

    @mock.patch('argparse.ArgumentParser.parse_args', return_value=argparse.Namespace())
    def test_explicit_valid_ird(self, mock_args):
        """A matching IRD yields data1 encrypted with the ISO secret and IV."""
        self._set_args(
            mock_args, iso='fake.iso', ird='aaa.ird', decryption_key=False, verbose=True
        )
        ird = self._fake_ird()

        cipher = AES.new(core.ISO_SECRET, AES.MODE_CBC, core.ISO_IV)
        decryptkey_bytes = cipher.encrypt(ird.data1)

        fake_iso = self._bare_iso(
            size=2048,
            regions=[
                {'start': 0, 'end': 512, 'enc': False},
                {'start': 512, 'end': 1024, 'enc': True},
                {'start': 1024, 'end': 2048, 'enc': False},
            ],
        )

        with mock.patch('libray.iso.ird.IRD', return_value=ird) as mock_ird:
            returned_key = fake_iso.get_key_from_args('AAA', mock_args)

        mock_ird.assert_called_once_with('aaa.ird')
        self.assertEqual(decryptkey_bytes, returned_key)

    @mock.patch('argparse.ArgumentParser.parse_args', return_value=argparse.Namespace())
    def test_ird_with_region_count_mismatch(self, mock_args):
        """An IRD with a different region count than the ISO exits."""
        self._set_args(
            mock_args, iso='fake.iso', ird='aaa.ird', decryption_key=False, verbose=True
        )
        ird = self._fake_ird()

        fake_iso = self._bare_iso(
            size=512,
            regions=[
                {'start': 0, 'end': 512, 'enc': False},
            ],
        )

        with mock.patch('libray.iso.ird.IRD', return_value=ird), \
                self.assertRaises(SystemExit):
            fake_iso.get_key_from_args('AAA', mock_args)

    @mock.patch('argparse.ArgumentParser.parse_args', return_value=argparse.Namespace())
    def test_ird_with_invalid_start(self, mock_args):
        """Regions that extend beyond the ISO size cause an exit."""
        self._set_args(
            mock_args, iso='fake.iso', ird='aaa.ird', decryption_key=False, verbose=True
        )
        ird = self._fake_ird()

        fake_iso = self._bare_iso(
            size=512 * 1024 * 1024,
            regions=[
                {'start': 0, 'end': 3000000, 'enc': False},
                {'start': 3000000, 'end': 2000000000, 'enc': True},
                {'start': 2000000000, 'end': 2000001000, 'enc': False},
            ],
        )

        with mock.patch('libray.iso.ird.IRD', return_value=ird), \
                self.assertRaises(SystemExit):
            fake_iso.get_key_from_args('AAA', mock_args)

    # ------------------------------------------------------------------
    # Key retrieval: keys database
    # ------------------------------------------------------------------

    @mock.patch('argparse.ArgumentParser.parse_args', return_value=argparse.Namespace())
    def test_keys_db_size_match(self, mock_args):
        """A single database row is used as the key."""
        self._set_args(mock_args, iso='fake.iso', ird='', decryption_key=False, verbose=True)
        fake_iso = self._bare_iso(size=512 * 1024 * 1024, game_id='TCUS-12345')
        decryptkey_bytes = core.to_bytes(_HEX_KEY)

        with mock.patch('libray.iso.sqlite3') as mocksql:
            fetchall = mocksql.connect().cursor().execute().fetchall
            fetchall.return_value = [['AAA', decryptkey_bytes]]
            returned_key = fake_iso.get_key_from_args('AAA', mock_args)

        self.assertEqual(decryptkey_bytes, returned_key)

    @mock.patch('argparse.ArgumentParser.parse_args', return_value=argparse.Namespace())
    def test_keys_db_size_multiple_match_name_lookup(self, mock_args):
        """An empty first query result falls through to the second query."""
        self._set_args(mock_args, iso='fake.iso', ird='', decryption_key=False, verbose=True)
        fake_iso = self._bare_iso(size=512 * 1024 * 1024, game_id='TCUS-12345')
        decryptkey_bytes = core.to_bytes(_HEX_KEY)

        with mock.patch('libray.iso.sqlite3') as mocksql:
            fetchall = mocksql.connect().cursor().execute().fetchall
            fetchall.side_effect = [[], [['AAA', decryptkey_bytes]]]
            returned_key = fake_iso.get_key_from_args('AAA', mock_args)

        self.assertEqual(decryptkey_bytes, returned_key)

    @mock.patch('argparse.ArgumentParser.parse_args', return_value=argparse.Namespace())
    def test_keys_db_size_multiple_match_no_game_id(self, mock_args):
        """Several database rows and a ``None`` first argument cause an exit."""
        self._set_args(mock_args, iso='fake.iso', ird='', decryption_key=False, verbose=True)
        fake_iso = self._bare_iso(size=512 * 1024 * 1024, game_id='TCUS-12345')
        decryptkey_bytes = core.to_bytes(_HEX_KEY)

        with mock.patch('libray.iso.sqlite3') as mocksql:
            fetchall = mocksql.connect().cursor().execute().fetchall
            fetchall.return_value = [['AAA', decryptkey_bytes], ['BBB', decryptkey_bytes]]
            with self.assertRaises(SystemExit):
                fake_iso.get_key_from_args(None, mock_args)

    @mock.patch('argparse.ArgumentParser.parse_args', return_value=argparse.Namespace())
    def test_keys_db_no_match_no_checksum(self, mock_args):
        """No database rows and checksum disabled cause an exit."""
        self._set_args(
            mock_args,
            iso='fake.iso',
            ird='',
            decryption_key=False,
            checksum=False,
            verbose=True,
        )
        fake_iso = self._bare_iso(size=512 * 1024 * 1024, game_id='TCUS-12345')

        with mock.patch('libray.iso.sqlite3') as mocksql:
            mocksql.connect().cursor().execute().fetchall.return_value = []
            with self.assertRaises(SystemExit):
                fake_iso.get_key_from_args('AAA', mock_args)

    @mock.patch('argparse.ArgumentParser.parse_args', return_value=argparse.Namespace())
    def test_keys_db_no_match_checksum_fallback(self, mock_args):
        """With checksum enabled, the third query result supplies the key."""
        self._set_args(
            mock_args,
            iso='fake.iso',
            ird='',
            decryption_key=False,
            checksum=True,
            checksum_timeout=15,
            verbose=True,
        )
        fake_iso = self._bare_iso(size=512 * 1024 * 1024, game_id='TCUS-12345')
        decryptkey_bytes = core.to_bytes(_HEX_KEY)

        with mock.patch('libray.iso.sqlite3') as mocksql:
            fakeresults = ([], [], [['AAA', decryptkey_bytes]])
            mocksql.connect().cursor().execute().fetchall.side_effect = fakeresults
            with mock.patch('libray.iso.core.crc32', return_value='01010101'):
                returned_key = fake_iso.get_key_from_args('AAA', mock_args)

        self.assertEqual(decryptkey_bytes, returned_key)

    @mock.patch('argparse.ArgumentParser.parse_args', return_value=argparse.Namespace())
    def test_keys_db_no_match_checksum_timeout(self, mock_args):
        """A crc32 result of ``None`` raises ``TimeoutError``."""
        self._set_args(
            mock_args,
            iso='fake.iso',
            ird='',
            decryption_key=False,
            checksum=True,
            checksum_timeout=15,
            verbose=True,
        )
        fake_iso = self._bare_iso(size=512 * 1024 * 1024, game_id='TCUS-12345')

        with mock.patch('libray.iso.sqlite3') as mocksql:
            mocksql.connect().cursor().execute().fetchall.return_value = []
            with mock.patch('libray.iso.core.crc32', return_value=None), \
                    self.assertRaises(TimeoutError):
                fake_iso.get_key_from_args('AAA', mock_args)

    # ------------------------------------------------------------------
    # Decrypt / encrypt pipelines on a real temp file
    # ------------------------------------------------------------------

    @mock.patch('argparse.ArgumentParser.parse_args')
    def test_decrypt_parallel_pipeline(self, mock_args):
        """Verify the multi-threaded decrypt pipeline processes all sectors."""
        disc_key = b'\xbb' * 16
        path, size = self._build_fake_iso(disc_key=disc_key)
        out_path = path + '.dec'
        self._remove_after_test(path, out_path)

        self._configure_pipeline_args(mock_args, path, disc_key, out_path, threads=4)

        s = iso.ISO(mock_args)
        s.decrypt(mock_args)

        self.assertTrue(os.path.exists(out_path))
        with open(out_path, 'rb') as f:
            decrypted = f.read()

        # The output must be the same length as the input image.
        self.assertEqual(len(decrypted), size)
        # The encrypted payload should be decrypted back to zeros.
        self.assertEqual(decrypted[6144:12288], b'\x00' * 6144)

    @mock.patch('argparse.ArgumentParser.parse_args')
    def test_decrypt_sequential_fallback(self, mock_args):
        """Verify single-thread decrypt still works (sequential path)."""
        disc_key = b'\xcc' * 16
        path, _ = self._build_fake_iso(disc_key=disc_key)
        out_path = path + '.dec'
        self._remove_after_test(path, out_path)

        self._configure_pipeline_args(mock_args, path, disc_key, out_path, threads=1)

        s = iso.ISO(mock_args)
        s.decrypt(mock_args)

        self.assertTrue(os.path.exists(out_path))
        with open(out_path, 'rb') as f:
            decrypted = f.read()

        self.assertEqual(decrypted[6144:12288], b'\x00' * 6144)

    @mock.patch('argparse.ArgumentParser.parse_args')
    def test_encrypt_parallel_pipeline(self, mock_args):
        """Verify the multi-threaded re-encrypt pipeline works."""
        disc_key = b'\xdd' * 16
        path, _ = self._build_fake_iso(disc_key=disc_key)
        dec_path = path + '.dec'
        reenc_path = path + '.reenc'
        self._remove_after_test(path, dec_path, reenc_path)

        # First decrypt to get a plain ISO.
        self._configure_pipeline_args(mock_args, path, disc_key, dec_path, threads=4)
        s = iso.ISO(mock_args)
        s.decrypt(mock_args)

        # Now re-encrypt the decrypted image; the other arguments carry over.
        self._set_args(
            mock_args,
            iso=dec_path,
            decryption_key=disc_key.hex(),
            reencrypt=True,
            output=reenc_path,
            threads=4,
        )
        s2 = iso.ISO(mock_args)
        s2.encrypt(mock_args)

        self.assertTrue(os.path.exists(reenc_path))
        with open(reenc_path, 'rb') as f:
            reencrypted = f.read()
        with open(path, 'rb') as f:
            original = f.read()

        # A decrypt followed by a re-encrypt must reproduce the source image.
        self.assertEqual(reencrypted, original)