394 lines
16 KiB
Python
394 lines
16 KiB
Python
# -*- coding: utf8 -*-
|
|
|
|
# libray - Libre Blu-Ray PS3 ISO Tool
|
|
# Copyright © 2018 - 2024 Nichlas Severinsen
|
|
#
|
|
# This file is part of libray.
|
|
#
|
|
# libray is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# libray is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with libray. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
# This file contains tests to validate the key-retrieval logic for the
|
|
# ISO class.
|
|
# This script is not included in the release of libray.
|
|
|
|
|
|
import argparse
|
|
import os
|
|
import unittest
|
|
import unittest.mock as mock
|
|
from Crypto.Cipher import AES
|
|
|
|
from libray import core, iso
|
|
|
|
class TestISO(unittest.TestCase):
|
|
|
|
@mock.patch('argparse.ArgumentParser.parse_args', return_value=argparse.Namespace())
|
|
def test_explicit_decryption_key(self, mock_args):
|
|
mock_args.iso = 'fake.iso'
|
|
mock_args.decryption_key = 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
|
|
decryptkey_bytes = core.to_bytes(mock_args.decryption_key)
|
|
fake_iso = iso.ISO.__new__(iso.ISO)
|
|
returned_key = fake_iso.get_key_from_args('AAA', mock_args)
|
|
self.assertEqual(decryptkey_bytes, returned_key)
|
|
|
|
@mock.patch('argparse.ArgumentParser.parse_args', return_value=argparse.Namespace())
|
|
def test_explicit_valid_ird(self, mock_args):
|
|
mock_args.iso = 'fake.iso'
|
|
mock_args.ird = 'aaa.ird'
|
|
mock_args.decryption_key = False
|
|
mock_args.verbose = True
|
|
|
|
ird = mock.Mock()
|
|
ird.region_count = 3
|
|
ird.data1 = b'01010101010101010101010101010101'
|
|
cipher = AES.new(core.ISO_SECRET, AES.MODE_CBC, core.ISO_IV)
|
|
decryptkey_bytes = cipher.encrypt(ird.data1)
|
|
|
|
fake_iso = iso.ISO.__new__(iso.ISO)
|
|
fake_iso.size = 2048
|
|
fake_iso.regions = [
|
|
{'start': 0, 'end': 512, 'enc': False},
|
|
{'start': 512, 'end': 1024, 'enc': True},
|
|
{'start': 1024, 'end': 2048, 'enc': False}
|
|
]
|
|
|
|
with mock.patch('libray.iso.ird.IRD', return_value=ird) as mock_ird:
|
|
returned_key = fake_iso.get_key_from_args('AAA', mock_args)
|
|
mock_ird.assert_called_once_with('aaa.ird')
|
|
self.assertEqual(decryptkey_bytes, returned_key)
|
|
|
|
@mock.patch('argparse.ArgumentParser.parse_args', return_value=argparse.Namespace())
|
|
def test_ird_with_region_count_mismatch(self, mock_args):
|
|
mock_args.iso = 'fake.iso'
|
|
mock_args.ird = 'aaa.ird'
|
|
mock_args.decryption_key = False
|
|
mock_args.verbose = True
|
|
|
|
ird = mock.Mock()
|
|
ird.region_count = 3
|
|
ird.data1 = b'01010101010101010101010101010101'
|
|
|
|
fake_iso = iso.ISO.__new__(iso.ISO)
|
|
fake_iso.size = 512
|
|
fake_iso.regions = [
|
|
{'start': 0, 'end': 512, 'enc': False},
|
|
]
|
|
|
|
with mock.patch('libray.iso.ird.IRD', return_value=ird) as mock_ird:
|
|
with self.assertRaises(SystemExit):
|
|
fake_iso.get_key_from_args('AAA', mock_args)
|
|
|
|
@mock.patch('argparse.ArgumentParser.parse_args', return_value=argparse.Namespace())
|
|
def test_ird_with_invalid_start(self, mock_args):
|
|
mock_args.iso = 'fake.iso'
|
|
mock_args.ird = 'aaa.ird'
|
|
mock_args.decryption_key = False
|
|
mock_args.verbose = True
|
|
|
|
ird = mock.Mock()
|
|
ird.region_count = 3
|
|
ird.data1 = b'01010101010101010101010101010101'
|
|
|
|
fake_iso = iso.ISO.__new__(iso.ISO)
|
|
fake_iso.size = 512 * 1024 * 1024
|
|
fake_iso.regions = [
|
|
{'start': 0, 'end': 3000000, 'enc': False},
|
|
{'start': 3000000, 'end': 2000000000, 'enc': True},
|
|
{'start': 2000000000, 'end': 2000001000, 'enc': False}
|
|
]
|
|
|
|
with mock.patch('libray.iso.ird.IRD', return_value=ird) as mock_ird:
|
|
with self.assertRaises(SystemExit):
|
|
fake_iso.get_key_from_args('AAA', mock_args)
|
|
|
|
@mock.patch('argparse.ArgumentParser.parse_args', return_value=argparse.Namespace())
|
|
def test_keys_db_size_match(self, mock_args):
|
|
mock_args.iso = 'fake.iso'
|
|
mock_args.ird = ''
|
|
mock_args.decryption_key = False
|
|
mock_args.verbose = True
|
|
|
|
fake_iso = iso.ISO.__new__(iso.ISO)
|
|
fake_iso.size = 512 * 1024 * 1024
|
|
fake_iso.game_id = 'TCUS-12345'
|
|
|
|
with mock.patch('libray.iso.sqlite3') as mocksql:
|
|
decryption_key = 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
|
|
decryptkey_bytes = core.to_bytes(decryption_key)
|
|
mocksql.connect().cursor().execute().fetchall.return_value = [['AAA', decryptkey_bytes]]
|
|
returned_key = fake_iso.get_key_from_args('AAA', mock_args)
|
|
self.assertEqual(decryptkey_bytes, returned_key)
|
|
|
|
@mock.patch('argparse.ArgumentParser.parse_args', return_value=argparse.Namespace())
|
|
def test_keys_db_size_multiple_match_name_lookup(self, mock_args):
|
|
mock_args.iso = 'fake.iso'
|
|
mock_args.ird = ''
|
|
mock_args.decryption_key = False
|
|
mock_args.verbose = True
|
|
|
|
fake_iso = iso.ISO.__new__(iso.ISO)
|
|
fake_iso.size = 512 * 1024 * 1024
|
|
fake_iso.game_id = 'TCUS-12345'
|
|
|
|
with mock.patch('libray.iso.sqlite3') as mocksql:
|
|
decryption_key = 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
|
|
decryptkey_bytes = core.to_bytes(decryption_key)
|
|
mocksql.connect().cursor().execute().fetchall.side_effect = [[], [['AAA', decryptkey_bytes]]]
|
|
returned_key = fake_iso.get_key_from_args('AAA', mock_args)
|
|
self.assertEqual(decryptkey_bytes, returned_key)
|
|
|
|
@mock.patch('argparse.ArgumentParser.parse_args', return_value=argparse.Namespace())
|
|
def test_keys_db_size_multiple_match_no_game_id(self, mock_args):
|
|
mock_args.iso = 'fake.iso'
|
|
mock_args.ird = ''
|
|
mock_args.decryption_key = False
|
|
mock_args.verbose = True
|
|
|
|
fake_iso = iso.ISO.__new__(iso.ISO)
|
|
fake_iso.size = 512 * 1024 * 1024
|
|
fake_iso.game_id = 'TCUS-12345'
|
|
|
|
with mock.patch('libray.iso.sqlite3') as mocksql:
|
|
decryption_key = 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
|
|
decryptkey_bytes = core.to_bytes(decryption_key)
|
|
mocksql.connect().cursor().execute().fetchall.return_value = [['AAA', decryptkey_bytes],['BBB', decryptkey_bytes]]
|
|
with self.assertRaises(SystemExit):
|
|
fake_iso.get_key_from_args(None, mock_args)
|
|
|
|
@mock.patch('argparse.ArgumentParser.parse_args', return_value=argparse.Namespace())
|
|
def test_keys_db_no_match_no_checksum(self, mock_args):
|
|
mock_args.iso = 'fake.iso'
|
|
mock_args.ird = ''
|
|
mock_args.decryption_key = False
|
|
mock_args.checksum = False
|
|
mock_args.verbose = True
|
|
|
|
fake_iso = iso.ISO.__new__(iso.ISO)
|
|
fake_iso.size = 512 * 1024 * 1024
|
|
fake_iso.game_id = 'TCUS-12345'
|
|
|
|
with mock.patch('libray.iso.sqlite3') as mocksql:
|
|
mocksql.connect().cursor().execute().fetchall.return_value = []
|
|
with self.assertRaises(SystemExit):
|
|
fake_iso.get_key_from_args('AAA', mock_args)
|
|
|
|
@mock.patch('argparse.ArgumentParser.parse_args', return_value=argparse.Namespace())
|
|
def test_keys_db_no_match_checksum_fallback(self, mock_args):
|
|
mock_args.iso = 'fake.iso'
|
|
mock_args.ird = ''
|
|
mock_args.decryption_key = False
|
|
mock_args.checksum = True
|
|
mock_args.checksum_timeout = 15
|
|
mock_args.verbose = True
|
|
|
|
fake_iso = iso.ISO.__new__(iso.ISO)
|
|
fake_iso.size = 512 * 1024 * 1024
|
|
fake_iso.game_id = 'TCUS-12345'
|
|
|
|
with mock.patch('libray.iso.sqlite3') as mocksql:
|
|
decryption_key = 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
|
|
decryptkey_bytes = core.to_bytes(decryption_key)
|
|
fakeresults = ([], [], [['AAA', decryptkey_bytes]])
|
|
mocksql.connect().cursor().execute().fetchall.side_effect = fakeresults
|
|
with mock.patch('libray.iso.core.crc32', return_value='01010101'):
|
|
returned_key = fake_iso.get_key_from_args('AAA', mock_args)
|
|
self.assertEqual(decryptkey_bytes, returned_key)
|
|
|
|
@mock.patch('argparse.ArgumentParser.parse_args', return_value=argparse.Namespace())
|
|
def test_keys_db_no_match_checksum_timeout(self, mock_args):
|
|
mock_args.iso = 'fake.iso'
|
|
mock_args.ird = ''
|
|
mock_args.decryption_key = False
|
|
mock_args.checksum = True
|
|
mock_args.checksum_timeout = 15
|
|
mock_args.verbose = True
|
|
|
|
fake_iso = iso.ISO.__new__(iso.ISO)
|
|
fake_iso.size = 512 * 1024 * 1024
|
|
fake_iso.game_id = 'TCUS-12345'
|
|
|
|
with mock.patch('libray.iso.sqlite3') as mocksql:
|
|
mocksql.connect().cursor().execute().fetchall.return_value = []
|
|
with mock.patch('libray.iso.core.crc32', return_value=None):
|
|
with self.assertRaises(TimeoutError):
|
|
fake_iso.get_key_from_args('AAA', mock_args)
|
|
|
|
def _build_fake_iso(self, num_sectors=8, disc_key=None):
|
|
"""Build a minimal fake ISO binary in a temp file.
|
|
|
|
Layout:
|
|
[0:4] num_unencrypted_regions (uint32 BE = 1)
|
|
[4:8] padding (4 bytes, skipped by constructor)
|
|
[8:12] unencrypted region start sector (uint32 BE = 0)
|
|
[12:16] unencrypted region end sector (uint32 BE = 2)
|
|
[16:20] encrypted region end sector (uint32 BE = 6)
|
|
[20:24] terminator (0)
|
|
[2048:2064] game_id (16 bytes)
|
|
[2064:2072] PARAM.SFO marker (b'\\x00PSF')
|
|
Sectors 0-1 = unencrypted header
|
|
Sectors 2-5 = encrypted payload (zeros)
|
|
Sector 6+ = unencrypted tail
|
|
"""
|
|
import tempfile
|
|
if disc_key is None:
|
|
disc_key = b'\xaa' * 16
|
|
|
|
# Region info: constructor reads bytes 0-3, skips 4-7, then
|
|
# read_regions() reads bytes 8-11 (start) and 12-15 (end)
|
|
region_info = b'\x00\x00\x00\x01' # num_unencrypted_regions (offset 0)
|
|
region_info += b'\x00\x00\x00\x00' # padding (offset 4, skipped)
|
|
region_info += b'\x00\x00\x00\x00' # unencrypted start = sector 0 (offset 8)
|
|
region_info += b'\x00\x00\x00\x02' # unencrypted end = sector 2 (offset 12)
|
|
region_info += b'\x00\x00\x00\x06' # encrypted end = sector 6 (offset 16)
|
|
region_info += b'\x00\x00\x00\x00' # terminator (offset 20)
|
|
|
|
# Sector 1 (offset 2048): unencrypted header with game_id and PARAM.SFO marker
|
|
sector1 = bytearray(2048)
|
|
# game_id at offset 2048+16=2064
|
|
sector1[16:32] = b'TEST-GAME0000000'
|
|
# PARAM.SFO marker at offset 2048+20=2068
|
|
sector1[20:24] = b'\x00PSF'
|
|
|
|
# Encrypt sectors 2-5 with AES-CBC per-sector IV
|
|
from libray.iso import _make_iv
|
|
encrypted_payload = bytearray()
|
|
for s in range(2, 6):
|
|
iv = _make_iv(s)
|
|
cipher = AES.new(disc_key, AES.MODE_CBC, iv)
|
|
encrypted_payload.extend(cipher.encrypt(b'\x00' * 2048))
|
|
|
|
# Pad so sector data starts at offset 2048 (PS3 ISO convention)
|
|
data = region_info + b'\x00' * (2048 - len(region_info)) + sector1 + encrypted_payload
|
|
|
|
# Ensure file is at least as large as the last region end (sector 6 = 12288 bytes)
|
|
# The constructor uses size to calculate total_sectors, so file must be >= region end
|
|
while len(data) < 12288:
|
|
data += b'\x00' * 2048
|
|
|
|
total = len(data)
|
|
|
|
fd, path = tempfile.mkstemp(suffix='.iso')
|
|
with os.fdopen(fd, 'wb') as f:
|
|
f.write(data)
|
|
return path, total
|
|
|
|
@mock.patch('argparse.ArgumentParser.parse_args')
|
|
def test_decrypt_parallel_pipeline(self, mock_args):
|
|
"""Verify the multi-threaded decrypt pipeline processes all sectors."""
|
|
disc_key = b'\xbb' * 16
|
|
path, size = self._build_fake_iso(disc_key=disc_key)
|
|
try:
|
|
mock_args.iso = path
|
|
mock_args.decryption_key = disc_key.hex()
|
|
mock_args.ird = ''
|
|
mock_args.output = ''
|
|
mock_args.verbose = True
|
|
mock_args.quiet = True
|
|
mock_args.checksum = False
|
|
mock_args.checksum_timeout = 0
|
|
mock_args.threads = 4
|
|
|
|
out_path = path + '.dec'
|
|
mock_args.output = out_path
|
|
|
|
s = iso.ISO(mock_args)
|
|
s.decrypt(mock_args)
|
|
|
|
self.assertTrue(os.path.exists(out_path))
|
|
with open(out_path, 'rb') as f:
|
|
decrypted = f.read()
|
|
# Sectors 2-5 should be decrypted back to zeros
|
|
self.assertEqual(len(decrypted), size)
|
|
# Encrypted region: start=6144 (region[0].end), end=12288
|
|
self.assertEqual(decrypted[6144:12288], b'\x00' * 6144)
|
|
finally:
|
|
os.unlink(path)
|
|
if os.path.exists(path + '.dec'):
|
|
os.unlink(path + '.dec')
|
|
|
|
@mock.patch('argparse.ArgumentParser.parse_args')
|
|
def test_decrypt_sequential_fallback(self, mock_args):
|
|
"""Verify single-thread decrypt still works (sequential path)."""
|
|
disc_key = b'\xcc' * 16
|
|
path, size = self._build_fake_iso(disc_key=disc_key)
|
|
try:
|
|
mock_args.iso = path
|
|
mock_args.decryption_key = disc_key.hex()
|
|
mock_args.ird = ''
|
|
mock_args.output = ''
|
|
mock_args.verbose = True
|
|
mock_args.quiet = True
|
|
mock_args.checksum = False
|
|
mock_args.checksum_timeout = 0
|
|
mock_args.threads = 1
|
|
|
|
out_path = path + '.dec'
|
|
mock_args.output = out_path
|
|
|
|
s = iso.ISO(mock_args)
|
|
s.decrypt(mock_args)
|
|
|
|
self.assertTrue(os.path.exists(out_path))
|
|
with open(out_path, 'rb') as f:
|
|
decrypted = f.read()
|
|
self.assertEqual(decrypted[6144:12288], b'\x00' * 6144)
|
|
finally:
|
|
os.unlink(path)
|
|
if os.path.exists(path + '.dec'):
|
|
os.unlink(path + '.dec')
|
|
|
|
@mock.patch('argparse.ArgumentParser.parse_args')
|
|
def test_encrypt_parallel_pipeline(self, mock_args):
|
|
"""Verify the multi-threaded re-encrypt pipeline works."""
|
|
disc_key = b'\xdd' * 16
|
|
path, size = self._build_fake_iso(disc_key=disc_key)
|
|
try:
|
|
# First decrypt to get a plain ISO
|
|
mock_args.iso = path
|
|
mock_args.decryption_key = disc_key.hex()
|
|
mock_args.ird = ''
|
|
mock_args.output = path + '.dec'
|
|
mock_args.verbose = True
|
|
mock_args.quiet = True
|
|
mock_args.checksum = False
|
|
mock_args.checksum_timeout = 0
|
|
mock_args.threads = 4
|
|
s = iso.ISO(mock_args)
|
|
s.decrypt(mock_args)
|
|
|
|
# Now re-encrypt with -r
|
|
reenc_path = path + '.reenc'
|
|
mock_args.iso = path + '.dec'
|
|
mock_args.decryption_key = disc_key.hex()
|
|
mock_args.reencrypt = True
|
|
mock_args.output = reenc_path
|
|
mock_args.threads = 4
|
|
|
|
s2 = iso.ISO(mock_args)
|
|
s2.encrypt(mock_args)
|
|
|
|
self.assertTrue(os.path.exists(reenc_path))
|
|
with open(reenc_path, 'rb') as f:
|
|
reencrypted = f.read()
|
|
with open(path, 'rb') as f:
|
|
original = f.read()
|
|
self.assertEqual(reencrypted, original)
|
|
finally:
|
|
os.unlink(path)
|
|
if os.path.exists(path + '.dec'):
|
|
os.unlink(path + '.dec')
|
|
if os.path.exists(path + '.reenc'):
|
|
os.unlink(path + '.reenc')
|
|
|
|
|