LibRay/libray/iso.py

421 lines
16 KiB
Python

# -*- coding: utf8 -*-
# libray - Libre Blu-Ray PS3 ISO Tool
# Copyright © 2018 - 2024 Nichlas Severinsen
#
# This file is part of libray.
#
# libray is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# libray is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with libray. If not, see <https://www.gnu.org/licenses/>.
import os
import sys
import sqlite3
import concurrent.futures
import threading
from collections import deque
from threading import Thread
from importlib import resources
from tqdm import tqdm
from Crypto.Cipher import AES
try:
from libray import core
from libray import ird
from libray import sfo
except ImportError:
import core
import ird
import sfo
def _make_iv(sector_number):
"""Build a 16-byte IV from a sector number (little-endian)."""
iv = bytearray(16)
num = sector_number
for j in range(16):
iv[15 - j] = num & 0xFF
num >>= 8
return bytes(iv)
def _process_sector_chunk_mp(args):
"""Multiprocessing worker: (de/en)crypt a contiguous run of sectors.
Picklable, module-level function for use with ProcessPoolExecutor. Takes a
single ``bytes`` blob of one or more whole sectors and returns the processed
blob, so only one (large) object is pickled per task instead of many small
ones — this is what makes the multiprocessing worthwhile.
"""
disc_key, sector_blob, sector_start_number, encrypt_mode = args
out = bytearray()
for offset in range(0, len(sector_blob), core.SECTOR):
sector = sector_blob[offset:offset + core.SECTOR]
iv = _make_iv(sector_start_number + offset // core.SECTOR)
cipher = AES.new(disc_key, AES.MODE_CBC, iv)
out += cipher.encrypt(sector) if encrypt_mode else cipher.decrypt(sector)
return bytes(out)
class ISO:
"""Class for handling PS3 .iso files.
Attributes:
size: Size of .iso in bytes
number_of_regions: Number of regions in the .iso
regions: List with info of every region
game_id: PS3 game id
ird: IRD object (see ird.py)
disc_key: data1 from .ird, encrypted
"""
NUM_INFO_BYTES = 4
def read_regions(self, input_iso):
"""List with info dict (start, end, whether it's encrypted) for every region.
Basically, every other (odd numbered) region is encrypted.
"""
# The first region is always unencrypted
encrypted = False
regions = [{
'start': core.to_int(input_iso.read(self.NUM_INFO_BYTES)) * core.SECTOR, # Should always be 0?
'end': core.to_int(input_iso.read(self.NUM_INFO_BYTES)) * core.SECTOR + core.SECTOR,
'enc': encrypted
}]
# We'll read 4 bytes until we hit a non-size (<=0)
while True:
encrypted = not encrypted
end = core.to_int(input_iso.read(self.NUM_INFO_BYTES)) * core.SECTOR
if not end:
break
regions.append({
'start': regions[-1]['end'],
'end': end + core.SECTOR - (core.SECTOR if encrypted else 0),
'enc': encrypted
})
return regions
def __init__(self, args):
"""ISO constructor using args from argparse."""
self.size = core.size(args.iso)
if not self.size:
core.error('looks like ISO file/mount is empty?')
with open(args.iso, 'rb') as input_iso:
# Get number of unencrypted regions
self.number_of_unencrypted_regions = core.to_int(input_iso.read(self.NUM_INFO_BYTES))
# Skip unused bytes
input_iso.seek(input_iso.tell() + self.NUM_INFO_BYTES)
self.regions = self.read_regions(input_iso)
# Seek to the start of sector 2, '+ 16' skips a section containing some 'playstation'
input_iso.seek(core.SECTOR + 16)
self.game_id = input_iso.read(16).decode('utf8').strip()
# Find PARAM.SFO
core.vprint('Searching for PARAM.SFO', args)
input_iso.seek(0)
counter = 1
found_param = False
while True:
data = input_iso.read(8)
if not data:
break
# if data == b'PS3LICDA':
# print(data)
if data[0:4] == b'\x00\x50\x53\x46':
found_param = True
# input_iso.seek(input_iso.tell() - 8)
# param = sfo.SFO(input_iso)
# print(param['TITLE'])
# print(param['TITLE_ID'])
break
input_iso.seek((core.SECTOR * counter))
counter += 1
game_title = ''
if found_param:
input_iso.seek(input_iso.tell() - 8)
try:
param = sfo.SFO(input_iso)
core.vprint('PARAM.SFO found', args)
game_title = core.multiman_title(param['TITLE'])
if args.verbose and not args.quiet:
param.print_info()
# Set output to multiman style
if not args.output:
args.output = f'{game_title} [{param["TITLE_ID"]}].iso'
except (UnicodeDecodeError, KeyError, IndexError, ValueError):
core.warning('Failed reading SFO', args)
self.disc_key = self.get_key_from_args(game_title, args)
if args.verbose and not args.quiet:
self.print_info()
def decrypt(self, args):
"""Decrypt self using args from argparse."""
self._process_iso(args, encrypt_mode=False)
def encrypt(self, args):
"""Encrypt self using args from argparse."""
self._process_iso(args, encrypt_mode=True)
def _process_iso(self, args, encrypt_mode):
"""Shared driver for decrypt/encrypt. ``encrypt_mode`` selects direction."""
core.vprint(f'{"Re-encrypting" if encrypt_mode else "Decrypting"} with disc key: {self.disc_key.hex()}', args)
num_workers = args.threads if args.threads and args.threads > 0 else (os.cpu_count() or 1)
if args.output:
output_name = args.output
else:
output_name = f'{self.game_id}_e.iso' if encrypt_mode else f'{self.game_id}.iso'
core.vprint(f'{"Re-encrypted" if encrypt_mode else "Decrypted"} .iso is output to: {output_name}', args)
total_sectors = self.size // core.SECTOR
pbar = tqdm(total=total_sectors, file=sys.stdout, disable=args.quiet, leave=True)
if num_workers > 1:
core.vprint(f'Using {num_workers} processes for parallel {"re-encryption" if encrypt_mode else "decryption"}', args)
self._process_parallel(args, output_name, encrypt_mode, num_workers, pbar)
else:
self._process_sequential(args, output_name, encrypt_mode, pbar)
pbar.close()
core.vprint(f'{"Re-encryption" if encrypt_mode else "Decryption"} complete!', args)
def _process_sequential(self, args, output_name, encrypt_mode, pbar):
"""Single-process path: read, optionally (de/en)crypt, and write per sector."""
with open(args.iso, 'rb') as input_iso, open(output_name, 'wb') as output_file:
for region in self.regions:
region_sectors = (region['end'] - region['start']) // core.SECTOR
base_sector = region['start'] // core.SECTOR
input_iso.seek(region['start'])
for i in range(region_sectors):
data = input_iso.read(core.SECTOR)
if not data:
core.warning('Trying to read past the end of the file', args)
break
if region['enc']:
cipher = AES.new(self.disc_key, AES.MODE_CBC, _make_iv(base_sector + i))
data = cipher.encrypt(data) if encrypt_mode else cipher.decrypt(data)
output_file.write(data)
pbar.update(1)
def _process_parallel(self, args, output_name, encrypt_mode, num_workers, pbar):
"""Multi-process path: one process pool for the whole operation.
The output file is pre-sized so every region can be written to its
absolute offset; this also clears any stale file from a previous run.
"""
# Pre-create the output at its final size and write to absolute offsets.
total_bytes = self.regions[-1]['end']
with open(output_name, 'wb') as f:
f.truncate(total_bytes)
with concurrent.futures.ProcessPoolExecutor(max_workers=num_workers) as executor, \
open(args.iso, 'rb') as input_iso, \
open(output_name, 'r+b') as out:
for region in self.regions:
if region['enc']:
self._process_region_parallel(input_iso, out, region, executor, num_workers, encrypt_mode, pbar)
else:
# Unencrypted region — copy straight through to its offset.
region_sectors = (region['end'] - region['start']) // core.SECTOR
input_iso.seek(region['start'])
out.seek(region['start'])
for _ in range(region_sectors):
data = input_iso.read(core.SECTOR)
if not data:
core.warning('Trying to read past the end of the file', args)
break
out.write(data)
pbar.update(1)
def _process_region_parallel(self, input_iso, out, region, executor, num_workers, encrypt_mode, pbar):
"""(De/en)crypt one encrypted region across worker processes.
Reads the region in bounded-size chunks and keeps only a small window of
chunks in flight, so peak memory stays bounded even for huge ISOs.
Results are written back to their absolute offsets as they complete.
"""
num_sectors = (region['end'] - region['start']) // core.SECTOR
base_sector = region['start'] // core.SECTOR
# Sectors per task: large enough to amortise IPC, small enough to keep
# every worker busy and bound peak memory.
sectors_per_chunk = max(1, min(512, (num_sectors // (num_workers * 4)) or 1))
max_in_flight = num_workers * 4
input_iso.seek(region['start'])
next_sector = base_sector
sectors_left = num_sectors
pending = deque()
def submit_more():
nonlocal next_sector, sectors_left
while sectors_left > 0 and len(pending) < max_in_flight:
count = min(sectors_per_chunk, sectors_left)
blob = input_iso.read(count * core.SECTOR)
future = executor.submit(_process_sector_chunk_mp,
(self.disc_key, blob, next_sector, encrypt_mode))
pending.append((next_sector, count, future))
next_sector += count
sectors_left -= count
submit_more()
while pending:
start_sector, count, future = pending.popleft()
out.seek(start_sector * core.SECTOR)
out.write(future.result())
pbar.update(count)
submit_more()
def get_key_from_args(self, game_title, args):
# key provided with -d / --decryption-key
if args.decryption_key:
return core.to_bytes(args.decryption_key)
def get_key_from_ird(i):
self.ird = ird.IRD(i)
if self.ird.region_count != len(self.regions):
core.error(
f'Corrupt ISO or error in IRD. Expected {self.ird.region_count} regions, found {len(self.regions)} regions')
if self.regions[-1]['start'] > self.size:
core.error(
f'Corrupt ISO or error in IRD. Expected filesize larger than {self.regions[-1]["start"]/1024**3:.2f} GiB, actual size is {self.size/1024**3:.2f} GiB')
cipher = AES.new(core.ISO_SECRET, AES.MODE_CBC, core.ISO_IV)
return cipher.encrypt(self.ird.data1)
# .ird file given with -k / --ird
if args.ird:
return get_key_from_ird(args.ird)
# No key or .ird specified. Let's first check if keys.db is packaged with this release
core.vprint('Checking for bundled redump keys', args)
db_path = resources.files(__name__).joinpath('data', 'keys.db')
db = sqlite3.connect(str(db_path))
c = db.cursor()
# UPDATE: 2024 - New database now has game/title ids. See if we have that.
core.vprint('Searching using TITLE_ID', args)
try:
keys = c.execute('SELECT name, key FROM games WHERE title_id = ?', [self.game_id.replace('-', '')]).fetchall()
except sqlite3.OperationalError:
core.error('keys.db not found or invalid. Build it with: python3 tools/keys2db.py')
if len(keys) == 1:
core.vprint(f'Found potential redump key: "{keys[0][0]}"', args)
return keys[0][1]
# Then check if there's only one game with this exact size
core.vprint('Trying to find redump key based on size', args)
keys = c.execute('SELECT name, key FROM games WHERE size = ?', [str(self.size)]).fetchall()
if len(keys) == 1:
core.vprint(f'Found potential redump key: "{keys[0][0]}"', args)
return keys[0][1]
# If not, see if we can filter it out based on name and size
core.vprint('Trying to find redump key based on size, game title, and country', args)
if not game_title:
core.error('Could not determine game title from PARAM.SFO. Specify a decryption key with -d or provide an IRD file with -k.')
try:
country = core.serial_country(self.game_id)
except ValueError:
core.error(f'Unknown country code in game ID "{self.game_id}". Specify a decryption key with -d or provide an IRD file with -k.')
keys = c.execute('SELECT name, key FROM games WHERE lower(name) LIKE ? AND size = ?', [
'%' + '%'.join(game_title.lower().split(' ')) + '%' + country.lower() + '%', str(self.size)]).fetchall()
if keys:
core.vprint(f'Found potential redump key: "{keys[0][0]}"', args)
return keys[0][1]
# since checksums can take a while to calculate, bail here unless the
# user has specifically indicated they want to try the CRC32 fallback
if not args.checksum:
core.error('could not find disc key')
# Okay, searching has failed us, but maaaybe the checksum works?
core.vprint('Trying to find redump key based on CRC32', args)
cancel = threading.Event()
crc_done = threading.Event()
if args.checksum_timeout > 0:
def timeout():
# Abort the CRC32 calculation if it hasn't finished in time.
if not crc_done.wait(timeout=float(args.checksum_timeout)):
core.vprint(f'could not calculate CRC32 before {args.checksum_timeout}-second timeout', args)
cancel.set()
crc_thread = Thread(target=timeout, daemon=True)
crc_thread.start()
calculated_crc = core.crc32(args.iso, cancel)
crc_done.set()
if calculated_crc is None:
raise TimeoutError
keys = c.execute('SELECT name, key FROM games WHERE crc32=?', [calculated_crc.lower()]).fetchall()
if len(keys) == 1:
core.vprint(f'Found potential redump key: "{keys[0][0]}" (CRC32={calculated_crc.lower()})', args)
return keys[0][1]
core.error('could not find disc key')
def print_info(self):
# TODO: This could probably have been a __str__? Who cares?
"""Print some info about the ISO."""
print(f'Game ID: {self.game_id}')
print(f'Key: {self.disc_key.hex()}')
print(f'Info from ISO:')
print(f'Unencrypted regions: {self.number_of_unencrypted_regions}')
for i, region in enumerate(self.regions):
print(i, region, region['start'] // core.SECTOR, region['end'] // core.SECTOR)