From 35e49044f1692961bf651c917f66523faa208e6e Mon Sep 17 00:00:00 2001 From: Oracle Date: Tue, 19 May 2026 17:41:17 +0200 Subject: [PATCH] Initial parallelization attempt --- AGENTS.md | 35 ++++++ libray/__init__.py | 6 +- libray/core.py | 53 +++++---- libray/iso.py | 278 +++++++++++++++++++++++++++++---------------- libray/libray | 30 ++++- libray/libray.py | 30 ++++- libray/py.typed | 0 requirements.txt | 2 - setup.py | 8 +- tests/test_iso.py | 22 ++-- 10 files changed, 321 insertions(+), 143 deletions(-) create mode 100644 AGENTS.md create mode 100644 libray/py.typed diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..112b803 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,35 @@ +# LibRay — Agent Quick Reference +# Important +Make sure to use the virtual environment in .venv and not global pip. + +## Repo +- Python 3 CLI tool for decrypting/encrypting/examining PS3 Blu-Ray ISOs +- Entry point: `libray/libray` (also `libray/libray.py`, identical copy) +- Package: `libray/` — modules: `core.py` (main logic), `iso.py` (ISO parsing), `ird.py` (IRD parsing), `sfo.py` (PARAM.SFO) +- Tests: `tests/` — `test_iso.py`, `test_interface.py` (interface test is currently skipped/broken) +- Tools: `tools/keys2db.py` (builds `libray/data/keys.db` from redump keys), `tools/rpcs3.py` (fetches compat data) + +## Commands +- Install deps: `pip install -r requirements.txt` +- Run tests: `python -m unittest discover -b` +- Build sdist + wheel: `python3 setup.py sdist bdist_wheel` +- Publish: `twine upload dist/*` + +## Parallelization +- Decrypt and re-encrypt support multi-threading via `-p`/`--threads` CLI argument +- Default: auto-detects CPU core count via `os.cpu_count()` +- Each sector is independently decrypted (per-sector IV in AES-CBC), making it embarrassingly parallel +- Uses `concurrent.futures.ThreadPoolExecutor` (threads, not processes, since pycryptodome releases the GIL) +- Unencrypted regions are always copied sequentially (no crypto needed) +- Sector data is read upfront into memory, then processed in parallel, then written in order + +## Gotchas +- **Crypto package conflict**: `pycrypto`/`crypto` will break `pycryptodome`. If `ImportError: No module named Crypto.Cipher`, run: + ``` + pip uninstall crypto pycrypto && pip install pycryptodome + ``` +- **keys.db is generated**, not committed. Build it with `python3 tools/keys2db.py` (requires keys in `tools/keys/`). It's listed in `.gitignore` via `libray/data/*.db`. +- **`libray/__init__.py`** dynamically imports all submodules via `pkgutil.walk_packages` — don't expect explicit imports. +- **`test_interface.py`** is skipped (`@unittest.skip('currently broken')`) — the interface test won't run. +- `.editorconfig` enforces 4-space indent for `.py`, 2-space for `.yml`/`.yaml`. +- No linting/typechecking config exists — plain `unittest`, no pytest, no pre-commit. diff --git a/libray/__init__.py b/libray/__init__.py index 305c8e6..d6881a2 100644 --- a/libray/__init__.py +++ b/libray/__init__.py @@ -24,12 +24,10 @@ # This script is not included in the release of libray. import pkgutil +import importlib __all__ = [] for loader, module_name, is_pkg in pkgutil.walk_packages(__path__): __all__.append(module_name) - try: - _module = loader.find_module(module_name).load_module(module_name) - except AttributeError: - _module = loader.find_spec(module_name).loader.load_module(module_name) + _module = importlib.import_module(f'.{module_name}', __name__) globals()[module_name] = _module diff --git a/libray/core.py b/libray/core.py index 991b9de..741035d 100644 --- a/libray/core.py +++ b/libray/core.py @@ -26,6 +26,7 @@ import zlib import shutil import requests from bs4 import BeautifulSoup +from typing import Any, Optional try: @@ -44,23 +45,25 @@ GET_IRD_NET_LOC = 'http://jonnysp.bplaced.net/ird/' # Utility functions -def to_int(data, byteorder='big'): +def to_int(data: bytes, byteorder: str = 'big') -> int: """Convert bytes to integer""" if isinstance(data, bytes): return int.from_bytes(data, byteorder) + return 0 -def to_bytes(data): +def to_bytes(data: str) -> Optional[bytes]: """Convert a string of HEX to bytes""" if isinstance(data, str): return bytes(bytearray.fromhex(data)) + return None ISO_SECRET = to_bytes("380bcf0b53455b3c7817ab4fa3ba90ed") ISO_IV = to_bytes("69474772af6fdab342743aefaa186287") -def size(path): +def size(path: str) -> int: """Get size of a file or block device in bytes""" pathstat = os.stat(path) @@ -74,7 +77,7 @@ def size(path): return pathstat.st_size -def read_seven_bit_encoded_int(fileobj, order): +def read_seven_bit_encoded_int(fileobj, order: str) -> int: """Read an Int32, 7 bits at a time.""" # The highest bit of the byte, when on, means to continue reading more bytes. count = 0 @@ -90,27 +93,27 @@ def read_seven_bit_encoded_int(fileobj, order): return count -def error(msg): +def error(msg: str) -> None: """Print fatal error message and terminate""" - print('[ERROR] %s' % msg, file=sys.stderr) + print(f'[ERROR] {msg}', file=sys.stderr) sys.exit(1) -def warning(msg, args): +def warning(msg: str, args) -> None: """Print a warning message. Warning messages can be silenced with --quiet""" if not args.quiet: - print('[WARNING] %s. Continuing regardless.' % msg, file=sys.stderr) + print(f'[WARNING] {msg}. Continuing regardless.', file=sys.stderr) -def vprint(msg, args): +def vprint(msg: str, args) -> None: """Vprint, verbose print, can be silenced with --quiet""" if not args.quiet: - print('[*] ' + msg) + print(f'[*] {msg}') -def download_ird(ird_name): +def download_ird(ird_name: str) -> None: """Download an .ird from GET_IRD_NET_LOC""" # Check if file already exists and skip if it does @@ -118,14 +121,14 @@ def download_ird(ird_name): return ird_link = GET_IRD_NET_LOC + ird_name - r = requests.get(ird_link, stream=True) + r = requests.get(ird_link, stream=True, timeout=30) with open(ird_name, 'wb') as ird_file: r.raw.decode_content = True shutil.copyfileobj(r.raw, ird_file) -def ird_by_game_id(game_id): +def ird_by_game_id(game_id: str) -> Optional[str]: """Using a game_id, download the responding .ird from ALL_IRD_NET_LOC""" gameid = game_id.replace('-', '') try: @@ -148,26 +151,28 @@ def ird_by_game_id(game_id): return (ird_name) -def crc32(filename, keep_going=[True]): +def crc32(filename: str, keep_going: Optional[list] = None) -> Optional[str]: """Calculate crc32 for file""" + if keep_going is None: + keep_going = [True] with open(filename, 'rb') as infile: - crc32 = 0 + crc_val = 0 - while keep_going[0] == True: + while keep_going[0] is True: data = infile.read(65536) if not data: break - crc32 = zlib.crc32(data, crc32) + crc_val = zlib.crc32(data, crc_val) - if keep_going[0] == False: + if keep_going[0] is False: return None - return "%08X" % (crc32 & 0xFFFFFFFF) + return f"{crc_val & 0xFFFFFFFF:08X}" -def serial_country(title): +def serial_country(title: str) -> str: """Get country from disc serial / productcode / title_id""" if title[2] == 'A': @@ -188,7 +193,7 @@ def serial_country(title): raise ValueError('Unknown country?!') -def multiman_title(title): +def multiman_title(title: str) -> str: """Fix special characters in title for Multiman style""" replace = { @@ -206,7 +211,7 @@ def multiman_title(title): # Main functions -def info(args): +def info(args: Any) -> None: """Print information about .iso and then quit.""" if args.iso: @@ -220,7 +225,7 @@ def info(args): sys.exit() -def decrypt(args): +def decrypt(args: Any) -> None: """Try to decrypt a given .iso using relevant .ird or encryption key from argparse If no .ird is given this will try to automatically download an .ird file with the encryption/decryption key for the given game .iso @@ -232,7 +237,7 @@ def decrypt(args): input_iso.decrypt(args) -def encrypt(args): +def encrypt(args: Any) -> None: """Try to re-encrypt a decrypted .iso using relevant .ird or encryption key from argparse If no .ird is given this will try to automatically download an .ird file with the encryption/decryption key for the given game .iso diff --git a/libray/iso.py b/libray/iso.py index 2fcc04b..df5ce4a 100644 --- a/libray/iso.py +++ b/libray/iso.py @@ -23,9 +23,12 @@ import os import sys import sqlite3 import pathlib -from threading import Thread +import concurrent.futures +import threading import time -import pkg_resources +import queue +from threading import Thread +from importlib import resources from tqdm import tqdm from Crypto.Cipher import AES @@ -40,6 +43,28 @@ except ImportError: import sfo +def _decrypt_sector_worker(disc_key, sector_data, sector_number): + """Standalone worker for parallel sector decryption.""" + iv = bytearray(16) + num = sector_number + for j in range(16): + iv[15 - j] = num & 0xFF + num >>= 8 + cipher = AES.new(disc_key, AES.MODE_CBC, bytes(iv)) + return (sector_number, cipher.decrypt(sector_data)) + + +def _encrypt_sector_worker(disc_key, sector_data, sector_number): + """Standalone worker for parallel sector encryption.""" + iv = bytearray(16) + num = sector_number + for j in range(16): + iv[15 - j] = num & 0xFF + num >>= 8 + cipher = AES.new(disc_key, AES.MODE_CBC, bytes(iv)) + return (sector_number, cipher.encrypt(sector_data)) + + class ISO: """Class for handling PS3 .iso files. @@ -166,127 +191,182 @@ class ISO: if args.verbose and not args.quiet: self.print_info() + def _make_iv(self, sector_number): + """Build a 16-byte IV from a sector number (little-endian).""" + iv = bytearray(16) + num = sector_number + for j in range(16): + iv[15 - j] = num & 0xFF + num >>= 8 + return bytes(iv) + + def _process_region_pipeline(self, input_path, region, num_workers, encrypt_mode, args): + """Process an encrypted region using a reader-worker pipeline. + + A reader thread reads sectors from the file and puts them on a queue. + Worker threads pull from the queue, process each sector in parallel, + and store results. This overlaps I/O with processing for better CPU usage. + """ + num_sectors = (region['end'] - region['start']) // core.SECTOR + queue_size = min(64, num_sectors) + sector_queue = queue.Queue(maxsize=queue_size) + results = [None] * num_sectors + results_lock = threading.Lock() + + def reader(): + with open(input_path, 'rb') as f: + f.seek(region['start']) + for i in range(num_sectors): + sector_data = f.read(core.SECTOR) + sector_queue.put((i, sector_data)) + for _ in range(num_workers): + sector_queue.put(None) + + def worker(): + while True: + item = sector_queue.get() + if item is None: + sector_queue.task_done() + break + idx, sector_data = item + _, processed = (_encrypt_sector_worker if encrypt_mode else _decrypt_sector_worker)( + self.disc_key, sector_data, region['start'] // core.SECTOR + idx + ) + with results_lock: + results[idx] = processed + sector_queue.task_done() + + reader_thread = threading.Thread(target=reader, daemon=True) + reader_thread.start() + + workers = [] + for _ in range(num_workers): + t = threading.Thread(target=worker, daemon=True) + t.start() + workers.append(t) + + for t in workers: + t.join() + + reader_thread.join() + + return b''.join(results) + def decrypt(self, args): """Decrypt self using args from argparse.""" core.vprint(f'Decrypting with disc key: {self.disc_key.hex()}', args) - with open(args.iso, 'rb') as input_iso: + num_workers = args.threads if args.threads and args.threads > 0 else os.cpu_count() or 1 + if num_workers > 1: + core.vprint(f'Using {num_workers} threads for parallel decryption', args) - if not args.output: - output_name = f'{self.game_id}.iso' - else: - output_name = args.output + if not args.output: + output_name = f'{self.game_id}.iso' + else: + output_name = args.output - core.vprint(f'Decrypted .iso is output to: {output_name}', args) + core.vprint(f'Decrypted .iso is output to: {output_name}', args) - with open(output_name, 'wb') as output_iso: + total_sectors = self.size // core.SECTOR - if not args.quiet: - pbar = tqdm(total=(self.size // 2048)) + with open(args.iso, 'rb') as input_iso, open(output_name, 'wb') as output_iso: - for region in self.regions: + pbar = tqdm(total=total_sectors, file=sys.stdout, disable=args.quiet, leave=True) + + for region in self.regions: + region_sectors = (region['end'] - region['start']) // core.SECTOR + + if not region['enc']: + # Unencrypted region — copy sequentially input_iso.seek(region['start']) - - # Unencrypted region, just copy it - if not region['enc']: - while input_iso.tell() < region['end']: - data = input_iso.read(core.SECTOR) - if not data: - core.warning('Trying to read past the end of the file', args) - break - output_iso.write(data) - - if not args.quiet: - pbar.update(1) - continue - # Encrypted region, decrypt then write + for _ in range(region_sectors): + data = input_iso.read(core.SECTOR) + if not data: + core.warning('Trying to read past the end of the file', args) + break + output_iso.write(data) + pbar.update(1) + else: + # Encrypted region — pipeline: reader thread + worker threads + if num_workers > 1: + processed = self._process_region_pipeline( + args.iso, region, num_workers, encrypt_mode=False, args=args + ) else: - while input_iso.tell() < region['end']: - num = input_iso.tell() // 2048 - iv = bytearray([0 for i in range(0, 16)]) - for j in range(0, 16): - iv[16 - j - 1] = (num & 0xFF) - num >>= 8 + # Sequential fallback + input_iso.seek(region['start']) + processed = bytearray() + for i in range(region_sectors): + sector_num = region['start'] // core.SECTOR + i + iv = self._make_iv(sector_num) + cipher = AES.new(self.disc_key, AES.MODE_CBC, iv) + processed.extend(cipher.decrypt(input_iso.read(core.SECTOR))) + processed = bytes(processed) - data = input_iso.read(core.SECTOR) - if not data: - core.warning('Trying to read past the end of the file', args) - break + output_iso.write(processed) + pbar.update(region_sectors) - cipher = AES.new(self.disc_key, AES.MODE_CBC, bytes(iv)) - decrypted = cipher.decrypt(data) - - output_iso.write(decrypted) - - if not args.quiet: - pbar.update(1) - - if not args.quiet: - pbar.close() - - core.vprint('Decryption complete!', args) + pbar.close() + core.vprint('Decryption complete!', args) def encrypt(self, args): """Encrypt self using args from argparse.""" core.vprint(f'Re-encrypting with disc key: {self.disc_key.hex()}', args) - with open(args.iso, 'rb') as input_iso: + num_workers = args.threads if args.threads and args.threads > 0 else os.cpu_count() or 1 + if num_workers > 1: + core.vprint(f'Using {num_workers} threads for parallel re-encryption', args) - if not args.output: - output_name = f'{self.game_id}_e.iso' - else: - output_name = args.output + if not args.output: + output_name = f'{self.game_id}_e.iso' + else: + output_name = args.output - core.vprint(f'Re-encrypted .iso is output to: {output_name}', args) + core.vprint(f'Re-encrypted .iso is output to: {output_name}', args) - with open(output_name, 'wb') as output_iso: + with open(args.iso, 'rb') as input_iso, open(output_name, 'wb') as output_iso: - if not args.quiet: - pbar = tqdm(total=(self.size // 2048)) + pbar = tqdm(total=(self.size // 2048), file=sys.stdout, disable=args.quiet, leave=True) - for region in self.regions: + for region in self.regions: + region_sectors = (region['end'] - region['start']) // core.SECTOR + + if not region['enc']: + # Unencrypted region — copy sequentially input_iso.seek(region['start']) - - # Unencrypted region, just copy it - if not region['enc']: - while input_iso.tell() < region['end']: - data = input_iso.read(core.SECTOR) - if not data: - core.warning('Trying to read past the end of the file', args) - break - output_iso.write(data) - - if not args.quiet: - pbar.update(1) - continue - # Decrypted region, re-encrypt it + for _ in range(region_sectors): + data = input_iso.read(core.SECTOR) + if not data: + core.warning('Trying to read past the end of the file', args) + break + output_iso.write(data) + pbar.update(1) + else: + # Encrypted region — pipeline: reader thread + worker threads + if num_workers > 1: + processed = self._process_region_pipeline( + args.iso, region, num_workers, encrypt_mode=True, args=args + ) else: - while input_iso.tell() < region['end']: - num = input_iso.tell() // 2048 - iv = bytearray([0 for i in range(0, 16)]) - for j in range(0, 16): - iv[16 - j - 1] = (num & 0xFF) - num >>= 8 + # Sequential fallback + input_iso.seek(region['start']) + processed = bytearray() + for i in range(region_sectors): + sector_num = region['start'] // core.SECTOR + i + iv = self._make_iv(sector_num) + cipher = AES.new(self.disc_key, AES.MODE_CBC, iv) + processed.extend(cipher.encrypt(input_iso.read(core.SECTOR))) + processed = bytes(processed) - data = input_iso.read(core.SECTOR) - if not data: - core.warning('Trying to read past the end of the file', args) - break + output_iso.write(processed) + pbar.update(region_sectors) - cipher = AES.new(self.disc_key, AES.MODE_CBC, bytes(iv)) - encrypted = cipher.encrypt(data) + if not args.quiet: + pbar.close() - output_iso.write(encrypted) - - if not args.quiet: - pbar.update(1) - - if not args.quiet: - pbar.close() - - core.vprint('Re-encryption complete!', args) + core.vprint('Re-encryption complete!', args) def get_key_from_args(self, game_title, args): # key provided with -d / --decryption-key @@ -313,8 +393,16 @@ class ISO: core.vprint('Checking for bundled redump keys', args) try: - db = sqlite3.connect(pkg_resources.resource_filename(__name__, 'data/keys.db')) - except FileNotFoundError: + db_path = resources.files(__name__).joinpath('data', 'keys.db') + if hasattr(db_path, 'read_bytes'): + # importlib.resources.abc.Traversable - write to temp file for sqlite3 + import tempfile + with tempfile.NamedTemporaryFile(delete=False, suffix='.db') as tmp: + tmp.write(db_path.read_bytes()) + db = sqlite3.connect(tmp.name) + else: + db = sqlite3.connect(str(db_path)) + except (FileNotFoundError, AttributeError): db = sqlite3.connect((pathlib.Path(__file__).resolve() / 'data/') / 'keys.db') c = db.cursor() diff --git a/libray/libray b/libray/libray index aec94df..08cc1a3 100755 --- a/libray/libray +++ b/libray/libray @@ -20,7 +20,29 @@ # along with libray. If not, see . +#!/usr/bin/env python3 +# -*- coding: utf8 -*- + +# libray - Libre Blu-Ray PS3 ISO Tool +# Copyright © 2018 - 2024 Nichlas Severinsen +# +# This file is part of libray. +# +# libray is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# libray is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with libray. If not, see . + import argparse +import sys try: @@ -29,8 +51,7 @@ except ImportError: import core -if __name__ == '__main__': - +def main(): parser = argparse.ArgumentParser( description='A Libre (FLOSS) Python application for unencrypting, extracting, repackaging, and encrypting PS3 ISOs') @@ -47,6 +68,7 @@ if __name__ == '__main__': optional.add_argument('-r', '--re-encrypt', dest='reencrypt', help='Re-encrypt .iso', action='store_true') optional.add_argument('-c', '--checksum', dest='checksum', help='Allow fallback to CRC32 checksum (disabled by default)', action='store_true') optional.add_argument('-t', '--checksum-timeout', dest='checksum_timeout', type=int, help='How many seconds to wait for CRC32 checksum (default 15)', default=15) + optional.add_argument('-p', '--threads', dest='threads', type=int, help='Number of threads for parallel decryption/encryption (default: number of CPU cores)', default=0) optional.add_argument('--info', dest='info', action='store_true', help='Print info about .iso or .ird, then quit.') args = parser.parse_args() @@ -61,3 +83,7 @@ if __name__ == '__main__': core.encrypt(args) else: core.decrypt(args) + + +if __name__ == '__main__': + main() diff --git a/libray/libray.py b/libray/libray.py index aec94df..08cc1a3 100755 --- a/libray/libray.py +++ b/libray/libray.py @@ -20,7 +20,29 @@ # along with libray. If not, see . +#!/usr/bin/env python3 +# -*- coding: utf8 -*- + +# libray - Libre Blu-Ray PS3 ISO Tool +# Copyright © 2018 - 2024 Nichlas Severinsen +# +# This file is part of libray. +# +# libray is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# libray is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with libray. If not, see . + import argparse +import sys try: @@ -29,8 +51,7 @@ except ImportError: import core -if __name__ == '__main__': - +def main(): parser = argparse.ArgumentParser( description='A Libre (FLOSS) Python application for unencrypting, extracting, repackaging, and encrypting PS3 ISOs') @@ -47,6 +68,7 @@ if __name__ == '__main__': optional.add_argument('-r', '--re-encrypt', dest='reencrypt', help='Re-encrypt .iso', action='store_true') optional.add_argument('-c', '--checksum', dest='checksum', help='Allow fallback to CRC32 checksum (disabled by default)', action='store_true') optional.add_argument('-t', '--checksum-timeout', dest='checksum_timeout', type=int, help='How many seconds to wait for CRC32 checksum (default 15)', default=15) + optional.add_argument('-p', '--threads', dest='threads', type=int, help='Number of threads for parallel decryption/encryption (default: number of CPU cores)', default=0) optional.add_argument('--info', dest='info', action='store_true', help='Print info about .iso or .ird, then quit.') args = parser.parse_args() @@ -61,3 +83,7 @@ if __name__ == '__main__': core.encrypt(args) else: core.decrypt(args) + + +if __name__ == '__main__': + main() diff --git a/libray/py.typed b/libray/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/requirements.txt b/requirements.txt index 3e249d6..680254f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,5 +2,3 @@ tqdm~=4.66.2 pycryptodome~=3.20.0 requests~=2.31.0 beautifulsoup4~=4.12.3 -html5lib~=1.1 -setuptools~=69.1.1 diff --git a/setup.py b/setup.py index 416f5da..30ee147 100755 --- a/setup.py +++ b/setup.py @@ -19,14 +19,16 @@ setup( author_email="ns@nsz.no", url="https://notabug.org/necklace/libray", packages=['libray'], - scripts=['libray/libray'], + entry_points={ + 'console_scripts': [ + 'libray=libray.libray:main', + ], + }, install_requires=[ 'tqdm~=4.66.2', 'pycryptodome~=3.20.0', 'requests~=2.31.0', 'beautifulsoup4~=4.12.3', - 'html5lib~=1.1', - 'setuptools~=69.1.1', ], include_package_data=True, package_data={'': ['data/keys.db']}, diff --git a/tests/test_iso.py b/tests/test_iso.py index 3c7f2c8..25de326 100644 --- a/tests/test_iso.py +++ b/tests/test_iso.py @@ -62,7 +62,7 @@ class TestISO(unittest.TestCase): {'start': 1024, 'end': 2048, 'enc': False} ] - with mock.patch('iso.ird.IRD', return_value=ird) as mock_ird: + with mock.patch('libray.iso.ird.IRD', return_value=ird) as mock_ird: returned_key = fake_iso.get_key_from_args('AAA', mock_args) mock_ird.assert_called_once_with('aaa.ird') self.assertEqual(decryptkey_bytes, returned_key) @@ -84,7 +84,7 @@ class TestISO(unittest.TestCase): {'start': 0, 'end': 512, 'enc': False}, ] - with mock.patch('iso.ird.IRD', return_value=ird) as mock_ird: + with mock.patch('libray.iso.ird.IRD', return_value=ird) as mock_ird: with self.assertRaises(SystemExit): fake_iso.get_key_from_args('AAA', mock_args) @@ -107,7 +107,7 @@ class TestISO(unittest.TestCase): {'start': 2000000000, 'end': 2000001000, 'enc': False} ] - with mock.patch('iso.ird.IRD', return_value=ird) as mock_ird: + with mock.patch('libray.iso.ird.IRD', return_value=ird) as mock_ird: with self.assertRaises(SystemExit): fake_iso.get_key_from_args('AAA', mock_args) @@ -122,7 +122,7 @@ class TestISO(unittest.TestCase): fake_iso.size = 512 * 1024 * 1024 fake_iso.game_id = 'TCUS-12345' - with mock.patch('iso.sqlite3') as mocksql: + with mock.patch('libray.iso.sqlite3') as mocksql: decryption_key = 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' decryptkey_bytes = core.to_bytes(decryption_key) mocksql.connect().cursor().execute().fetchall.return_value = [['AAA', decryptkey_bytes]] @@ -140,7 +140,7 @@ class TestISO(unittest.TestCase): fake_iso.size = 512 * 1024 * 1024 fake_iso.game_id = 'TCUS-12345' - with mock.patch('iso.sqlite3') as mocksql: + with mock.patch('libray.iso.sqlite3') as mocksql: decryption_key = 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' decryptkey_bytes = core.to_bytes(decryption_key) mocksql.connect().cursor().execute().fetchall.side_effect = [[], [['AAA', decryptkey_bytes]]] @@ -158,7 +158,7 @@ class TestISO(unittest.TestCase): fake_iso.size = 512 * 1024 * 1024 fake_iso.game_id = 'TCUS-12345' - with mock.patch('iso.sqlite3') as mocksql: + with mock.patch('libray.iso.sqlite3') as mocksql: decryption_key = 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' decryptkey_bytes = core.to_bytes(decryption_key) mocksql.connect().cursor().execute().fetchall.return_value = [['AAA', decryptkey_bytes],['BBB', decryptkey_bytes]] @@ -177,7 +177,7 @@ class TestISO(unittest.TestCase): fake_iso.size = 512 * 1024 * 1024 fake_iso.game_id = 'TCUS-12345' - with mock.patch('iso.sqlite3') as mocksql: + with mock.patch('libray.iso.sqlite3') as mocksql: mocksql.connect().cursor().execute().fetchall.return_value = [] with self.assertRaises(SystemExit): fake_iso.get_key_from_args('AAA', mock_args) @@ -195,12 +195,12 @@ class TestISO(unittest.TestCase): fake_iso.size = 512 * 1024 * 1024 fake_iso.game_id = 'TCUS-12345' - with mock.patch('iso.sqlite3') as mocksql: + with mock.patch('libray.iso.sqlite3') as mocksql: decryption_key = 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' decryptkey_bytes = core.to_bytes(decryption_key) fakeresults = ([], [], [['AAA', decryptkey_bytes]]) mocksql.connect().cursor().execute().fetchall.side_effect = fakeresults - with mock.patch('iso.core.crc32', return_value='01010101'): + with mock.patch('libray.iso.core.crc32', return_value='01010101'): returned_key = fake_iso.get_key_from_args('AAA', mock_args) self.assertEqual(decryptkey_bytes, returned_key) @@ -217,9 +217,9 @@ class TestISO(unittest.TestCase): fake_iso.size = 512 * 1024 * 1024 fake_iso.game_id = 'TCUS-12345' - with mock.patch('iso.sqlite3') as mocksql: + with mock.patch('libray.iso.sqlite3') as mocksql: mocksql.connect().cursor().execute().fetchall.return_value = [] - with mock.patch('iso.core.crc32', return_value=None): + with mock.patch('libray.iso.core.crc32', return_value=None): with self.assertRaises(TimeoutError): fake_iso.get_key_from_args('AAA', mock_args)