# -*- coding: utf8 -*-

# libray - Libre Blu-Ray PS3 ISO Tool
# Copyright © 2018 - 2024 Nichlas Severinsen
#
# This file is part of libray.
#
# libray is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# libray is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with libray. If not, see <https://www.gnu.org/licenses/>.


import os
import sys
import sqlite3
import concurrent.futures
import threading
from collections import deque
from threading import Thread
from importlib import resources

from tqdm import tqdm
from Crypto.Cipher import AES

try:
    from libray import core
    from libray import ird
    from libray import sfo
except ImportError:
    import core
    import ird
    import sfo


def _make_iv(sector_number):
    """Build a 16-byte IV from a sector number (big-endian).

    The least significant byte of ``sector_number`` ends up in the last
    byte of the IV. Values wider than 128 bits are truncated to their low
    128 bits.
    """
    # Masking keeps the wrap-around behaviour of a byte-by-byte shift loop
    # instead of letting to_bytes() raise OverflowError.
    return (sector_number & ((1 << 128) - 1)).to_bytes(16, 'big')


def _process_sector_chunk_mp(args):
    """Multiprocessing worker: (de/en)crypt a contiguous run of sectors.

    Picklable, module-level function for use with ProcessPoolExecutor.
    Takes a single ``bytes`` blob of one or more whole sectors and returns
    the processed blob, so only one (large) object is pickled per task
    instead of many small ones — this is what makes the multiprocessing
    worthwhile.

    Args:
        args: Tuple ``(disc_key, sector_blob, sector_start_number,
            encrypt_mode)``. ``sector_start_number`` is the absolute sector
            number of the first sector in ``sector_blob``; ``encrypt_mode``
            selects encryption (truthy) or decryption (falsy).

    Returns:
        The processed sectors, concatenated in input order, as ``bytes``.
    """
    disc_key, sector_blob, sector_start_number, encrypt_mode = args
    processed = []
    for offset in range(0, len(sector_blob), core.SECTOR):
        sector = sector_blob[offset:offset + core.SECTOR]
        # Every sector gets its own cipher: the IV is derived from the
        # sector's absolute number.
        iv = _make_iv(sector_start_number + offset // core.SECTOR)
        cipher = AES.new(disc_key, AES.MODE_CBC, iv)
        processed.append(cipher.encrypt(sector) if encrypt_mode else cipher.decrypt(sector))
    return b''.join(processed)


class ISO:
    """Class for handling PS3 .iso files.

    Attributes:
        size: Size of .iso in bytes
        number_of_unencrypted_regions: Region count read from the first
            bytes of the .iso
        regions: List with info of every region
        game_id: PS3 game id
        ird: IRD object (see ird.py); only set when the key comes from an
            .ird file
        disc_key: Key used to (de/en)crypt the encrypted regions
    """

    NUM_INFO_BYTES = 4

    def read_regions(self, input_iso):
        """List with info dict (start, end, whether it's encrypted) for every region.

        Basically, every other (odd numbered) region is encrypted.

        Args:
            input_iso: Binary file object positioned at the first region
                entry.

        Returns:
            List of dicts with keys ``'start'`` and ``'end'`` (byte
            offsets) and ``'enc'`` (bool).
        """
        def read_offset():
            # Each entry is NUM_INFO_BYTES wide and counts sectors.
            return core.to_int(input_iso.read(self.NUM_INFO_BYTES)) * core.SECTOR

        # The first region is always unencrypted
        encrypted = False
        first_start = read_offset()  # Should always be 0?
        first_end = read_offset() + core.SECTOR
        regions = [{'start': first_start, 'end': first_end, 'enc': encrypted}]

        # We'll read 4 bytes until we hit a non-size (<=0)
        while True:
            encrypted = not encrypted
            end = read_offset()
            if not end:
                break
            regions.append({
                'start': regions[-1]['end'],
                # Unencrypted regions extend one sector past the value read.
                'end': end if encrypted else end + core.SECTOR,
                'enc': encrypted,
            })

        return regions

    def __init__(self, args):
        """ISO constructor using args from argparse.

        Reads the region table and game id from ``args.iso``, looks for a
        PARAM.SFO to derive a default output name, and resolves the disc
        key. May set ``args.output`` when it is empty and a PARAM.SFO was
        parsed.
        """
        self.size = core.size(args.iso)

        if not self.size:
            core.error('looks like ISO file/mount is empty?')

        with open(args.iso, 'rb') as input_iso:
            # Get number of unencrypted regions
            self.number_of_unencrypted_regions = core.to_int(input_iso.read(self.NUM_INFO_BYTES))

            # Skip unused bytes
            input_iso.seek(input_iso.tell() + self.NUM_INFO_BYTES)

            self.regions = self.read_regions(input_iso)

            # Seek into the second sector, '+ 16' skips a section containing some 'playstation'
            input_iso.seek(core.SECTOR + 16)
            self.game_id = input_iso.read(16).decode('utf8').strip()

            # Find PARAM.SFO: check the first bytes of every sector for the
            # SFO magic, stopping at the first hit or at end of file.
            core.vprint('Searching for PARAM.SFO', args)

            sfo_offset = None
            offset = 0
            while True:
                input_iso.seek(offset)
                data = input_iso.read(8)
                if not data:
                    break
                if data[0:4] == b'\x00\x50\x53\x46':
                    sfo_offset = offset
                    break
                offset += core.SECTOR

            game_title = ''
            if sfo_offset is not None:
                input_iso.seek(sfo_offset)
                try:
                    param = sfo.SFO(input_iso)
                    core.vprint('PARAM.SFO found', args)
                    game_title = core.multiman_title(param['TITLE'])
                    if args.verbose and not args.quiet:
                        param.print_info()
                    # Set output to multiman style
                    if not args.output:
                        args.output = f'{game_title} [{param["TITLE_ID"]}].iso'
                except (UnicodeDecodeError, KeyError, IndexError, ValueError):
                    core.warning('Failed reading SFO', args)

        self.disc_key = self.get_key_from_args(game_title, args)

        if args.verbose and not args.quiet:
            self.print_info()

    def decrypt(self, args):
        """Decrypt self using args from argparse."""
        self._process_iso(args, encrypt_mode=False)

    def encrypt(self, args):
        """Encrypt self using args from argparse."""
        self._process_iso(args, encrypt_mode=True)

    def _process_iso(self, args, encrypt_mode):
        """Shared driver for decrypt/encrypt. ``encrypt_mode`` selects direction.

        Picks the output name (``args.output`` or one derived from the game
        id), chooses between the sequential and the multi-process path
        based on ``args.threads``, and drives the progress bar.
        """
        core.vprint(f'{"Re-encrypting" if encrypt_mode else "Decrypting"} with disc key: {self.disc_key.hex()}', args)

        # A missing or non-positive thread count means "use every CPU".
        num_workers = args.threads if args.threads and args.threads > 0 else (os.cpu_count() or 1)

        if args.output:
            output_name = args.output
        else:
            output_name = f'{self.game_id}_e.iso' if encrypt_mode else f'{self.game_id}.iso'

        core.vprint(f'{"Re-encrypted" if encrypt_mode else "Decrypted"} .iso is output to: {output_name}', args)

        total_sectors = self.size // core.SECTOR

        # The context manager closes the bar even when processing raises.
        with tqdm(total=total_sectors, file=sys.stdout, disable=args.quiet, leave=True) as pbar:
            if num_workers > 1:
                core.vprint(f'Using {num_workers} processes for parallel {"re-encryption" if encrypt_mode else "decryption"}', args)
                self._process_parallel(args, output_name, encrypt_mode, num_workers, pbar)
            else:
                self._process_sequential(args, output_name, encrypt_mode, pbar)

        core.vprint(f'{"Re-encryption" if encrypt_mode else "Decryption"} complete!', args)

    def _process_sequential(self, args, output_name, encrypt_mode, pbar):
        """Single-process path: read, optionally (de/en)crypt, and write per sector.

        Sectors of unencrypted regions are copied unchanged. Reading stops
        for the current region, with a warning, when the input runs out.
        """
        with open(args.iso, 'rb') as input_iso, open(output_name, 'wb') as output_file:
            for region in self.regions:
                region_sectors = (region['end'] - region['start']) // core.SECTOR
                base_sector = region['start'] // core.SECTOR
                input_iso.seek(region['start'])

                for i in range(region_sectors):
                    data = input_iso.read(core.SECTOR)
                    if not data:
                        core.warning('Trying to read past the end of the file', args)
                        break

                    if region['enc']:
                        cipher = AES.new(self.disc_key, AES.MODE_CBC, _make_iv(base_sector + i))
                        data = cipher.encrypt(data) if encrypt_mode else cipher.decrypt(data)

                    output_file.write(data)
                    pbar.update(1)

    def _process_parallel(self, args, output_name, encrypt_mode, num_workers, pbar):
        """Multi-process path: one process pool for the whole operation.

        The output file is pre-sized so every region can be written to its
        absolute offset; this also clears any stale file from a previous run.
        """
        # Pre-create the output at its final size and write to absolute offsets.
        total_bytes = self.regions[-1]['end']
        with open(output_name, 'wb') as f:
            f.truncate(total_bytes)

        with concurrent.futures.ProcessPoolExecutor(max_workers=num_workers) as executor, \
                open(args.iso, 'rb') as input_iso, \
                open(output_name, 'r+b') as out:
            for region in self.regions:
                if region['enc']:
                    self._process_region_parallel(input_iso, out, region, executor,
                                                  num_workers, encrypt_mode, pbar)
                else:
                    # Unencrypted region — copy straight through to its offset.
                    region_sectors = (region['end'] - region['start']) // core.SECTOR
                    input_iso.seek(region['start'])
                    out.seek(region['start'])
                    for _ in range(region_sectors):
                        data = input_iso.read(core.SECTOR)
                        if not data:
                            core.warning('Trying to read past the end of the file', args)
                            break
                        out.write(data)
                        pbar.update(1)

    def _process_region_parallel(self, input_iso, out, region, executor, num_workers, encrypt_mode, pbar):
        """(De/en)crypt one encrypted region across worker processes.

        Reads the region in bounded-size chunks and keeps only a small
        window of chunks in flight, so peak memory stays bounded even for
        huge ISOs. Results are written to their absolute offsets in
        submission order. If the input ends before the region does, no
        further chunks are submitted.
        """
        num_sectors = (region['end'] - region['start']) // core.SECTOR
        base_sector = region['start'] // core.SECTOR

        # Sectors per task: large enough to amortise IPC, small enough to keep
        # every worker busy and bound peak memory.
        sectors_per_chunk = max(1, min(512, (num_sectors // (num_workers * 4)) or 1))
        max_in_flight = num_workers * 4

        input_iso.seek(region['start'])
        next_sector = base_sector
        sectors_left = num_sectors
        pending = deque()

        def submit_more():
            """Top the in-flight window up with freshly read chunks."""
            nonlocal next_sector, sectors_left
            while sectors_left > 0 and len(pending) < max_in_flight:
                count = min(sectors_per_chunk, sectors_left)
                blob = input_iso.read(count * core.SECTOR)
                if not blob:
                    # The input is shorter than the region table claims:
                    # there is nothing left to process, so don't keep
                    # shipping empty tasks to the workers.
                    sectors_left = 0
                    break
                future = executor.submit(_process_sector_chunk_mp,
                                         (self.disc_key, blob, next_sector, encrypt_mode))
                pending.append((next_sector, count, future))
                next_sector += count
                sectors_left -= count

        submit_more()
        while pending:
            start_sector, count, future = pending.popleft()
            out.seek(start_sector * core.SECTOR)
            out.write(future.result())
            pbar.update(count)
            submit_more()

    def get_key_from_args(self, game_title, args):
        """Resolve the disc key from the command-line arguments.

        Sources are tried in this order: an explicit key
        (``args.decryption_key``), an .ird file (``args.ird``), then the
        bundled ``keys.db``.

        Args:
            game_title: Title taken from PARAM.SFO, or ``''`` if unknown;
                only used for the database lookup.
            args: Parsed argparse namespace.

        Returns:
            The disc key.

        Raises:
            TimeoutError: If the CRC32 fallback was cancelled before it
                produced a checksum.
        """
        # key provided with -d / --decryption-key
        if args.decryption_key:
            return core.to_bytes(args.decryption_key)

        # .ird file given with -k / --ird
        if args.ird:
            return self._key_from_ird(args.ird)

        # No key or .ird specified. Let's first check if keys.db is packaged with this release
        core.vprint('Checking for bundled redump keys', args)

        db_path = resources.files(__name__).joinpath('data', 'keys.db')
        db = sqlite3.connect(str(db_path))
        try:
            return self._key_from_db(db.cursor(), game_title, args)
        finally:
            # Release the connection on every exit path, including errors.
            db.close()

    def _key_from_ird(self, ird_path):
        """Load the .ird at ``ird_path``, sanity-check it against the ISO, and derive the key."""
        self.ird = ird.IRD(ird_path)

        if self.ird.region_count != len(self.regions):
            core.error(
                f'Corrupt ISO or error in IRD. Expected {self.ird.region_count} regions, found {len(self.regions)} regions')

        if self.regions[-1]['start'] > self.size:
            core.error(
                f'Corrupt ISO or error in IRD. Expected filesize larger than {self.regions[-1]["start"]/1024**3:.2f} GiB, actual size is {self.size/1024**3:.2f} GiB')

        # The disc key is data1 from the .ird, encrypted with the fixed
        # secret/IV pair from core.
        cipher = AES.new(core.ISO_SECRET, AES.MODE_CBC, core.ISO_IV)
        return cipher.encrypt(self.ird.data1)

    def _key_from_db(self, c, game_title, args):
        """Search the keys database via cursor ``c`` for a key matching this ISO.

        NOTE(review): the flow below relies on core.error() not returning
        (presumably it exits the program) — confirm in core.py.
        """
        # UPDATE: 2024 - New database now has game/title ids. See if we have that.
        core.vprint('Searching using TITLE_ID', args)
        try:
            keys = c.execute('SELECT name, key FROM games WHERE title_id = ?',
                             [self.game_id.replace('-', '')]).fetchall()
        except sqlite3.OperationalError:
            core.error('keys.db not found or invalid. Build it with: python3 tools/keys2db.py')

        if len(keys) == 1:
            core.vprint(f'Found potential redump key: "{keys[0][0]}"', args)
            return keys[0][1]

        # Then check if there's only one game with this exact size
        core.vprint('Trying to find redump key based on size', args)

        keys = c.execute('SELECT name, key FROM games WHERE size = ?', [str(self.size)]).fetchall()

        if len(keys) == 1:
            core.vprint(f'Found potential redump key: "{keys[0][0]}"', args)
            return keys[0][1]

        # If not, see if we can filter it out based on name and size
        core.vprint('Trying to find redump key based on size, game title, and country', args)

        if not game_title:
            core.error('Could not determine game title from PARAM.SFO. Specify a decryption key with -d or provide an IRD file with -k.')

        try:
            country = core.serial_country(self.game_id)
        except ValueError:
            core.error(f'Unknown country code in game ID "{self.game_id}". Specify a decryption key with -d or provide an IRD file with -k.')

        # Match the title words in order, followed by the country, with
        # anything in between.
        name_pattern = '%' + '%'.join(game_title.lower().split(' ')) + '%' + country.lower() + '%'
        keys = c.execute('SELECT name, key FROM games WHERE lower(name) LIKE ? AND size = ?',
                         [name_pattern, str(self.size)]).fetchall()

        if keys:
            core.vprint(f'Found potential redump key: "{keys[0][0]}"', args)
            return keys[0][1]

        # since checksums can take a while to calculate, bail here unless the
        # user has specifically indicated they want to try the CRC32 fallback
        if not args.checksum:
            core.error('could not find disc key')

        # Okay, searching has failed us, but maaaybe the checksum works?
        core.vprint('Trying to find redump key based on CRC32', args)

        calculated_crc = self._crc32_with_timeout(args)

        keys = c.execute('SELECT name, key FROM games WHERE crc32=?', [calculated_crc.lower()]).fetchall()

        if len(keys) == 1:
            core.vprint(f'Found potential redump key: "{keys[0][0]}" (CRC32={calculated_crc.lower()})', args)
            return keys[0][1]

        core.error('could not find disc key')

    def _crc32_with_timeout(self, args):
        """Compute the CRC32 of ``args.iso``, cancelling after ``args.checksum_timeout`` seconds (if > 0)."""
        cancel = threading.Event()
        crc_done = threading.Event()

        if args.checksum_timeout > 0:
            def timeout():
                # Abort the CRC32 calculation if it hasn't finished in time.
                if not crc_done.wait(timeout=float(args.checksum_timeout)):
                    core.vprint(f'could not calculate CRC32 before {args.checksum_timeout}-second timeout', args)
                    cancel.set()

            crc_thread = Thread(target=timeout, daemon=True)
            crc_thread.start()

        try:
            calculated_crc = core.crc32(args.iso, cancel)
        finally:
            # Always release the watchdog, even if the checksum call raised.
            crc_done.set()

        if calculated_crc is None:
            raise TimeoutError('CRC32 calculation was cancelled before it finished')

        return calculated_crc

    def print_info(self):
        # TODO: This could probably have been a __str__? Who cares?
        """Print some info about the ISO."""
        print(f'Game ID: {self.game_id}')
        print(f'Key: {self.disc_key.hex()}')
        print('Info from ISO:')
        print(f'Unencrypted regions: {self.number_of_unencrypted_regions}')
        for i, region in enumerate(self.regions):
            print(i, region, region['start'] // core.SECTOR, region['end'] // core.SECTOR)