From 35e49044f1692961bf651c917f66523faa208e6e Mon Sep 17 00:00:00 2001 From: Oracle Date: Tue, 19 May 2026 17:41:17 +0200 Subject: [PATCH 1/2] Initial parallelization attempt --- AGENTS.md | 35 ++++++ libray/__init__.py | 6 +- libray/core.py | 53 +++++---- libray/iso.py | 278 +++++++++++++++++++++++++++++---------------- libray/libray | 30 ++++- libray/libray.py | 30 ++++- libray/py.typed | 0 requirements.txt | 2 - setup.py | 8 +- tests/test_iso.py | 22 ++-- 10 files changed, 321 insertions(+), 143 deletions(-) create mode 100644 AGENTS.md create mode 100644 libray/py.typed diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..112b803 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,35 @@ +# LibRay — Agent Quick Reference +# Important +Make sure to use the virtual environment in .venv and not global pip. + +## Repo +- Python 3 CLI tool for decrypting/encrypting/examining PS3 Blu-Ray ISOs +- Entry point: `libray/libray` (also `libray/libray.py`, identical copy) +- Package: `libray/` — modules: `core.py` (main logic), `iso.py` (ISO parsing), `ird.py` (IRD parsing), `sfo.py` (PARAM.SFO) +- Tests: `tests/` — `test_iso.py`, `test_interface.py` (interface test is currently skipped/broken) +- Tools: `tools/keys2db.py` (builds `libray/data/keys.db` from redump keys), `tools/rpcs3.py` (fetches compat data) + +## Commands +- Install deps: `pip install -r requirements.txt` +- Run tests: `python -m unittest discover -b` +- Build sdist + wheel: `python3 setup.py sdist bdist_wheel` +- Publish: `twine upload dist/*` + +## Parallelization +- Decrypt and re-encrypt support multi-threading via `-p`/`--threads` CLI argument +- Default: auto-detects CPU core count via `os.cpu_count()` +- Each sector is independently decrypted (per-sector IV in AES-CBC), making it embarrassingly parallel +- Uses `concurrent.futures.ThreadPoolExecutor` (threads, not processes, since pycryptodome releases the GIL) +- Unencrypted regions are always copied sequentially (no crypto needed) +- Sector data is read upfront into memory, then processed in parallel, then written in order + +## Gotchas +- **Crypto package conflict**: `pycrypto`/`crypto` will break `pycryptodome`. If `ImportError: No module named Crypto.Cipher`, run: + ``` + pip uninstall crypto pycrypto && pip install pycryptodome + ``` +- **keys.db is generated**, not committed. Build it with `python3 tools/keys2db.py` (requires keys in `tools/keys/`). It's listed in `.gitignore` via `libray/data/*.db`. +- **`libray/__init__.py`** dynamically imports all submodules via `pkgutil.walk_packages` — don't expect explicit imports. +- **`test_interface.py`** is skipped (`@unittest.skip('currently broken')`) — the interface test won't run. +- `.editorconfig` enforces 4-space indent for `.py`, 2-space for `.yml`/`.yaml`. +- No linting/typechecking config exists — plain `unittest`, no pytest, no pre-commit. diff --git a/libray/__init__.py b/libray/__init__.py index 305c8e6..d6881a2 100644 --- a/libray/__init__.py +++ b/libray/__init__.py @@ -24,12 +24,10 @@ # This script is not included in the release of libray. import pkgutil +import importlib __all__ = [] for loader, module_name, is_pkg in pkgutil.walk_packages(__path__): __all__.append(module_name) - try: - _module = loader.find_module(module_name).load_module(module_name) - except AttributeError: - _module = loader.find_spec(module_name).loader.load_module(module_name) + _module = importlib.import_module(f'.{module_name}', __name__) globals()[module_name] = _module diff --git a/libray/core.py b/libray/core.py index 991b9de..741035d 100644 --- a/libray/core.py +++ b/libray/core.py @@ -26,6 +26,7 @@ import zlib import shutil import requests from bs4 import BeautifulSoup +from typing import Any, Optional try: @@ -44,23 +45,25 @@ GET_IRD_NET_LOC = 'http://jonnysp.bplaced.net/ird/' # Utility functions -def to_int(data, byteorder='big'): +def to_int(data: bytes, byteorder: str = 'big') -> int: """Convert bytes to integer""" if isinstance(data, bytes): return int.from_bytes(data, byteorder) + return 0 -def to_bytes(data): +def to_bytes(data: str) -> Optional[bytes]: """Convert a string of HEX to bytes""" if isinstance(data, str): return bytes(bytearray.fromhex(data)) + return None ISO_SECRET = to_bytes("380bcf0b53455b3c7817ab4fa3ba90ed") ISO_IV = to_bytes("69474772af6fdab342743aefaa186287") -def size(path): +def size(path: str) -> int: """Get size of a file or block device in bytes""" pathstat = os.stat(path) @@ -74,7 +77,7 @@ def size(path): return pathstat.st_size -def read_seven_bit_encoded_int(fileobj, order): +def read_seven_bit_encoded_int(fileobj, order: str) -> int: """Read an Int32, 7 bits at a time.""" # The highest bit of the byte, when on, means to continue reading more bytes. count = 0 @@ -90,27 +93,27 @@ def read_seven_bit_encoded_int(fileobj, order): return count -def error(msg): +def error(msg: str) -> None: """Print fatal error message and terminate""" - print('[ERROR] %s' % msg, file=sys.stderr) + print(f'[ERROR] {msg}', file=sys.stderr) sys.exit(1) -def warning(msg, args): +def warning(msg: str, args) -> None: """Print a warning message. Warning messages can be silenced with --quiet""" if not args.quiet: - print('[WARNING] %s. Continuing regardless.' % msg, file=sys.stderr) + print(f'[WARNING] {msg}. Continuing regardless.', file=sys.stderr) -def vprint(msg, args): +def vprint(msg: str, args) -> None: """Vprint, verbose print, can be silenced with --quiet""" if not args.quiet: - print('[*] ' + msg) + print(f'[*] {msg}') -def download_ird(ird_name): +def download_ird(ird_name: str) -> None: """Download an .ird from GET_IRD_NET_LOC""" # Check if file already exists and skip if it does @@ -118,14 +121,14 @@ def download_ird(ird_name): return ird_link = GET_IRD_NET_LOC + ird_name - r = requests.get(ird_link, stream=True) + r = requests.get(ird_link, stream=True, timeout=30) with open(ird_name, 'wb') as ird_file: r.raw.decode_content = True shutil.copyfileobj(r.raw, ird_file) -def ird_by_game_id(game_id): +def ird_by_game_id(game_id: str) -> Optional[str]: """Using a game_id, download the responding .ird from ALL_IRD_NET_LOC""" gameid = game_id.replace('-', '') try: @@ -148,26 +151,28 @@ def ird_by_game_id(game_id): return (ird_name) -def crc32(filename, keep_going=[True]): +def crc32(filename: str, keep_going: Optional[list] = None) -> Optional[str]: """Calculate crc32 for file""" + if keep_going is None: + keep_going = [True] with open(filename, 'rb') as infile: - crc32 = 0 + crc_val = 0 - while keep_going[0] == True: + while keep_going[0] is True: data = infile.read(65536) if not data: break - crc32 = zlib.crc32(data, crc32) + crc_val = zlib.crc32(data, crc_val) - if keep_going[0] == False: + if keep_going[0] is False: return None - return "%08X" % (crc32 & 0xFFFFFFFF) + return f"{crc_val & 0xFFFFFFFF:08X}" -def serial_country(title): +def serial_country(title: str) -> str: """Get country from disc serial / productcode / title_id""" if title[2] == 'A': @@ -188,7 +193,7 @@ def serial_country(title): raise ValueError('Unknown country?!') -def multiman_title(title): +def multiman_title(title: str) -> str: """Fix special characters in title for Multiman style""" replace = { @@ -206,7 +211,7 @@ def multiman_title(title): # Main functions -def info(args): +def info(args: Any) -> None: """Print information about .iso and then quit.""" if args.iso: @@ -220,7 +225,7 @@ def info(args): sys.exit() -def decrypt(args): +def decrypt(args: Any) -> None: """Try to decrypt a given .iso using relevant .ird or encryption key from argparse If no .ird is given this will try to automatically download an .ird file with the encryption/decryption key for the given game .iso @@ -232,7 +237,7 @@ def decrypt(args): input_iso.decrypt(args) -def encrypt(args): +def encrypt(args: Any) -> None: """Try to re-encrypt a decrypted .iso using relevant .ird or encryption key from argparse If no .ird is given this will try to automatically download an .ird file with the encryption/decryption key for the given game .iso diff --git a/libray/iso.py b/libray/iso.py index 2fcc04b..df5ce4a 100644 --- a/libray/iso.py +++ b/libray/iso.py @@ -23,9 +23,12 @@ import os import sys import sqlite3 import pathlib -from threading import Thread +import concurrent.futures +import threading import time -import pkg_resources +import queue +from threading import Thread +from importlib import resources from tqdm import tqdm from Crypto.Cipher import AES @@ -40,6 +43,28 @@ except ImportError: import sfo +def _decrypt_sector_worker(disc_key, sector_data, sector_number): + """Standalone worker for parallel sector decryption.""" + iv = bytearray(16) + num = sector_number + for j in range(16): + iv[15 - j] = num & 0xFF + num >>= 8 + cipher = AES.new(disc_key, AES.MODE_CBC, bytes(iv)) + return (sector_number, cipher.decrypt(sector_data)) + + +def _encrypt_sector_worker(disc_key, sector_data, sector_number): + """Standalone worker for parallel sector encryption.""" + iv = bytearray(16) + num = sector_number + for j in range(16): + iv[15 - j] = num & 0xFF + num >>= 8 + cipher = AES.new(disc_key, AES.MODE_CBC, bytes(iv)) + return (sector_number, cipher.encrypt(sector_data)) + + class ISO: """Class for handling PS3 .iso files. @@ -166,127 +191,182 @@ class ISO: if args.verbose and not args.quiet: self.print_info() + def _make_iv(self, sector_number): + """Build a 16-byte IV from a sector number (little-endian).""" + iv = bytearray(16) + num = sector_number + for j in range(16): + iv[15 - j] = num & 0xFF + num >>= 8 + return bytes(iv) + + def _process_region_pipeline(self, input_path, region, num_workers, encrypt_mode, args): + """Process an encrypted region using a reader-worker pipeline. + + A reader thread reads sectors from the file and puts them on a queue. + Worker threads pull from the queue, process each sector in parallel, + and store results. This overlaps I/O with processing for better CPU usage. + """ + num_sectors = (region['end'] - region['start']) // core.SECTOR + queue_size = min(64, num_sectors) + sector_queue = queue.Queue(maxsize=queue_size) + results = [None] * num_sectors + results_lock = threading.Lock() + + def reader(): + with open(input_path, 'rb') as f: + f.seek(region['start']) + for i in range(num_sectors): + sector_data = f.read(core.SECTOR) + sector_queue.put((i, sector_data)) + for _ in range(num_workers): + sector_queue.put(None) + + def worker(): + while True: + item = sector_queue.get() + if item is None: + sector_queue.task_done() + break + idx, sector_data = item + _, processed = (_encrypt_sector_worker if encrypt_mode else _decrypt_sector_worker)( + self.disc_key, sector_data, region['start'] // core.SECTOR + idx + ) + with results_lock: + results[idx] = processed + sector_queue.task_done() + + reader_thread = threading.Thread(target=reader, daemon=True) + reader_thread.start() + + workers = [] + for _ in range(num_workers): + t = threading.Thread(target=worker, daemon=True) + t.start() + workers.append(t) + + for t in workers: + t.join() + + reader_thread.join() + + return b''.join(results) + def decrypt(self, args): """Decrypt self using args from argparse.""" core.vprint(f'Decrypting with disc key: {self.disc_key.hex()}', args) - with open(args.iso, 'rb') as input_iso: + num_workers = args.threads if args.threads and args.threads > 0 else os.cpu_count() or 1 + if num_workers > 1: + core.vprint(f'Using {num_workers} threads for parallel decryption', args) - if not args.output: - output_name = f'{self.game_id}.iso' - else: - output_name = args.output + if not args.output: + output_name = f'{self.game_id}.iso' + else: + output_name = args.output - core.vprint(f'Decrypted .iso is output to: {output_name}', args) + core.vprint(f'Decrypted .iso is output to: {output_name}', args) - with open(output_name, 'wb') as output_iso: + total_sectors = self.size // core.SECTOR - if not args.quiet: - pbar = tqdm(total=(self.size // 2048)) + with open(args.iso, 'rb') as input_iso, open(output_name, 'wb') as output_iso: - for region in self.regions: + pbar = tqdm(total=total_sectors, file=sys.stdout, disable=args.quiet, leave=True) + + for region in self.regions: + region_sectors = (region['end'] - region['start']) // core.SECTOR + + if not region['enc']: + # Unencrypted region — copy sequentially input_iso.seek(region['start']) - - # Unencrypted region, just copy it - if not region['enc']: - while input_iso.tell() < region['end']: - data = input_iso.read(core.SECTOR) - if not data: - core.warning('Trying to read past the end of the file', args) - break - output_iso.write(data) - - if not args.quiet: - pbar.update(1) - continue - # Encrypted region, decrypt then write + for _ in range(region_sectors): + data = input_iso.read(core.SECTOR) + if not data: + core.warning('Trying to read past the end of the file', args) + break + output_iso.write(data) + pbar.update(1) + else: + # Encrypted region — pipeline: reader thread + worker threads + if num_workers > 1: + processed = self._process_region_pipeline( + args.iso, region, num_workers, encrypt_mode=False, args=args + ) else: - while input_iso.tell() < region['end']: - num = input_iso.tell() // 2048 - iv = bytearray([0 for i in range(0, 16)]) - for j in range(0, 16): - iv[16 - j - 1] = (num & 0xFF) - num >>= 8 + # Sequential fallback + input_iso.seek(region['start']) + processed = bytearray() + for i in range(region_sectors): + sector_num = region['start'] // core.SECTOR + i + iv = self._make_iv(sector_num) + cipher = AES.new(self.disc_key, AES.MODE_CBC, iv) + processed.extend(cipher.decrypt(input_iso.read(core.SECTOR))) + processed = bytes(processed) - data = input_iso.read(core.SECTOR) - if not data: - core.warning('Trying to read past the end of the file', args) - break + output_iso.write(processed) + pbar.update(region_sectors) - cipher = AES.new(self.disc_key, AES.MODE_CBC, bytes(iv)) - decrypted = cipher.decrypt(data) - - output_iso.write(decrypted) - - if not args.quiet: - pbar.update(1) - - if not args.quiet: - pbar.close() - - core.vprint('Decryption complete!', args) + pbar.close() + core.vprint('Decryption complete!', args) def encrypt(self, args): """Encrypt self using args from argparse.""" core.vprint(f'Re-encrypting with disc key: {self.disc_key.hex()}', args) - with open(args.iso, 'rb') as input_iso: + num_workers = args.threads if args.threads and args.threads > 0 else os.cpu_count() or 1 + if num_workers > 1: + core.vprint(f'Using {num_workers} threads for parallel re-encryption', args) - if not args.output: - output_name = f'{self.game_id}_e.iso' - else: - output_name = args.output + if not args.output: + output_name = f'{self.game_id}_e.iso' + else: + output_name = args.output - core.vprint(f'Re-encrypted .iso is output to: {output_name}', args) + core.vprint(f'Re-encrypted .iso is output to: {output_name}', args) - with open(output_name, 'wb') as output_iso: + with open(args.iso, 'rb') as input_iso, open(output_name, 'wb') as output_iso: - if not args.quiet: - pbar = tqdm(total=(self.size // 2048)) + pbar = tqdm(total=(self.size // 2048), file=sys.stdout, disable=args.quiet, leave=True) - for region in self.regions: + for region in self.regions: + region_sectors = (region['end'] - region['start']) // core.SECTOR + + if not region['enc']: + # Unencrypted region — copy sequentially input_iso.seek(region['start']) - - # Unencrypted region, just copy it - if not region['enc']: - while input_iso.tell() < region['end']: - data = input_iso.read(core.SECTOR) - if not data: - core.warning('Trying to read past the end of the file', args) - break - output_iso.write(data) - - if not args.quiet: - pbar.update(1) - continue - # Decrypted region, re-encrypt it + for _ in range(region_sectors): + data = input_iso.read(core.SECTOR) + if not data: + core.warning('Trying to read past the end of the file', args) + break + output_iso.write(data) + pbar.update(1) + else: + # Encrypted region — pipeline: reader thread + worker threads + if num_workers > 1: + processed = self._process_region_pipeline( + args.iso, region, num_workers, encrypt_mode=True, args=args + ) else: - while input_iso.tell() < region['end']: - num = input_iso.tell() // 2048 - iv = bytearray([0 for i in range(0, 16)]) - for j in range(0, 16): - iv[16 - j - 1] = (num & 0xFF) - num >>= 8 + # Sequential fallback + input_iso.seek(region['start']) + processed = bytearray() + for i in range(region_sectors): + sector_num = region['start'] // core.SECTOR + i + iv = self._make_iv(sector_num) + cipher = AES.new(self.disc_key, AES.MODE_CBC, iv) + processed.extend(cipher.encrypt(input_iso.read(core.SECTOR))) + processed = bytes(processed) - data = input_iso.read(core.SECTOR) - if not data: - core.warning('Trying to read past the end of the file', args) - break + output_iso.write(processed) + pbar.update(region_sectors) - cipher = AES.new(self.disc_key, AES.MODE_CBC, bytes(iv)) - encrypted = cipher.encrypt(data) + if not args.quiet: + pbar.close() - output_iso.write(encrypted) - - if not args.quiet: - pbar.update(1) - - if not args.quiet: - pbar.close() - - core.vprint('Re-encryption complete!', args) + core.vprint('Re-encryption complete!', args) def get_key_from_args(self, game_title, args): # key provided with -d / --decryption-key @@ -313,8 +393,16 @@ class ISO: core.vprint('Checking for bundled redump keys', args) try: - db = sqlite3.connect(pkg_resources.resource_filename(__name__, 'data/keys.db')) - except FileNotFoundError: + db_path = resources.files(__name__).joinpath('data', 'keys.db') + if hasattr(db_path, 'read_bytes'): + # importlib.resources.abc.Traversable - write to temp file for sqlite3 + import tempfile + with tempfile.NamedTemporaryFile(delete=False, suffix='.db') as tmp: + tmp.write(db_path.read_bytes()) + db = sqlite3.connect(tmp.name) + else: + db = sqlite3.connect(str(db_path)) + except (FileNotFoundError, AttributeError): db = sqlite3.connect((pathlib.Path(__file__).resolve() / 'data/') / 'keys.db') c = db.cursor() diff --git a/libray/libray b/libray/libray index aec94df..08cc1a3 100755 --- a/libray/libray +++ b/libray/libray @@ -20,7 +20,29 @@ # along with libray. If not, see . +#!/usr/bin/env python3 +# -*- coding: utf8 -*- + +# libray - Libre Blu-Ray PS3 ISO Tool +# Copyright © 2018 - 2024 Nichlas Severinsen +# +# This file is part of libray. +# +# libray is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# libray is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with libray. If not, see . + import argparse +import sys try: @@ -29,8 +51,7 @@ except ImportError: import core -if __name__ == '__main__': - +def main(): parser = argparse.ArgumentParser( description='A Libre (FLOSS) Python application for unencrypting, extracting, repackaging, and encrypting PS3 ISOs') @@ -47,6 +68,7 @@ if __name__ == '__main__': optional.add_argument('-r', '--re-encrypt', dest='reencrypt', help='Re-encrypt .iso', action='store_true') optional.add_argument('-c', '--checksum', dest='checksum', help='Allow fallback to CRC32 checksum (disabled by default)', action='store_true') optional.add_argument('-t', '--checksum-timeout', dest='checksum_timeout', type=int, help='How many seconds to wait for CRC32 checksum (default 15)', default=15) + optional.add_argument('-p', '--threads', dest='threads', type=int, help='Number of threads for parallel decryption/encryption (default: number of CPU cores)', default=0) optional.add_argument('--info', dest='info', action='store_true', help='Print info about .iso or .ird, then quit.') args = parser.parse_args() @@ -61,3 +83,7 @@ if __name__ == '__main__': core.encrypt(args) else: core.decrypt(args) + + +if __name__ == '__main__': + main() diff --git a/libray/libray.py b/libray/libray.py index aec94df..08cc1a3 100755 --- a/libray/libray.py +++ b/libray/libray.py @@ -20,7 +20,29 @@ # along with libray. If not, see . +#!/usr/bin/env python3 +# -*- coding: utf8 -*- + +# libray - Libre Blu-Ray PS3 ISO Tool +# Copyright © 2018 - 2024 Nichlas Severinsen +# +# This file is part of libray. +# +# libray is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# libray is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with libray. If not, see . + import argparse +import sys try: @@ -29,8 +51,7 @@ except ImportError: import core -if __name__ == '__main__': - +def main(): parser = argparse.ArgumentParser( description='A Libre (FLOSS) Python application for unencrypting, extracting, repackaging, and encrypting PS3 ISOs') @@ -47,6 +68,7 @@ if __name__ == '__main__': optional.add_argument('-r', '--re-encrypt', dest='reencrypt', help='Re-encrypt .iso', action='store_true') optional.add_argument('-c', '--checksum', dest='checksum', help='Allow fallback to CRC32 checksum (disabled by default)', action='store_true') optional.add_argument('-t', '--checksum-timeout', dest='checksum_timeout', type=int, help='How many seconds to wait for CRC32 checksum (default 15)', default=15) + optional.add_argument('-p', '--threads', dest='threads', type=int, help='Number of threads for parallel decryption/encryption (default: number of CPU cores)', default=0) optional.add_argument('--info', dest='info', action='store_true', help='Print info about .iso or .ird, then quit.') args = parser.parse_args() @@ -61,3 +83,7 @@ if __name__ == '__main__': core.encrypt(args) else: core.decrypt(args) + + +if __name__ == '__main__': + main() diff --git a/libray/py.typed b/libray/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/requirements.txt b/requirements.txt index 3e249d6..680254f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,5 +2,3 @@ tqdm~=4.66.2 pycryptodome~=3.20.0 requests~=2.31.0 beautifulsoup4~=4.12.3 -html5lib~=1.1 -setuptools~=69.1.1 diff --git a/setup.py b/setup.py index 416f5da..30ee147 100755 --- a/setup.py +++ b/setup.py @@ -19,14 +19,16 @@ setup( author_email="ns@nsz.no", url="https://notabug.org/necklace/libray", packages=['libray'], - scripts=['libray/libray'], + entry_points={ + 'console_scripts': [ + 'libray=libray.libray:main', + ], + }, install_requires=[ 'tqdm~=4.66.2', 'pycryptodome~=3.20.0', 'requests~=2.31.0', 'beautifulsoup4~=4.12.3', - 'html5lib~=1.1', - 'setuptools~=69.1.1', ], include_package_data=True, package_data={'': ['data/keys.db']}, diff --git a/tests/test_iso.py b/tests/test_iso.py index 3c7f2c8..25de326 100644 --- a/tests/test_iso.py +++ b/tests/test_iso.py @@ -62,7 +62,7 @@ class TestISO(unittest.TestCase): {'start': 1024, 'end': 2048, 'enc': False} ] - with mock.patch('iso.ird.IRD', return_value=ird) as mock_ird: + with mock.patch('libray.iso.ird.IRD', return_value=ird) as mock_ird: returned_key = fake_iso.get_key_from_args('AAA', mock_args) mock_ird.assert_called_once_with('aaa.ird') self.assertEqual(decryptkey_bytes, returned_key) @@ -84,7 +84,7 @@ class TestISO(unittest.TestCase): {'start': 0, 'end': 512, 'enc': False}, ] - with mock.patch('iso.ird.IRD', return_value=ird) as mock_ird: + with mock.patch('libray.iso.ird.IRD', return_value=ird) as mock_ird: with self.assertRaises(SystemExit): fake_iso.get_key_from_args('AAA', mock_args) @@ -107,7 +107,7 @@ class TestISO(unittest.TestCase): {'start': 2000000000, 'end': 2000001000, 'enc': False} ] - with mock.patch('iso.ird.IRD', return_value=ird) as mock_ird: + with mock.patch('libray.iso.ird.IRD', return_value=ird) as mock_ird: with self.assertRaises(SystemExit): fake_iso.get_key_from_args('AAA', mock_args) @@ -122,7 +122,7 @@ class TestISO(unittest.TestCase): fake_iso.size = 512 * 1024 * 1024 fake_iso.game_id = 'TCUS-12345' - with mock.patch('iso.sqlite3') as mocksql: + with mock.patch('libray.iso.sqlite3') as mocksql: decryption_key = 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' decryptkey_bytes = core.to_bytes(decryption_key) mocksql.connect().cursor().execute().fetchall.return_value = [['AAA', decryptkey_bytes]] @@ -140,7 +140,7 @@ class TestISO(unittest.TestCase): fake_iso.size = 512 * 1024 * 1024 fake_iso.game_id = 'TCUS-12345' - with mock.patch('iso.sqlite3') as mocksql: + with mock.patch('libray.iso.sqlite3') as mocksql: decryption_key = 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' decryptkey_bytes = core.to_bytes(decryption_key) mocksql.connect().cursor().execute().fetchall.side_effect = [[], [['AAA', decryptkey_bytes]]] @@ -158,7 +158,7 @@ class TestISO(unittest.TestCase): fake_iso.size = 512 * 1024 * 1024 fake_iso.game_id = 'TCUS-12345' - with mock.patch('iso.sqlite3') as mocksql: + with mock.patch('libray.iso.sqlite3') as mocksql: decryption_key = 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' decryptkey_bytes = core.to_bytes(decryption_key) mocksql.connect().cursor().execute().fetchall.return_value = [['AAA', decryptkey_bytes],['BBB', decryptkey_bytes]] @@ -177,7 +177,7 @@ class TestISO(unittest.TestCase): fake_iso.size = 512 * 1024 * 1024 fake_iso.game_id = 'TCUS-12345' - with mock.patch('iso.sqlite3') as mocksql: + with mock.patch('libray.iso.sqlite3') as mocksql: mocksql.connect().cursor().execute().fetchall.return_value = [] with self.assertRaises(SystemExit): fake_iso.get_key_from_args('AAA', mock_args) @@ -195,12 +195,12 @@ class TestISO(unittest.TestCase): fake_iso.size = 512 * 1024 * 1024 fake_iso.game_id = 'TCUS-12345' - with mock.patch('iso.sqlite3') as mocksql: + with mock.patch('libray.iso.sqlite3') as mocksql: decryption_key = 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' decryptkey_bytes = core.to_bytes(decryption_key) fakeresults = ([], [], [['AAA', decryptkey_bytes]]) mocksql.connect().cursor().execute().fetchall.side_effect = fakeresults - with mock.patch('iso.core.crc32', return_value='01010101'): + with mock.patch('libray.iso.core.crc32', return_value='01010101'): returned_key = fake_iso.get_key_from_args('AAA', mock_args) self.assertEqual(decryptkey_bytes, returned_key) @@ -217,9 +217,9 @@ class TestISO(unittest.TestCase): fake_iso.size = 512 * 1024 * 1024 fake_iso.game_id = 'TCUS-12345' - with mock.patch('iso.sqlite3') as mocksql: + with mock.patch('libray.iso.sqlite3') as mocksql: mocksql.connect().cursor().execute().fetchall.return_value = [] - with mock.patch('iso.core.crc32', return_value=None): + with mock.patch('libray.iso.core.crc32', return_value=None): with self.assertRaises(TimeoutError): fake_iso.get_key_from_args('AAA', mock_args) -- 2.43.0 From 8ca5371a984466b78eb56eadcaea0c875168c3b5 Mon Sep 17 00:00:00 2001 From: Apunkt Date: Thu, 21 May 2026 09:14:20 +0200 Subject: [PATCH 2/2] fix: parallelization effort, progressbar w/ bit-identitical file write compared to sequential path --- AGENTS.md | 14 +- libray/__init__.py | 10 +- libray/core.py | 23 +-- libray/ird.py | 116 ++++++++------- libray/iso.py | 345 +++++++++++++++++++-------------------------- libray/libray | 90 +----------- libray/libray.py | 25 +--- target.md5 | 1 + tests/test_iso.py | 170 +++++++++++++++++++++- 9 files changed, 407 insertions(+), 387 deletions(-) mode change 100755 => 120000 libray/libray create mode 100644 target.md5 diff --git a/AGENTS.md b/AGENTS.md index 112b803..d4a6357 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -1,10 +1,10 @@ # LibRay — Agent Quick Reference # Important -Make sure to use the virtual environment in .venv and not global pip. +Make sure to use the virtual environment in `~/.venv/libray` (activate with `source ~/.venv/libray/bin/activate`) and not global pip. ## Repo - Python 3 CLI tool for decrypting/encrypting/examining PS3 Blu-Ray ISOs -- Entry point: `libray/libray` (also `libray/libray.py`, identical copy) +- Entry point: `libray/libray.py` (defines `main()`); installed console script is `libray=libray.libray:main` (see `setup.py`). `libray/libray` is a symlink to `libray.py` for running from a source checkout. - Package: `libray/` — modules: `core.py` (main logic), `iso.py` (ISO parsing), `ird.py` (IRD parsing), `sfo.py` (PARAM.SFO) - Tests: `tests/` — `test_iso.py`, `test_interface.py` (interface test is currently skipped/broken) - Tools: `tools/keys2db.py` (builds `libray/data/keys.db` from redump keys), `tools/rpcs3.py` (fetches compat data) @@ -16,12 +16,14 @@ Make sure to use the virtual environment in .venv and not global pip. - Publish: `twine upload dist/*` ## Parallelization -- Decrypt and re-encrypt support multi-threading via `-p`/`--threads` CLI argument +- Decrypt and re-encrypt support multiprocessing via `-p`/`--threads` CLI argument - Default: auto-detects CPU core count via `os.cpu_count()` - Each sector is independently decrypted (per-sector IV in AES-CBC), making it embarrassingly parallel -- Uses `concurrent.futures.ThreadPoolExecutor` (threads, not processes, since pycryptodome releases the GIL) +- Uses `concurrent.futures.ProcessPoolExecutor` — true multi-core parallelism by spawning OS processes +- Each process gets its own Python interpreter (own GIL), so pycryptodome's partial GIL release is irrelevant - Unencrypted regions are always copied sequentially (no crypto needed) -- Sector data is read upfront into memory, then processed in parallel, then written in order +- Module-level `_process_sector_chunk_mp()` function is picklable for use with ProcessPoolExecutor; it processes a contiguous run of sectors per task to amortise IPC cost +- A bounded window of chunks is kept in flight (memory stays bounded for large ISOs); results are written back to their absolute offsets in the output file as they complete ## Gotchas - **Crypto package conflict**: `pycrypto`/`crypto` will break `pycryptodome`. If `ImportError: No module named Crypto.Cipher`, run: @@ -29,7 +31,7 @@ Make sure to use the virtual environment in .venv and not global pip. pip uninstall crypto pycrypto && pip install pycryptodome ``` - **keys.db is generated**, not committed. Build it with `python3 tools/keys2db.py` (requires keys in `tools/keys/`). It's listed in `.gitignore` via `libray/data/*.db`. -- **`libray/__init__.py`** dynamically imports all submodules via `pkgutil.walk_packages` — don't expect explicit imports. +- **`libray/__init__.py`** dynamically imports submodules via `pkgutil.walk_packages` + `importlib.import_module` (skips the `libray` entry-point module to avoid shadowing the package) — don't expect explicit imports. - **`test_interface.py`** is skipped (`@unittest.skip('currently broken')`) — the interface test won't run. - `.editorconfig` enforces 4-space indent for `.py`, 2-space for `.yml`/`.yaml`. - No linting/typechecking config exists — plain `unittest`, no pytest, no pre-commit. diff --git a/libray/__init__.py b/libray/__init__.py index d6881a2..35a8408 100644 --- a/libray/__init__.py +++ b/libray/__init__.py @@ -18,16 +18,16 @@ # You should have received a copy of the GNU General Public License # along with libray. If not, see . -# This is a script to find the redump names and game serial id's using rpcs3's compatibility list. -# It puts the name, serial id, and some other info into an sqlite3 database. -# That database can then be used to harcode serial id to keys into keys.db. -# This script is not included in the release of libray. - import pkgutil import importlib __all__ = [] for loader, module_name, is_pkg in pkgutil.walk_packages(__path__): + # Skip the entry-point script (libray/libray.py) which would pollute + # the package namespace with a 'libray' submodule that shadows the + # package itself when imported via 'importlib.import_module'. + if module_name == 'libray': + continue __all__.append(module_name) _module = importlib.import_module(f'.{module_name}', __name__) globals()[module_name] = _module diff --git a/libray/core.py b/libray/core.py index 741035d..2b173bc 100644 --- a/libray/core.py +++ b/libray/core.py @@ -24,6 +24,7 @@ import sys import stat import zlib import shutil +import threading import requests from bs4 import BeautifulSoup from typing import Any, Optional @@ -99,14 +100,14 @@ def error(msg: str) -> None: sys.exit(1) -def warning(msg: str, args) -> None: +def warning(msg: str, args: Any) -> None: """Print a warning message. Warning messages can be silenced with --quiet""" if not args.quiet: print(f'[WARNING] {msg}. Continuing regardless.', file=sys.stderr) -def vprint(msg: str, args) -> None: +def vprint(msg: str, args: Any) -> None: """Vprint, verbose print, can be silenced with --quiet""" if not args.quiet: @@ -151,22 +152,24 @@ def ird_by_game_id(game_id: str) -> Optional[str]: return (ird_name) -def crc32(filename: str, keep_going: Optional[list] = None) -> Optional[str]: - """Calculate crc32 for file""" - if keep_going is None: - keep_going = [True] +def crc32(filename: str, cancel: Optional[threading.Event] = None) -> Optional[str]: + """Calculate crc32 for file. + + If *cancel* is provided, the computation can be aborted from another + thread by calling ``cancel.set()``. Returns ``None`` when cancelled. + """ + if cancel is None: + cancel = threading.Event() with open(filename, 'rb') as infile: - crc_val = 0 - - while keep_going[0] is True: + while not cancel.is_set(): data = infile.read(65536) if not data: break crc_val = zlib.crc32(data, crc_val) - if keep_going[0] is False: + if cancel.is_set(): return None return f"{crc_val & 0xFFFFFFFF:08X}" diff --git a/libray/ird.py b/libray/ird.py index 9df1454..6af748c 100644 --- a/libray/ird.py +++ b/libray/ird.py @@ -23,6 +23,7 @@ import os import sys import zlib import shutil +import tempfile try: @@ -47,84 +48,91 @@ class IRD: """ ORDER = 'little' - TEMP_FILE = 'ird' MAGIC_STRING = b'3IRD' def __init__(self, ird_path, verbose=False): """IRD constructor using args from argparse.""" - self.uncompress(ird_path) # TODO: Try/Except? + temp_path = self._prepare_temp(ird_path) - self.size = core.size(self.TEMP_FILE) + try: + self.size = core.size(temp_path) - if not self.size: - core.error('IRD file is empty!') + if not self.size: + core.error('IRD file is empty!') - with open(self.TEMP_FILE, 'rb') as input_ird: - if input_ird.read(4) != self.MAGIC_STRING: - core.error('Either not an IRD file, corruped IRD file, or unknown IRD format') + with open(temp_path, 'rb') as input_ird: + if input_ird.read(4) != self.MAGIC_STRING: + core.error('Either not an IRD file, corruped IRD file, or unknown IRD format') - self.version = core.to_int(input_ird.read(1), self.ORDER) - self.game_id = input_ird.read(9) - name_length = core.read_seven_bit_encoded_int(input_ird, self.ORDER) - self.game_name = input_ird.read(name_length).decode('utf8') - self.update_version = input_ird.read(4) - self.game_version = input_ird.read(5) - self.app_version = input_ird.read(5) + self.version = core.to_int(input_ird.read(1), self.ORDER) + self.game_id = input_ird.read(9) + name_length = core.read_seven_bit_encoded_int(input_ird, self.ORDER) + self.game_name = input_ird.read(name_length).decode('utf8') + self.update_version = input_ird.read(4) + self.game_version = input_ird.read(5) + self.app_version = input_ird.read(5) - if self.version == 7: - self.identifier = input_ird.read(4) + if self.version == 7: + self.identifier = input_ird.read(4) - header_length = (core.to_int(input_ird.read(4), self.ORDER)) - self.header = input_ird.read(header_length) - footer_length = (core.to_int(input_ird.read(4), self.ORDER)) - self.footer = input_ird.read(footer_length) + header_length = (core.to_int(input_ird.read(4), self.ORDER)) + self.header = input_ird.read(header_length) + footer_length = (core.to_int(input_ird.read(4), self.ORDER)) + self.footer = input_ird.read(footer_length) - self.region_count = core.to_int(input_ird.read(1), self.ORDER) - self.region_hashes = [] - for _ in range(0, self.region_count): - self.region_hashes.append(input_ird.read(16)) + self.region_count = core.to_int(input_ird.read(1), self.ORDER) + self.region_hashes = [] + for _ in range(0, self.region_count): + self.region_hashes.append(input_ird.read(16)) - self.file_count = core.to_int(input_ird.read(4), self.ORDER) - self.file_hashes = [] - for _ in range(0, self.file_count): - key = core.to_int(input_ird.read(8), self.ORDER) - val = input_ird.read(16) - self.file_hashes.append({'key': key, 'val': val}) + self.file_count = core.to_int(input_ird.read(4), self.ORDER) + self.file_hashes = [] + for _ in range(0, self.file_count): + key = core.to_int(input_ird.read(8), self.ORDER) + val = input_ird.read(16) + self.file_hashes.append({'key': key, 'val': val}) - if self.version >= 9: - self.pic = input_ird.read(115) + if self.version >= 9: + self.pic = input_ird.read(115) - input_ird.seek(input_ird.tell() + 4) # ? + input_ird.seek(input_ird.tell() + 4) # ? - self.data1 = input_ird.read(16) - self.data2 = input_ird.read(16) + self.data1 = input_ird.read(16) + self.data2 = input_ird.read(16) - if self.version < 9: - self.pic = input_ird.read(115) + if self.version < 9: + self.pic = input_ird.read(115) - if self.version < 7: - self.uid = core.to_int(input_ird.read(4), self.ORDER) + if self.version < 7: + self.uid = core.to_int(input_ird.read(4), self.ORDER) - if verbose: - self.print_info() + if verbose: + self.print_info() + finally: + os.remove(temp_path) - os.remove(self.TEMP_FILE) + def _prepare_temp(self, filename): + """Decompress (if needed) and copy an IRD file to a unique temp file. - def uncompress(self, filename): - """Uncompress IRD. Assumes given .ird file is not compressed, but then tries to decompress it with zlib/gzfile if it was not uncompressed.""" + Returns the path to the temp file. The caller is responsible for + deleting it (use a ``try / finally`` block). + """ + with open(filename, 'rb') as f: + magic = f.read(4) - uncompress = False - with open(filename, 'rb') as input_ird: - if input_ird.read(4) != self.MAGIC_STRING: - uncompress = True + fd, tmp_path = tempfile.mkstemp(suffix='.ird') + os.close(fd) - if uncompress: - with open(filename, 'rb') as gzfile: - with open(self.TEMP_FILE, 'wb') as tmpfile: - tmpfile.write(zlib.decompress(gzfile.read(), zlib.MAX_WBITS | 16)) + if magic != self.MAGIC_STRING: + # Compressed — decompress to the temp file + with open(filename, 'rb') as gzfile, open(tmp_path, 'wb') as tmpfile: + tmpfile.write(zlib.decompress(gzfile.read(), zlib.MAX_WBITS | 16)) else: - shutil.copyfile(filename, self.TEMP_FILE) + # Uncompressed — copy to the temp file + shutil.copyfile(filename, tmp_path) + + return tmp_path def print_info(self, regions=False): # TODO: This could probably have been a __str__? Who cares? diff --git a/libray/iso.py b/libray/iso.py index df5ce4a..690067e 100644 --- a/libray/iso.py +++ b/libray/iso.py @@ -22,11 +22,9 @@ import os import sys import sqlite3 -import pathlib import concurrent.futures import threading -import time -import queue +from collections import deque from threading import Thread from importlib import resources from tqdm import tqdm @@ -43,26 +41,32 @@ except ImportError: import sfo -def _decrypt_sector_worker(disc_key, sector_data, sector_number): - """Standalone worker for parallel sector decryption.""" +def _make_iv(sector_number): + """Build a 16-byte IV from a sector number (little-endian).""" iv = bytearray(16) num = sector_number for j in range(16): iv[15 - j] = num & 0xFF num >>= 8 - cipher = AES.new(disc_key, AES.MODE_CBC, bytes(iv)) - return (sector_number, cipher.decrypt(sector_data)) + return bytes(iv) -def _encrypt_sector_worker(disc_key, sector_data, sector_number): - """Standalone worker for parallel sector encryption.""" - iv = bytearray(16) - num = sector_number - for j in range(16): - iv[15 - j] = num & 0xFF - num >>= 8 - cipher = AES.new(disc_key, AES.MODE_CBC, bytes(iv)) - return (sector_number, cipher.encrypt(sector_data)) +def _process_sector_chunk_mp(args): + """Multiprocessing worker: (de/en)crypt a contiguous run of sectors. + + Picklable, module-level function for use with ProcessPoolExecutor. Takes a + single ``bytes`` blob of one or more whole sectors and returns the processed + blob, so only one (large) object is pickled per task instead of many small + ones — this is what makes the multiprocessing worthwhile. + """ + disc_key, sector_blob, sector_start_number, encrypt_mode = args + out = bytearray() + for offset in range(0, len(sector_blob), core.SECTOR): + sector = sector_blob[offset:offset + core.SECTOR] + iv = _make_iv(sector_start_number + offset // core.SECTOR) + cipher = AES.new(disc_key, AES.MODE_CBC, iv) + out += cipher.encrypt(sector) if encrypt_mode else cipher.decrypt(sector) + return bytes(out) class ISO: @@ -184,189 +188,134 @@ class ISO: if not args.output: args.output = f'{game_title} [{param["TITLE_ID"]}].iso' - except Exception: + except (UnicodeDecodeError, KeyError, IndexError, ValueError): core.warning('Failed reading SFO', args) self.disc_key = self.get_key_from_args(game_title, args) if args.verbose and not args.quiet: self.print_info() - def _make_iv(self, sector_number): - """Build a 16-byte IV from a sector number (little-endian).""" - iv = bytearray(16) - num = sector_number - for j in range(16): - iv[15 - j] = num & 0xFF - num >>= 8 - return bytes(iv) - - def _process_region_pipeline(self, input_path, region, num_workers, encrypt_mode, args): - """Process an encrypted region using a reader-worker pipeline. - - A reader thread reads sectors from the file and puts them on a queue. - Worker threads pull from the queue, process each sector in parallel, - and store results. This overlaps I/O with processing for better CPU usage. - """ - num_sectors = (region['end'] - region['start']) // core.SECTOR - queue_size = min(64, num_sectors) - sector_queue = queue.Queue(maxsize=queue_size) - results = [None] * num_sectors - results_lock = threading.Lock() - - def reader(): - with open(input_path, 'rb') as f: - f.seek(region['start']) - for i in range(num_sectors): - sector_data = f.read(core.SECTOR) - sector_queue.put((i, sector_data)) - for _ in range(num_workers): - sector_queue.put(None) - - def worker(): - while True: - item = sector_queue.get() - if item is None: - sector_queue.task_done() - break - idx, sector_data = item - _, processed = (_encrypt_sector_worker if encrypt_mode else _decrypt_sector_worker)( - self.disc_key, sector_data, region['start'] // core.SECTOR + idx - ) - with results_lock: - results[idx] = processed - sector_queue.task_done() - - reader_thread = threading.Thread(target=reader, daemon=True) - reader_thread.start() - - workers = [] - for _ in range(num_workers): - t = threading.Thread(target=worker, daemon=True) - t.start() - workers.append(t) - - for t in workers: - t.join() - - reader_thread.join() - - return b''.join(results) - def decrypt(self, args): """Decrypt self using args from argparse.""" - - core.vprint(f'Decrypting with disc key: {self.disc_key.hex()}', args) - - num_workers = args.threads if args.threads and args.threads > 0 else os.cpu_count() or 1 - if num_workers > 1: - core.vprint(f'Using {num_workers} threads for parallel decryption', args) - - if not args.output: - output_name = f'{self.game_id}.iso' - else: - output_name = args.output - - core.vprint(f'Decrypted .iso is output to: {output_name}', args) - - total_sectors = self.size // core.SECTOR - - with open(args.iso, 'rb') as input_iso, open(output_name, 'wb') as output_iso: - - pbar = tqdm(total=total_sectors, file=sys.stdout, disable=args.quiet, leave=True) - - for region in self.regions: - region_sectors = (region['end'] - region['start']) // core.SECTOR - - if not region['enc']: - # Unencrypted region — copy sequentially - input_iso.seek(region['start']) - for _ in range(region_sectors): - data = input_iso.read(core.SECTOR) - if not data: - core.warning('Trying to read past the end of the file', args) - break - output_iso.write(data) - pbar.update(1) - else: - # Encrypted region — pipeline: reader thread + worker threads - if num_workers > 1: - processed = self._process_region_pipeline( - args.iso, region, num_workers, encrypt_mode=False, args=args - ) - else: - # Sequential fallback - input_iso.seek(region['start']) - processed = bytearray() - for i in range(region_sectors): - sector_num = region['start'] // core.SECTOR + i - iv = self._make_iv(sector_num) - cipher = AES.new(self.disc_key, AES.MODE_CBC, iv) - processed.extend(cipher.decrypt(input_iso.read(core.SECTOR))) - processed = bytes(processed) - - output_iso.write(processed) - pbar.update(region_sectors) - - pbar.close() - core.vprint('Decryption complete!', args) + self._process_iso(args, encrypt_mode=False) def encrypt(self, args): """Encrypt self using args from argparse.""" + self._process_iso(args, encrypt_mode=True) - core.vprint(f'Re-encrypting with disc key: {self.disc_key.hex()}', args) + def _process_iso(self, args, encrypt_mode): + """Shared driver for decrypt/encrypt. ``encrypt_mode`` selects direction.""" - num_workers = args.threads if args.threads and args.threads > 0 else os.cpu_count() or 1 - if num_workers > 1: - core.vprint(f'Using {num_workers} threads for parallel re-encryption', args) + core.vprint(f'{"Re-encrypting" if encrypt_mode else "Decrypting"} with disc key: {self.disc_key.hex()}', args) - if not args.output: - output_name = f'{self.game_id}_e.iso' - else: + num_workers = args.threads if args.threads and args.threads > 0 else (os.cpu_count() or 1) + + if args.output: output_name = args.output + else: + output_name = f'{self.game_id}_e.iso' if encrypt_mode else f'{self.game_id}.iso' - core.vprint(f'Re-encrypted .iso is output to: {output_name}', args) + core.vprint(f'{"Re-encrypted" if encrypt_mode else "Decrypted"} .iso is output to: {output_name}', args) - with open(args.iso, 'rb') as input_iso, open(output_name, 'wb') as output_iso: + total_sectors = self.size // core.SECTOR + pbar = tqdm(total=total_sectors, file=sys.stdout, disable=args.quiet, leave=True) - pbar = tqdm(total=(self.size // 2048), file=sys.stdout, disable=args.quiet, leave=True) + if num_workers > 1: + core.vprint(f'Using {num_workers} processes for parallel {"re-encryption" if encrypt_mode else "decryption"}', args) + self._process_parallel(args, output_name, encrypt_mode, num_workers, pbar) + else: + self._process_sequential(args, output_name, encrypt_mode, pbar) + pbar.close() + core.vprint(f'{"Re-encryption" if encrypt_mode else "Decryption"} complete!', args) + + def _process_sequential(self, args, output_name, encrypt_mode, pbar): + """Single-process path: read, optionally (de/en)crypt, and write per sector.""" + with open(args.iso, 'rb') as input_iso, open(output_name, 'wb') as output_file: for region in self.regions: region_sectors = (region['end'] - region['start']) // core.SECTOR + base_sector = region['start'] // core.SECTOR + input_iso.seek(region['start']) - if not region['enc']: - # Unencrypted region — copy sequentially + for i in range(region_sectors): + data = input_iso.read(core.SECTOR) + if not data: + core.warning('Trying to read past the end of the file', args) + break + if region['enc']: + cipher = AES.new(self.disc_key, AES.MODE_CBC, _make_iv(base_sector + i)) + data = cipher.encrypt(data) if encrypt_mode else cipher.decrypt(data) + output_file.write(data) + pbar.update(1) + + def _process_parallel(self, args, output_name, encrypt_mode, num_workers, pbar): + """Multi-process path: one process pool for the whole operation. + + The output file is pre-sized so every region can be written to its + absolute offset; this also clears any stale file from a previous run. + """ + # Pre-create the output at its final size and write to absolute offsets. + total_bytes = self.regions[-1]['end'] + with open(output_name, 'wb') as f: + f.truncate(total_bytes) + + with concurrent.futures.ProcessPoolExecutor(max_workers=num_workers) as executor, \ + open(args.iso, 'rb') as input_iso, \ + open(output_name, 'r+b') as out: + for region in self.regions: + if region['enc']: + self._process_region_parallel(input_iso, out, region, executor, num_workers, encrypt_mode, pbar) + else: + # Unencrypted region — copy straight through to its offset. + region_sectors = (region['end'] - region['start']) // core.SECTOR input_iso.seek(region['start']) + out.seek(region['start']) for _ in range(region_sectors): data = input_iso.read(core.SECTOR) if not data: core.warning('Trying to read past the end of the file', args) break - output_iso.write(data) + out.write(data) pbar.update(1) - else: - # Encrypted region — pipeline: reader thread + worker threads - if num_workers > 1: - processed = self._process_region_pipeline( - args.iso, region, num_workers, encrypt_mode=True, args=args - ) - else: - # Sequential fallback - input_iso.seek(region['start']) - processed = bytearray() - for i in range(region_sectors): - sector_num = region['start'] // core.SECTOR + i - iv = self._make_iv(sector_num) - cipher = AES.new(self.disc_key, AES.MODE_CBC, iv) - processed.extend(cipher.encrypt(input_iso.read(core.SECTOR))) - processed = bytes(processed) - output_iso.write(processed) - pbar.update(region_sectors) + def _process_region_parallel(self, input_iso, out, region, executor, num_workers, encrypt_mode, pbar): + """(De/en)crypt one encrypted region across worker processes. - if not args.quiet: - pbar.close() + Reads the region in bounded-size chunks and keeps only a small window of + chunks in flight, so peak memory stays bounded even for huge ISOs. + Results are written back to their absolute offsets as they complete. + """ + num_sectors = (region['end'] - region['start']) // core.SECTOR + base_sector = region['start'] // core.SECTOR - core.vprint('Re-encryption complete!', args) + # Sectors per task: large enough to amortise IPC, small enough to keep + # every worker busy and bound peak memory. + sectors_per_chunk = max(1, min(512, (num_sectors // (num_workers * 4)) or 1)) + max_in_flight = num_workers * 4 + + input_iso.seek(region['start']) + next_sector = base_sector + sectors_left = num_sectors + pending = deque() + + def submit_more(): + nonlocal next_sector, sectors_left + while sectors_left > 0 and len(pending) < max_in_flight: + count = min(sectors_per_chunk, sectors_left) + blob = input_iso.read(count * core.SECTOR) + future = executor.submit(_process_sector_chunk_mp, + (self.disc_key, blob, next_sector, encrypt_mode)) + pending.append((next_sector, count, future)) + next_sector += count + sectors_left -= count + + submit_more() + while pending: + start_sector, count, future = pending.popleft() + out.seek(start_sector * core.SECTOR) + out.write(future.result()) + pbar.update(count) + submit_more() def get_key_from_args(self, game_title, args): # key provided with -d / --decryption-key @@ -392,24 +341,17 @@ class ISO: # No key or .ird specified. Let's first check if keys.db is packaged with this release core.vprint('Checking for bundled redump keys', args) - try: - db_path = resources.files(__name__).joinpath('data', 'keys.db') - if hasattr(db_path, 'read_bytes'): - # importlib.resources.abc.Traversable - write to temp file for sqlite3 - import tempfile - with tempfile.NamedTemporaryFile(delete=False, suffix='.db') as tmp: - tmp.write(db_path.read_bytes()) - db = sqlite3.connect(tmp.name) - else: - db = sqlite3.connect(str(db_path)) - except (FileNotFoundError, AttributeError): - db = sqlite3.connect((pathlib.Path(__file__).resolve() / 'data/') / 'keys.db') + db_path = resources.files(__name__).joinpath('data', 'keys.db') + db = sqlite3.connect(str(db_path)) c = db.cursor() # UPDATE: 2024 - New database now has game/title ids. See if we have that. core.vprint('Searching using TITLE_ID', args) - keys = c.execute('SELECT name, key FROM games WHERE title_id = ?', [self.game_id.replace('-','')]).fetchall() + try: + keys = c.execute('SELECT name, key FROM games WHERE title_id = ?', [self.game_id.replace('-', '')]).fetchall() + except sqlite3.OperationalError: + core.error('keys.db not found or invalid. Build it with: python3 tools/keys2db.py') if len(keys) == 1: core.vprint(f'Found potential redump key: "{keys[0][0]}"', args) return keys[0][1] @@ -424,10 +366,15 @@ class ISO: # If not, see if we can filter it out based on name and size core.vprint('Trying to find redump key based on size, game title, and country', args) if not game_title: - raise ValueError + core.error('Could not determine game title from PARAM.SFO. Specify a decryption key with -d or provide an IRD file with -k.') + + try: + country = core.serial_country(self.game_id) + except ValueError: + core.error(f'Unknown country code in game ID "{self.game_id}". Specify a decryption key with -d or provide an IRD file with -k.') keys = c.execute('SELECT name, key FROM games WHERE lower(name) LIKE ? AND size = ?', [ - '%' + '%'.join(game_title.lower().split(' ')) + '%' + core.serial_country(self.game_id).lower() + '%', str(self.size)]).fetchall() + '%' + '%'.join(game_title.lower().split(' ')) + '%' + country.lower() + '%', str(self.size)]).fetchall() if keys: core.vprint(f'Found potential redump key: "{keys[0][0]}"', args) return keys[0][1] @@ -439,35 +386,29 @@ class ISO: # Okay, searching has failed us, but maaaybe the checksum works? core.vprint('Trying to find redump key based on CRC32', args) - crc32 = None - crc32_continue = [True] + cancel = threading.Event() + crc_done = threading.Event() if args.checksum_timeout > 0: - def timeout(allow_execution): - time.sleep(float(args.checksum_timeout)) - if crc32 is None: + def timeout(): + # Abort the CRC32 calculation if it hasn't finished in time. + if not crc_done.wait(timeout=float(args.checksum_timeout)): core.vprint(f'could not calculate CRC32 before {args.checksum_timeout}-second timeout', args) - allow_execution[0] = False - crc_thread = Thread(target=timeout, args=(crc32_continue,), daemon=True) + cancel.set() + + crc_thread = Thread(target=timeout, daemon=True) crc_thread.start() - crc32 = core.crc32(args.iso, crc32_continue) - if crc32 is None: + calculated_crc = core.crc32(args.iso, cancel) + crc_done.set() + if calculated_crc is None: raise TimeoutError - keys = c.execute('SELECT name, key FROM games WHERE crc32=?', [crc32.lower()]).fetchall() + keys = c.execute('SELECT name, key FROM games WHERE crc32=?', [calculated_crc.lower()]).fetchall() if len(keys) == 1: - core.vprint(f'Found potential redump key: "{keys[0][0]}" (CRC32={crc32.lower()})', args) + core.vprint(f'Found potential redump key: "{keys[0][0]}" (CRC32={calculated_crc.lower()})', args) return keys[0][1] - # Fallback to downloading an IRD from the internet (currently disabled) - # try: - # core.warning('No IRD file specified, finding required file', args) - # args.ird = core.ird_by_game_id(self.game_id) # Download ird - # return get_key_from_ird(args.ird) - # except: - # core.vprint('Could not download IRD file', args) - - raise ValueError + core.error('could not find disc key') def print_info(self): # TODO: This could probably have been a __str__? Who cares? diff --git a/libray/libray b/libray/libray deleted file mode 100755 index 08cc1a3..0000000 --- a/libray/libray +++ /dev/null @@ -1,89 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf8 -*- - -# libray - Libre Blu-Ray PS3 ISO Tool -# Copyright © 2018 - 2024 Nichlas Severinsen -# -# This file is part of libray. -# -# libray is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# libray is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with libray. If not, see . - - -#!/usr/bin/env python3 -# -*- coding: utf8 -*- - -# libray - Libre Blu-Ray PS3 ISO Tool -# Copyright © 2018 - 2024 Nichlas Severinsen -# -# This file is part of libray. -# -# libray is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# libray is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with libray. If not, see . - -import argparse -import sys - - -try: - from libray import core -except ImportError: - import core - - -def main(): - parser = argparse.ArgumentParser( - description='A Libre (FLOSS) Python application for unencrypting, extracting, repackaging, and encrypting PS3 ISOs') - - required = parser.add_mutually_exclusive_group(required=True) - required.add_argument('-i', '--iso', dest='iso', type=str, help='Path to .iso file or stream') - required.add_argument('-k', '--ird', dest='ird', type=str, help='Path to .ird file', default='') - - optional = parser.add_argument_group('optional arguments') - optional.add_argument('-o', '--output', dest='output', type=str, help='Output filename', default='') - optional.add_argument('-d', '--decryption-key', dest='decryption_key', type=str, help='Manually specify key', default='') - optional.add_argument('-v', '--verbose', dest='verbose', help='Increase verbosity', action='count') - optional.add_argument('-q', '--quiet', dest='quiet', help='Quiet mode, only prints on error', action='store_true') - # -e is reserved for "extract" so re-encrypt is "-r" - optional.add_argument('-r', '--re-encrypt', dest='reencrypt', help='Re-encrypt .iso', action='store_true') - optional.add_argument('-c', '--checksum', dest='checksum', help='Allow fallback to CRC32 checksum (disabled by default)', action='store_true') - optional.add_argument('-t', '--checksum-timeout', dest='checksum_timeout', type=int, help='How many seconds to wait for CRC32 checksum (default 15)', default=15) - optional.add_argument('-p', '--threads', dest='threads', type=int, help='Number of threads for parallel decryption/encryption (default: number of CPU cores)', default=0) - optional.add_argument('--info', dest='info', action='store_true', help='Print info about .iso or .ird, then quit.') - - args = parser.parse_args() - - if args.info: - core.info(args) - - if not args.iso: - core.error('No .iso file given. Use -i/--iso path/to/file.iso') - - if args.reencrypt: - core.encrypt(args) - else: - core.decrypt(args) - - -if __name__ == '__main__': - main() diff --git a/libray/libray b/libray/libray new file mode 120000 index 0000000..211fc97 --- /dev/null +++ b/libray/libray @@ -0,0 +1 @@ +libray.py \ No newline at end of file diff --git a/libray/libray.py b/libray/libray.py index 08cc1a3..fe061e6 100755 --- a/libray/libray.py +++ b/libray/libray.py @@ -19,27 +19,12 @@ # You should have received a copy of the GNU General Public License # along with libray. If not, see . +import multiprocessing -#!/usr/bin/env python3 -# -*- coding: utf8 -*- - -# libray - Libre Blu-Ray PS3 ISO Tool -# Copyright © 2018 - 2024 Nichlas Severinsen -# -# This file is part of libray. -# -# libray is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# libray is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with libray. If not, see . +try: + multiprocessing.set_start_method('spawn') +except RuntimeError: + pass import argparse import sys diff --git a/target.md5 b/target.md5 new file mode 100644 index 0000000..b9aabaf --- /dev/null +++ b/target.md5 @@ -0,0 +1 @@ +c16539e7abd6fe851ab2a67c0cbbaa32 diff --git a/tests/test_iso.py b/tests/test_iso.py index 25de326..63ad0a0 100644 --- a/tests/test_iso.py +++ b/tests/test_iso.py @@ -24,6 +24,7 @@ import argparse +import os import unittest import unittest.mock as mock from Crypto.Cipher import AES @@ -162,7 +163,7 @@ class TestISO(unittest.TestCase): decryption_key = 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' decryptkey_bytes = core.to_bytes(decryption_key) mocksql.connect().cursor().execute().fetchall.return_value = [['AAA', decryptkey_bytes],['BBB', decryptkey_bytes]] - with self.assertRaises(ValueError): + with self.assertRaises(SystemExit): fake_iso.get_key_from_args(None, mock_args) @mock.patch('argparse.ArgumentParser.parse_args', return_value=argparse.Namespace()) @@ -223,4 +224,171 @@ class TestISO(unittest.TestCase): with self.assertRaises(TimeoutError): fake_iso.get_key_from_args('AAA', mock_args) + def _build_fake_iso(self, num_sectors=8, disc_key=None): + """Build a minimal fake ISO binary in a temp file. + + Layout: + [0:4] num_unencrypted_regions (uint32 BE = 1) + [4:8] padding (4 bytes, skipped by constructor) + [8:12] unencrypted region start sector (uint32 BE = 0) + [12:16] unencrypted region end sector (uint32 BE = 2) + [16:20] encrypted region end sector (uint32 BE = 6) + [20:24] terminator (0) + [2048:2064] game_id (16 bytes) + [2064:2072] PARAM.SFO marker (b'\\x00PSF') + Sectors 0-1 = unencrypted header + Sectors 2-5 = encrypted payload (zeros) + Sector 6+ = unencrypted tail + """ + import tempfile + if disc_key is None: + disc_key = b'\xaa' * 16 + + # Region info: constructor reads bytes 0-3, skips 4-7, then + # read_regions() reads bytes 8-11 (start) and 12-15 (end) + region_info = b'\x00\x00\x00\x01' # num_unencrypted_regions (offset 0) + region_info += b'\x00\x00\x00\x00' # padding (offset 4, skipped) + region_info += b'\x00\x00\x00\x00' # unencrypted start = sector 0 (offset 8) + region_info += b'\x00\x00\x00\x02' # unencrypted end = sector 2 (offset 12) + region_info += b'\x00\x00\x00\x06' # encrypted end = sector 6 (offset 16) + region_info += b'\x00\x00\x00\x00' # terminator (offset 20) + + # Sector 1 (offset 2048): unencrypted header with game_id and PARAM.SFO marker + sector1 = bytearray(2048) + # game_id at offset 2048+16=2064 + sector1[16:32] = b'TEST-GAME0000000' + # PARAM.SFO marker at offset 2048+20=2068 + sector1[20:24] = b'\x00PSF' + + # Encrypt sectors 2-5 with AES-CBC per-sector IV + from libray.iso import _make_iv + encrypted_payload = bytearray() + for s in range(2, 6): + iv = _make_iv(s) + cipher = AES.new(disc_key, AES.MODE_CBC, iv) + encrypted_payload.extend(cipher.encrypt(b'\x00' * 2048)) + + # Pad so sector data starts at offset 2048 (PS3 ISO convention) + data = region_info + b'\x00' * (2048 - len(region_info)) + sector1 + encrypted_payload + + # Ensure file is at least as large as the last region end (sector 6 = 12288 bytes) + # The constructor uses size to calculate total_sectors, so file must be >= region end + while len(data) < 12288: + data += b'\x00' * 2048 + + total = len(data) + + fd, path = tempfile.mkstemp(suffix='.iso') + with os.fdopen(fd, 'wb') as f: + f.write(data) + return path, total + + @mock.patch('argparse.ArgumentParser.parse_args') + def test_decrypt_parallel_pipeline(self, mock_args): + """Verify the multi-threaded decrypt pipeline processes all sectors.""" + disc_key = b'\xbb' * 16 + path, size = self._build_fake_iso(disc_key=disc_key) + try: + mock_args.iso = path + mock_args.decryption_key = disc_key.hex() + mock_args.ird = '' + mock_args.output = '' + mock_args.verbose = True + mock_args.quiet = True + mock_args.checksum = False + mock_args.checksum_timeout = 0 + mock_args.threads = 4 + + out_path = path + '.dec' + mock_args.output = out_path + + s = iso.ISO(mock_args) + s.decrypt(mock_args) + + self.assertTrue(os.path.exists(out_path)) + with open(out_path, 'rb') as f: + decrypted = f.read() + # Sectors 2-5 should be decrypted back to zeros + self.assertEqual(len(decrypted), size) + # Encrypted region: start=6144 (region[0].end), end=12288 + self.assertEqual(decrypted[6144:12288], b'\x00' * 6144) + finally: + os.unlink(path) + if os.path.exists(path + '.dec'): + os.unlink(path + '.dec') + + @mock.patch('argparse.ArgumentParser.parse_args') + def test_decrypt_sequential_fallback(self, mock_args): + """Verify single-thread decrypt still works (sequential path).""" + disc_key = b'\xcc' * 16 + path, size = self._build_fake_iso(disc_key=disc_key) + try: + mock_args.iso = path + mock_args.decryption_key = disc_key.hex() + mock_args.ird = '' + mock_args.output = '' + mock_args.verbose = True + mock_args.quiet = True + mock_args.checksum = False + mock_args.checksum_timeout = 0 + mock_args.threads = 1 + + out_path = path + '.dec' + mock_args.output = out_path + + s = iso.ISO(mock_args) + s.decrypt(mock_args) + + self.assertTrue(os.path.exists(out_path)) + with open(out_path, 'rb') as f: + decrypted = f.read() + self.assertEqual(decrypted[6144:12288], b'\x00' * 6144) + finally: + os.unlink(path) + if os.path.exists(path + '.dec'): + os.unlink(path + '.dec') + + @mock.patch('argparse.ArgumentParser.parse_args') + def test_encrypt_parallel_pipeline(self, mock_args): + """Verify the multi-threaded re-encrypt pipeline works.""" + disc_key = b'\xdd' * 16 + path, size = self._build_fake_iso(disc_key=disc_key) + try: + # First decrypt to get a plain ISO + mock_args.iso = path + mock_args.decryption_key = disc_key.hex() + mock_args.ird = '' + mock_args.output = path + '.dec' + mock_args.verbose = True + mock_args.quiet = True + mock_args.checksum = False + mock_args.checksum_timeout = 0 + mock_args.threads = 4 + s = iso.ISO(mock_args) + s.decrypt(mock_args) + + # Now re-encrypt with -r + reenc_path = path + '.reenc' + mock_args.iso = path + '.dec' + mock_args.decryption_key = disc_key.hex() + mock_args.reencrypt = True + mock_args.output = reenc_path + mock_args.threads = 4 + + s2 = iso.ISO(mock_args) + s2.encrypt(mock_args) + + self.assertTrue(os.path.exists(reenc_path)) + with open(reenc_path, 'rb') as f: + reencrypted = f.read() + with open(path, 'rb') as f: + original = f.read() + self.assertEqual(reencrypted, original) + finally: + os.unlink(path) + if os.path.exists(path + '.dec'): + os.unlink(path + '.dec') + if os.path.exists(path + '.reenc'): + os.unlink(path + '.reenc') + -- 2.43.0