Merge parallelization into main #1

Merged
Oracle merged 2 commits from parallelization into master 2026-05-21 12:22:43 +02:00
12 changed files with 503 additions and 305 deletions

37
AGENTS.md Normal file
View file

@ -0,0 +1,37 @@
# LibRay — Agent Quick Reference
# Important
Make sure to use the virtual environment in `~/.venv/libray` (activate with `source ~/.venv/libray/bin/activate`) and not global pip.
## Repo
- Python 3 CLI tool for decrypting/encrypting/examining PS3 Blu-Ray ISOs
- Entry point: `libray/libray.py` (defines `main()`); installed console script is `libray=libray.libray:main` (see `setup.py`). `libray/libray` is a symlink to `libray.py` for running from a source checkout.
- Package: `libray/` — modules: `core.py` (main logic), `iso.py` (ISO parsing), `ird.py` (IRD parsing), `sfo.py` (PARAM.SFO)
- Tests: `tests/``test_iso.py`, `test_interface.py` (interface test is currently skipped/broken)
- Tools: `tools/keys2db.py` (builds `libray/data/keys.db` from redump keys), `tools/rpcs3.py` (fetches compat data)
## Commands
- Install deps: `pip install -r requirements.txt`
- Run tests: `python -m unittest discover -b`
- Build sdist + wheel: `python3 setup.py sdist bdist_wheel`
- Publish: `twine upload dist/*`
## Parallelization
- Decrypt and re-encrypt support multiprocessing via `-p`/`--threads` CLI argument
- Default: auto-detects CPU core count via `os.cpu_count()`
- Each sector is independently decrypted (per-sector IV in AES-CBC), making it embarrassingly parallel
- Uses `concurrent.futures.ProcessPoolExecutor` — true multi-core parallelism by spawning OS processes
- Each process gets its own Python interpreter (own GIL), so pycryptodome's partial GIL release is irrelevant
- Unencrypted regions are always copied sequentially (no crypto needed)
- Module-level `_process_sector_chunk_mp()` function is picklable for use with ProcessPoolExecutor; it processes a contiguous run of sectors per task to amortise IPC cost
- A bounded window of chunks is kept in flight (memory stays bounded for large ISOs); results are written back to their absolute offsets in the output file as they complete
## Gotchas
- **Crypto package conflict**: `pycrypto`/`crypto` will break `pycryptodome`. If `ImportError: No module named Crypto.Cipher`, run:
```
pip uninstall crypto pycrypto && pip install pycryptodome
```
- **keys.db is generated**, not committed. Build it with `python3 tools/keys2db.py` (requires keys in `tools/keys/`). It's listed in `.gitignore` via `libray/data/*.db`.
- **`libray/__init__.py`** dynamically imports submodules via `pkgutil.walk_packages` + `importlib.import_module` (skips the `libray` entry-point module to avoid shadowing the package) — don't expect explicit imports.
- **`test_interface.py`** is skipped (`@unittest.skip('currently broken')`) — the interface test won't run.
- `.editorconfig` enforces 4-space indent for `.py`, 2-space for `.yml`/`.yaml`.
- No linting/typechecking config exists — plain `unittest`, no pytest, no pre-commit.

View file

@ -18,18 +18,16 @@
# You should have received a copy of the GNU General Public License
# along with libray. If not, see <https://www.gnu.org/licenses/>.
# This is a script to find the redump names and game serial id's using rpcs3's compatibility list.
# It puts the name, serial id, and some other info into an sqlite3 database.
# That database can then be used to harcode serial id to keys into keys.db.
# This script is not included in the release of libray.
import pkgutil
import importlib
__all__ = []
for loader, module_name, is_pkg in pkgutil.walk_packages(__path__):
# Skip the entry-point script (libray/libray.py) which would pollute
# the package namespace with a 'libray' submodule that shadows the
# package itself when imported via 'importlib.import_module'.
if module_name == 'libray':
continue
__all__.append(module_name)
try:
_module = loader.find_module(module_name).load_module(module_name)
except AttributeError:
_module = loader.find_spec(module_name).loader.load_module(module_name)
_module = importlib.import_module(f'.{module_name}', __name__)
globals()[module_name] = _module

View file

@ -24,8 +24,10 @@ import sys
import stat
import zlib
import shutil
import threading
import requests
from bs4 import BeautifulSoup
from typing import Any, Optional
try:
@ -44,23 +46,25 @@ GET_IRD_NET_LOC = 'http://jonnysp.bplaced.net/ird/'
# Utility functions
def to_int(data, byteorder='big'):
def to_int(data: bytes, byteorder: str = 'big') -> int:
"""Convert bytes to integer"""
if isinstance(data, bytes):
return int.from_bytes(data, byteorder)
return 0
def to_bytes(data):
def to_bytes(data: str) -> Optional[bytes]:
"""Convert a string of HEX to bytes"""
if isinstance(data, str):
return bytes(bytearray.fromhex(data))
return None
ISO_SECRET = to_bytes("380bcf0b53455b3c7817ab4fa3ba90ed")
ISO_IV = to_bytes("69474772af6fdab342743aefaa186287")
def size(path):
def size(path: str) -> int:
"""Get size of a file or block device in bytes"""
pathstat = os.stat(path)
@ -74,7 +78,7 @@ def size(path):
return pathstat.st_size
def read_seven_bit_encoded_int(fileobj, order):
def read_seven_bit_encoded_int(fileobj, order: str) -> int:
"""Read an Int32, 7 bits at a time."""
# The highest bit of the byte, when on, means to continue reading more bytes.
count = 0
@ -90,27 +94,27 @@ def read_seven_bit_encoded_int(fileobj, order):
return count
def error(msg):
def error(msg: str) -> None:
"""Print fatal error message and terminate"""
print('[ERROR] %s' % msg, file=sys.stderr)
print(f'[ERROR] {msg}', file=sys.stderr)
sys.exit(1)
def warning(msg, args):
def warning(msg: str, args: Any) -> None:
"""Print a warning message. Warning messages can be silenced with --quiet"""
if not args.quiet:
print('[WARNING] %s. Continuing regardless.' % msg, file=sys.stderr)
print(f'[WARNING] {msg}. Continuing regardless.', file=sys.stderr)
def vprint(msg, args):
def vprint(msg: str, args: Any) -> None:
"""Vprint, verbose print, can be silenced with --quiet"""
if not args.quiet:
print('[*] ' + msg)
print(f'[*] {msg}')
def download_ird(ird_name):
def download_ird(ird_name: str) -> None:
"""Download an .ird from GET_IRD_NET_LOC"""
# Check if file already exists and skip if it does
@ -118,14 +122,14 @@ def download_ird(ird_name):
return
ird_link = GET_IRD_NET_LOC + ird_name
r = requests.get(ird_link, stream=True)
r = requests.get(ird_link, stream=True, timeout=30)
with open(ird_name, 'wb') as ird_file:
r.raw.decode_content = True
shutil.copyfileobj(r.raw, ird_file)
def ird_by_game_id(game_id):
def ird_by_game_id(game_id: str) -> Optional[str]:
"""Using a game_id, download the responding .ird from ALL_IRD_NET_LOC"""
gameid = game_id.replace('-', '')
try:
@ -148,26 +152,30 @@ def ird_by_game_id(game_id):
return (ird_name)
def crc32(filename, keep_going=[True]):
"""Calculate crc32 for file"""
def crc32(filename: str, cancel: Optional[threading.Event] = None) -> Optional[str]:
"""Calculate crc32 for file.
If *cancel* is provided, the computation can be aborted from another
thread by calling ``cancel.set()``. Returns ``None`` when cancelled.
"""
if cancel is None:
cancel = threading.Event()
with open(filename, 'rb') as infile:
crc32 = 0
while keep_going[0] == True:
crc_val = 0
while not cancel.is_set():
data = infile.read(65536)
if not data:
break
crc32 = zlib.crc32(data, crc32)
crc_val = zlib.crc32(data, crc_val)
if keep_going[0] == False:
if cancel.is_set():
return None
return "%08X" % (crc32 & 0xFFFFFFFF)
return f"{crc_val & 0xFFFFFFFF:08X}"
def serial_country(title):
def serial_country(title: str) -> str:
"""Get country from disc serial / productcode / title_id"""
if title[2] == 'A':
@ -188,7 +196,7 @@ def serial_country(title):
raise ValueError('Unknown country?!')
def multiman_title(title):
def multiman_title(title: str) -> str:
"""Fix special characters in title for Multiman style"""
replace = {
@ -206,7 +214,7 @@ def multiman_title(title):
# Main functions
def info(args):
def info(args: Any) -> None:
"""Print information about .iso and then quit."""
if args.iso:
@ -220,7 +228,7 @@ def info(args):
sys.exit()
def decrypt(args):
def decrypt(args: Any) -> None:
"""Try to decrypt a given .iso using relevant .ird or encryption key from argparse
If no .ird is given this will try to automatically download an .ird file with the encryption/decryption key for the given game .iso
@ -232,7 +240,7 @@ def decrypt(args):
input_iso.decrypt(args)
def encrypt(args):
def encrypt(args: Any) -> None:
"""Try to re-encrypt a decrypted .iso using relevant .ird or encryption key from argparse
If no .ird is given this will try to automatically download an .ird file with the encryption/decryption key for the given game .iso

View file

@ -23,6 +23,7 @@ import os
import sys
import zlib
import shutil
import tempfile
try:
@ -47,84 +48,91 @@ class IRD:
"""
ORDER = 'little'
TEMP_FILE = 'ird'
MAGIC_STRING = b'3IRD'
def __init__(self, ird_path, verbose=False):
"""IRD constructor using args from argparse."""
self.uncompress(ird_path) # TODO: Try/Except?
temp_path = self._prepare_temp(ird_path)
self.size = core.size(self.TEMP_FILE)
try:
self.size = core.size(temp_path)
if not self.size:
core.error('IRD file is empty!')
if not self.size:
core.error('IRD file is empty!')
with open(self.TEMP_FILE, 'rb') as input_ird:
if input_ird.read(4) != self.MAGIC_STRING:
core.error('Either not an IRD file, corruped IRD file, or unknown IRD format')
with open(temp_path, 'rb') as input_ird:
if input_ird.read(4) != self.MAGIC_STRING:
core.error('Either not an IRD file, corruped IRD file, or unknown IRD format')
self.version = core.to_int(input_ird.read(1), self.ORDER)
self.game_id = input_ird.read(9)
name_length = core.read_seven_bit_encoded_int(input_ird, self.ORDER)
self.game_name = input_ird.read(name_length).decode('utf8')
self.update_version = input_ird.read(4)
self.game_version = input_ird.read(5)
self.app_version = input_ird.read(5)
self.version = core.to_int(input_ird.read(1), self.ORDER)
self.game_id = input_ird.read(9)
name_length = core.read_seven_bit_encoded_int(input_ird, self.ORDER)
self.game_name = input_ird.read(name_length).decode('utf8')
self.update_version = input_ird.read(4)
self.game_version = input_ird.read(5)
self.app_version = input_ird.read(5)
if self.version == 7:
self.identifier = input_ird.read(4)
if self.version == 7:
self.identifier = input_ird.read(4)
header_length = (core.to_int(input_ird.read(4), self.ORDER))
self.header = input_ird.read(header_length)
footer_length = (core.to_int(input_ird.read(4), self.ORDER))
self.footer = input_ird.read(footer_length)
header_length = (core.to_int(input_ird.read(4), self.ORDER))
self.header = input_ird.read(header_length)
footer_length = (core.to_int(input_ird.read(4), self.ORDER))
self.footer = input_ird.read(footer_length)
self.region_count = core.to_int(input_ird.read(1), self.ORDER)
self.region_hashes = []
for _ in range(0, self.region_count):
self.region_hashes.append(input_ird.read(16))
self.region_count = core.to_int(input_ird.read(1), self.ORDER)
self.region_hashes = []
for _ in range(0, self.region_count):
self.region_hashes.append(input_ird.read(16))
self.file_count = core.to_int(input_ird.read(4), self.ORDER)
self.file_hashes = []
for _ in range(0, self.file_count):
key = core.to_int(input_ird.read(8), self.ORDER)
val = input_ird.read(16)
self.file_hashes.append({'key': key, 'val': val})
self.file_count = core.to_int(input_ird.read(4), self.ORDER)
self.file_hashes = []
for _ in range(0, self.file_count):
key = core.to_int(input_ird.read(8), self.ORDER)
val = input_ird.read(16)
self.file_hashes.append({'key': key, 'val': val})
if self.version >= 9:
self.pic = input_ird.read(115)
if self.version >= 9:
self.pic = input_ird.read(115)
input_ird.seek(input_ird.tell() + 4) # ?
input_ird.seek(input_ird.tell() + 4) # ?
self.data1 = input_ird.read(16)
self.data2 = input_ird.read(16)
self.data1 = input_ird.read(16)
self.data2 = input_ird.read(16)
if self.version < 9:
self.pic = input_ird.read(115)
if self.version < 9:
self.pic = input_ird.read(115)
if self.version < 7:
self.uid = core.to_int(input_ird.read(4), self.ORDER)
if self.version < 7:
self.uid = core.to_int(input_ird.read(4), self.ORDER)
if verbose:
self.print_info()
if verbose:
self.print_info()
finally:
os.remove(temp_path)
os.remove(self.TEMP_FILE)
def _prepare_temp(self, filename):
"""Decompress (if needed) and copy an IRD file to a unique temp file.
def uncompress(self, filename):
"""Uncompress IRD. Assumes given .ird file is not compressed, but then tries to decompress it with zlib/gzfile if it was not uncompressed."""
Returns the path to the temp file. The caller is responsible for
deleting it (use a ``try / finally`` block).
"""
with open(filename, 'rb') as f:
magic = f.read(4)
uncompress = False
with open(filename, 'rb') as input_ird:
if input_ird.read(4) != self.MAGIC_STRING:
uncompress = True
fd, tmp_path = tempfile.mkstemp(suffix='.ird')
os.close(fd)
if uncompress:
with open(filename, 'rb') as gzfile:
with open(self.TEMP_FILE, 'wb') as tmpfile:
tmpfile.write(zlib.decompress(gzfile.read(), zlib.MAX_WBITS | 16))
if magic != self.MAGIC_STRING:
# Compressed — decompress to the temp file
with open(filename, 'rb') as gzfile, open(tmp_path, 'wb') as tmpfile:
tmpfile.write(zlib.decompress(gzfile.read(), zlib.MAX_WBITS | 16))
else:
shutil.copyfile(filename, self.TEMP_FILE)
# Uncompressed — copy to the temp file
shutil.copyfile(filename, tmp_path)
return tmp_path
def print_info(self, regions=False):
# TODO: This could probably have been a __str__? Who cares?

View file

@ -22,10 +22,11 @@
import os
import sys
import sqlite3
import pathlib
import concurrent.futures
import threading
from collections import deque
from threading import Thread
import time
import pkg_resources
from importlib import resources
from tqdm import tqdm
from Crypto.Cipher import AES
@ -40,6 +41,34 @@ except ImportError:
import sfo
def _make_iv(sector_number):
"""Build a 16-byte IV from a sector number (little-endian)."""
iv = bytearray(16)
num = sector_number
for j in range(16):
iv[15 - j] = num & 0xFF
num >>= 8
return bytes(iv)
def _process_sector_chunk_mp(args):
"""Multiprocessing worker: (de/en)crypt a contiguous run of sectors.
Picklable, module-level function for use with ProcessPoolExecutor. Takes a
single ``bytes`` blob of one or more whole sectors and returns the processed
blob, so only one (large) object is pickled per task instead of many small
ones this is what makes the multiprocessing worthwhile.
"""
disc_key, sector_blob, sector_start_number, encrypt_mode = args
out = bytearray()
for offset in range(0, len(sector_blob), core.SECTOR):
sector = sector_blob[offset:offset + core.SECTOR]
iv = _make_iv(sector_start_number + offset // core.SECTOR)
cipher = AES.new(disc_key, AES.MODE_CBC, iv)
out += cipher.encrypt(sector) if encrypt_mode else cipher.decrypt(sector)
return bytes(out)
class ISO:
"""Class for handling PS3 .iso files.
@ -159,7 +188,7 @@ class ISO:
if not args.output:
args.output = f'{game_title} [{param["TITLE_ID"]}].iso'
except Exception:
except (UnicodeDecodeError, KeyError, IndexError, ValueError):
core.warning('Failed reading SFO', args)
self.disc_key = self.get_key_from_args(game_title, args)
@ -168,125 +197,125 @@ class ISO:
def decrypt(self, args):
"""Decrypt self using args from argparse."""
core.vprint(f'Decrypting with disc key: {self.disc_key.hex()}', args)
with open(args.iso, 'rb') as input_iso:
if not args.output:
output_name = f'{self.game_id}.iso'
else:
output_name = args.output
core.vprint(f'Decrypted .iso is output to: {output_name}', args)
with open(output_name, 'wb') as output_iso:
if not args.quiet:
pbar = tqdm(total=(self.size // 2048))
for region in self.regions:
input_iso.seek(region['start'])
# Unencrypted region, just copy it
if not region['enc']:
while input_iso.tell() < region['end']:
data = input_iso.read(core.SECTOR)
if not data:
core.warning('Trying to read past the end of the file', args)
break
output_iso.write(data)
if not args.quiet:
pbar.update(1)
continue
# Encrypted region, decrypt then write
else:
while input_iso.tell() < region['end']:
num = input_iso.tell() // 2048
iv = bytearray([0 for i in range(0, 16)])
for j in range(0, 16):
iv[16 - j - 1] = (num & 0xFF)
num >>= 8
data = input_iso.read(core.SECTOR)
if not data:
core.warning('Trying to read past the end of the file', args)
break
cipher = AES.new(self.disc_key, AES.MODE_CBC, bytes(iv))
decrypted = cipher.decrypt(data)
output_iso.write(decrypted)
if not args.quiet:
pbar.update(1)
if not args.quiet:
pbar.close()
core.vprint('Decryption complete!', args)
self._process_iso(args, encrypt_mode=False)
def encrypt(self, args):
"""Encrypt self using args from argparse."""
self._process_iso(args, encrypt_mode=True)
core.vprint(f'Re-encrypting with disc key: {self.disc_key.hex()}', args)
def _process_iso(self, args, encrypt_mode):
"""Shared driver for decrypt/encrypt. ``encrypt_mode`` selects direction."""
with open(args.iso, 'rb') as input_iso:
core.vprint(f'{"Re-encrypting" if encrypt_mode else "Decrypting"} with disc key: {self.disc_key.hex()}', args)
if not args.output:
output_name = f'{self.game_id}_e.iso'
else:
output_name = args.output
num_workers = args.threads if args.threads and args.threads > 0 else (os.cpu_count() or 1)
core.vprint(f'Re-encrypted .iso is output to: {output_name}', args)
if args.output:
output_name = args.output
else:
output_name = f'{self.game_id}_e.iso' if encrypt_mode else f'{self.game_id}.iso'
with open(output_name, 'wb') as output_iso:
core.vprint(f'{"Re-encrypted" if encrypt_mode else "Decrypted"} .iso is output to: {output_name}', args)
if not args.quiet:
pbar = tqdm(total=(self.size // 2048))
total_sectors = self.size // core.SECTOR
pbar = tqdm(total=total_sectors, file=sys.stdout, disable=args.quiet, leave=True)
for region in self.regions:
if num_workers > 1:
core.vprint(f'Using {num_workers} processes for parallel {"re-encryption" if encrypt_mode else "decryption"}', args)
self._process_parallel(args, output_name, encrypt_mode, num_workers, pbar)
else:
self._process_sequential(args, output_name, encrypt_mode, pbar)
pbar.close()
core.vprint(f'{"Re-encryption" if encrypt_mode else "Decryption"} complete!', args)
def _process_sequential(self, args, output_name, encrypt_mode, pbar):
"""Single-process path: read, optionally (de/en)crypt, and write per sector."""
with open(args.iso, 'rb') as input_iso, open(output_name, 'wb') as output_file:
for region in self.regions:
region_sectors = (region['end'] - region['start']) // core.SECTOR
base_sector = region['start'] // core.SECTOR
input_iso.seek(region['start'])
for i in range(region_sectors):
data = input_iso.read(core.SECTOR)
if not data:
core.warning('Trying to read past the end of the file', args)
break
if region['enc']:
cipher = AES.new(self.disc_key, AES.MODE_CBC, _make_iv(base_sector + i))
data = cipher.encrypt(data) if encrypt_mode else cipher.decrypt(data)
output_file.write(data)
pbar.update(1)
def _process_parallel(self, args, output_name, encrypt_mode, num_workers, pbar):
"""Multi-process path: one process pool for the whole operation.
The output file is pre-sized so every region can be written to its
absolute offset; this also clears any stale file from a previous run.
"""
# Pre-create the output at its final size and write to absolute offsets.
total_bytes = self.regions[-1]['end']
with open(output_name, 'wb') as f:
f.truncate(total_bytes)
with concurrent.futures.ProcessPoolExecutor(max_workers=num_workers) as executor, \
open(args.iso, 'rb') as input_iso, \
open(output_name, 'r+b') as out:
for region in self.regions:
if region['enc']:
self._process_region_parallel(input_iso, out, region, executor, num_workers, encrypt_mode, pbar)
else:
# Unencrypted region — copy straight through to its offset.
region_sectors = (region['end'] - region['start']) // core.SECTOR
input_iso.seek(region['start'])
out.seek(region['start'])
for _ in range(region_sectors):
data = input_iso.read(core.SECTOR)
if not data:
core.warning('Trying to read past the end of the file', args)
break
out.write(data)
pbar.update(1)
# Unencrypted region, just copy it
if not region['enc']:
while input_iso.tell() < region['end']:
data = input_iso.read(core.SECTOR)
if not data:
core.warning('Trying to read past the end of the file', args)
break
output_iso.write(data)
def _process_region_parallel(self, input_iso, out, region, executor, num_workers, encrypt_mode, pbar):
"""(De/en)crypt one encrypted region across worker processes.
if not args.quiet:
pbar.update(1)
continue
# Decrypted region, re-encrypt it
else:
while input_iso.tell() < region['end']:
num = input_iso.tell() // 2048
iv = bytearray([0 for i in range(0, 16)])
for j in range(0, 16):
iv[16 - j - 1] = (num & 0xFF)
num >>= 8
Reads the region in bounded-size chunks and keeps only a small window of
chunks in flight, so peak memory stays bounded even for huge ISOs.
Results are written back to their absolute offsets as they complete.
"""
num_sectors = (region['end'] - region['start']) // core.SECTOR
base_sector = region['start'] // core.SECTOR
data = input_iso.read(core.SECTOR)
if not data:
core.warning('Trying to read past the end of the file', args)
break
# Sectors per task: large enough to amortise IPC, small enough to keep
# every worker busy and bound peak memory.
sectors_per_chunk = max(1, min(512, (num_sectors // (num_workers * 4)) or 1))
max_in_flight = num_workers * 4
cipher = AES.new(self.disc_key, AES.MODE_CBC, bytes(iv))
encrypted = cipher.encrypt(data)
input_iso.seek(region['start'])
next_sector = base_sector
sectors_left = num_sectors
pending = deque()
output_iso.write(encrypted)
def submit_more():
nonlocal next_sector, sectors_left
while sectors_left > 0 and len(pending) < max_in_flight:
count = min(sectors_per_chunk, sectors_left)
blob = input_iso.read(count * core.SECTOR)
future = executor.submit(_process_sector_chunk_mp,
(self.disc_key, blob, next_sector, encrypt_mode))
pending.append((next_sector, count, future))
next_sector += count
sectors_left -= count
if not args.quiet:
pbar.update(1)
if not args.quiet:
pbar.close()
core.vprint('Re-encryption complete!', args)
submit_more()
while pending:
start_sector, count, future = pending.popleft()
out.seek(start_sector * core.SECTOR)
out.write(future.result())
pbar.update(count)
submit_more()
def get_key_from_args(self, game_title, args):
# key provided with -d / --decryption-key
@ -312,16 +341,17 @@ class ISO:
# No key or .ird specified. Let's first check if keys.db is packaged with this release
core.vprint('Checking for bundled redump keys', args)
try:
db = sqlite3.connect(pkg_resources.resource_filename(__name__, 'data/keys.db'))
except FileNotFoundError:
db = sqlite3.connect((pathlib.Path(__file__).resolve() / 'data/') / 'keys.db')
db_path = resources.files(__name__).joinpath('data', 'keys.db')
db = sqlite3.connect(str(db_path))
c = db.cursor()
# UPDATE: 2024 - New database now has game/title ids. See if we have that.
core.vprint('Searching using TITLE_ID', args)
keys = c.execute('SELECT name, key FROM games WHERE title_id = ?', [self.game_id.replace('-','')]).fetchall()
try:
keys = c.execute('SELECT name, key FROM games WHERE title_id = ?', [self.game_id.replace('-', '')]).fetchall()
except sqlite3.OperationalError:
core.error('keys.db not found or invalid. Build it with: python3 tools/keys2db.py')
if len(keys) == 1:
core.vprint(f'Found potential redump key: "{keys[0][0]}"', args)
return keys[0][1]
@ -336,10 +366,15 @@ class ISO:
# If not, see if we can filter it out based on name and size
core.vprint('Trying to find redump key based on size, game title, and country', args)
if not game_title:
raise ValueError
core.error('Could not determine game title from PARAM.SFO. Specify a decryption key with -d or provide an IRD file with -k.')
try:
country = core.serial_country(self.game_id)
except ValueError:
core.error(f'Unknown country code in game ID "{self.game_id}". Specify a decryption key with -d or provide an IRD file with -k.')
keys = c.execute('SELECT name, key FROM games WHERE lower(name) LIKE ? AND size = ?', [
'%' + '%'.join(game_title.lower().split(' ')) + '%' + core.serial_country(self.game_id).lower() + '%', str(self.size)]).fetchall()
'%' + '%'.join(game_title.lower().split(' ')) + '%' + country.lower() + '%', str(self.size)]).fetchall()
if keys:
core.vprint(f'Found potential redump key: "{keys[0][0]}"', args)
return keys[0][1]
@ -351,35 +386,29 @@ class ISO:
# Okay, searching has failed us, but maaaybe the checksum works?
core.vprint('Trying to find redump key based on CRC32', args)
crc32 = None
crc32_continue = [True]
cancel = threading.Event()
crc_done = threading.Event()
if args.checksum_timeout > 0:
def timeout(allow_execution):
time.sleep(float(args.checksum_timeout))
if crc32 is None:
def timeout():
# Abort the CRC32 calculation if it hasn't finished in time.
if not crc_done.wait(timeout=float(args.checksum_timeout)):
core.vprint(f'could not calculate CRC32 before {args.checksum_timeout}-second timeout', args)
allow_execution[0] = False
crc_thread = Thread(target=timeout, args=(crc32_continue,), daemon=True)
cancel.set()
crc_thread = Thread(target=timeout, daemon=True)
crc_thread.start()
crc32 = core.crc32(args.iso, crc32_continue)
if crc32 is None:
calculated_crc = core.crc32(args.iso, cancel)
crc_done.set()
if calculated_crc is None:
raise TimeoutError
keys = c.execute('SELECT name, key FROM games WHERE crc32=?', [crc32.lower()]).fetchall()
keys = c.execute('SELECT name, key FROM games WHERE crc32=?', [calculated_crc.lower()]).fetchall()
if len(keys) == 1:
core.vprint(f'Found potential redump key: "{keys[0][0]}" (CRC32={crc32.lower()})', args)
core.vprint(f'Found potential redump key: "{keys[0][0]}" (CRC32={calculated_crc.lower()})', args)
return keys[0][1]
# Fallback to downloading an IRD from the internet (currently disabled)
# try:
# core.warning('No IRD file specified, finding required file', args)
# args.ird = core.ird_by_game_id(self.game_id) # Download ird
# return get_key_from_ird(args.ird)
# except:
# core.vprint('Could not download IRD file', args)
raise ValueError
core.error('could not find disc key')
def print_info(self):
# TODO: This could probably have been a __str__? Who cares?

View file

@ -1,63 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf8 -*-
# libray - Libre Blu-Ray PS3 ISO Tool
# Copyright © 2018 - 2024 Nichlas Severinsen
#
# This file is part of libray.
#
# libray is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# libray is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with libray. If not, see <https://www.gnu.org/licenses/>.
import argparse
try:
from libray import core
except ImportError:
import core
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description='A Libre (FLOSS) Python application for unencrypting, extracting, repackaging, and encrypting PS3 ISOs')
required = parser.add_mutually_exclusive_group(required=True)
required.add_argument('-i', '--iso', dest='iso', type=str, help='Path to .iso file or stream')
required.add_argument('-k', '--ird', dest='ird', type=str, help='Path to .ird file', default='')
optional = parser.add_argument_group('optional arguments')
optional.add_argument('-o', '--output', dest='output', type=str, help='Output filename', default='')
optional.add_argument('-d', '--decryption-key', dest='decryption_key', type=str, help='Manually specify key', default='')
optional.add_argument('-v', '--verbose', dest='verbose', help='Increase verbosity', action='count')
optional.add_argument('-q', '--quiet', dest='quiet', help='Quiet mode, only prints on error', action='store_true')
# -e is reserved for "extract" so re-encrypt is "-r"
optional.add_argument('-r', '--re-encrypt', dest='reencrypt', help='Re-encrypt .iso', action='store_true')
optional.add_argument('-c', '--checksum', dest='checksum', help='Allow fallback to CRC32 checksum (disabled by default)', action='store_true')
optional.add_argument('-t', '--checksum-timeout', dest='checksum_timeout', type=int, help='How many seconds to wait for CRC32 checksum (default 15)', default=15)
optional.add_argument('--info', dest='info', action='store_true', help='Print info about .iso or .ird, then quit.')
args = parser.parse_args()
if args.info:
core.info(args)
if not args.iso:
core.error('No .iso file given. Use -i/--iso path/to/file.iso')
if args.reencrypt:
core.encrypt(args)
else:
core.decrypt(args)

1
libray/libray Symbolic link
View file

@ -0,0 +1 @@
libray.py

View file

@ -19,8 +19,15 @@
# You should have received a copy of the GNU General Public License
# along with libray. If not, see <https://www.gnu.org/licenses/>.
import multiprocessing
try:
multiprocessing.set_start_method('spawn')
except RuntimeError:
pass
import argparse
import sys
try:
@ -29,8 +36,7 @@ except ImportError:
import core
if __name__ == '__main__':
def main():
parser = argparse.ArgumentParser(
description='A Libre (FLOSS) Python application for unencrypting, extracting, repackaging, and encrypting PS3 ISOs')
@ -47,6 +53,7 @@ if __name__ == '__main__':
optional.add_argument('-r', '--re-encrypt', dest='reencrypt', help='Re-encrypt .iso', action='store_true')
optional.add_argument('-c', '--checksum', dest='checksum', help='Allow fallback to CRC32 checksum (disabled by default)', action='store_true')
optional.add_argument('-t', '--checksum-timeout', dest='checksum_timeout', type=int, help='How many seconds to wait for CRC32 checksum (default 15)', default=15)
optional.add_argument('-p', '--threads', dest='threads', type=int, help='Number of threads for parallel decryption/encryption (default: number of CPU cores)', default=0)
optional.add_argument('--info', dest='info', action='store_true', help='Print info about .iso or .ird, then quit.')
args = parser.parse_args()
@ -61,3 +68,7 @@ if __name__ == '__main__':
core.encrypt(args)
else:
core.decrypt(args)
if __name__ == '__main__':
main()

0
libray/py.typed Normal file
View file

View file

@ -2,5 +2,3 @@ tqdm~=4.66.2
pycryptodome~=3.20.0
requests~=2.31.0
beautifulsoup4~=4.12.3
html5lib~=1.1
setuptools~=69.1.1

View file

@ -19,14 +19,16 @@ setup(
author_email="ns@nsz.no",
url="https://notabug.org/necklace/libray",
packages=['libray'],
scripts=['libray/libray'],
entry_points={
'console_scripts': [
'libray=libray.libray:main',
],
},
install_requires=[
'tqdm~=4.66.2',
'pycryptodome~=3.20.0',
'requests~=2.31.0',
'beautifulsoup4~=4.12.3',
'html5lib~=1.1',
'setuptools~=69.1.1',
],
include_package_data=True,
package_data={'': ['data/keys.db']},

1
target.md5 Normal file
View file

@ -0,0 +1 @@
c16539e7abd6fe851ab2a67c0cbbaa32

View file

@ -24,6 +24,7 @@
import argparse
import os
import unittest
import unittest.mock as mock
from Crypto.Cipher import AES
@ -62,7 +63,7 @@ class TestISO(unittest.TestCase):
{'start': 1024, 'end': 2048, 'enc': False}
]
with mock.patch('iso.ird.IRD', return_value=ird) as mock_ird:
with mock.patch('libray.iso.ird.IRD', return_value=ird) as mock_ird:
returned_key = fake_iso.get_key_from_args('AAA', mock_args)
mock_ird.assert_called_once_with('aaa.ird')
self.assertEqual(decryptkey_bytes, returned_key)
@ -84,7 +85,7 @@ class TestISO(unittest.TestCase):
{'start': 0, 'end': 512, 'enc': False},
]
with mock.patch('iso.ird.IRD', return_value=ird) as mock_ird:
with mock.patch('libray.iso.ird.IRD', return_value=ird) as mock_ird:
with self.assertRaises(SystemExit):
fake_iso.get_key_from_args('AAA', mock_args)
@ -107,7 +108,7 @@ class TestISO(unittest.TestCase):
{'start': 2000000000, 'end': 2000001000, 'enc': False}
]
with mock.patch('iso.ird.IRD', return_value=ird) as mock_ird:
with mock.patch('libray.iso.ird.IRD', return_value=ird) as mock_ird:
with self.assertRaises(SystemExit):
fake_iso.get_key_from_args('AAA', mock_args)
@ -122,7 +123,7 @@ class TestISO(unittest.TestCase):
fake_iso.size = 512 * 1024 * 1024
fake_iso.game_id = 'TCUS-12345'
with mock.patch('iso.sqlite3') as mocksql:
with mock.patch('libray.iso.sqlite3') as mocksql:
decryption_key = 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
decryptkey_bytes = core.to_bytes(decryption_key)
mocksql.connect().cursor().execute().fetchall.return_value = [['AAA', decryptkey_bytes]]
@ -140,7 +141,7 @@ class TestISO(unittest.TestCase):
fake_iso.size = 512 * 1024 * 1024
fake_iso.game_id = 'TCUS-12345'
with mock.patch('iso.sqlite3') as mocksql:
with mock.patch('libray.iso.sqlite3') as mocksql:
decryption_key = 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
decryptkey_bytes = core.to_bytes(decryption_key)
mocksql.connect().cursor().execute().fetchall.side_effect = [[], [['AAA', decryptkey_bytes]]]
@ -158,11 +159,11 @@ class TestISO(unittest.TestCase):
fake_iso.size = 512 * 1024 * 1024
fake_iso.game_id = 'TCUS-12345'
with mock.patch('iso.sqlite3') as mocksql:
with mock.patch('libray.iso.sqlite3') as mocksql:
decryption_key = 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
decryptkey_bytes = core.to_bytes(decryption_key)
mocksql.connect().cursor().execute().fetchall.return_value = [['AAA', decryptkey_bytes],['BBB', decryptkey_bytes]]
with self.assertRaises(ValueError):
with self.assertRaises(SystemExit):
fake_iso.get_key_from_args(None, mock_args)
@mock.patch('argparse.ArgumentParser.parse_args', return_value=argparse.Namespace())
@ -177,7 +178,7 @@ class TestISO(unittest.TestCase):
fake_iso.size = 512 * 1024 * 1024
fake_iso.game_id = 'TCUS-12345'
with mock.patch('iso.sqlite3') as mocksql:
with mock.patch('libray.iso.sqlite3') as mocksql:
mocksql.connect().cursor().execute().fetchall.return_value = []
with self.assertRaises(SystemExit):
fake_iso.get_key_from_args('AAA', mock_args)
@ -195,12 +196,12 @@ class TestISO(unittest.TestCase):
fake_iso.size = 512 * 1024 * 1024
fake_iso.game_id = 'TCUS-12345'
with mock.patch('iso.sqlite3') as mocksql:
with mock.patch('libray.iso.sqlite3') as mocksql:
decryption_key = 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
decryptkey_bytes = core.to_bytes(decryption_key)
fakeresults = ([], [], [['AAA', decryptkey_bytes]])
mocksql.connect().cursor().execute().fetchall.side_effect = fakeresults
with mock.patch('iso.core.crc32', return_value='01010101'):
with mock.patch('libray.iso.core.crc32', return_value='01010101'):
returned_key = fake_iso.get_key_from_args('AAA', mock_args)
self.assertEqual(decryptkey_bytes, returned_key)
@ -217,10 +218,177 @@ class TestISO(unittest.TestCase):
fake_iso.size = 512 * 1024 * 1024
fake_iso.game_id = 'TCUS-12345'
with mock.patch('iso.sqlite3') as mocksql:
with mock.patch('libray.iso.sqlite3') as mocksql:
mocksql.connect().cursor().execute().fetchall.return_value = []
with mock.patch('iso.core.crc32', return_value=None):
with mock.patch('libray.iso.core.crc32', return_value=None):
with self.assertRaises(TimeoutError):
fake_iso.get_key_from_args('AAA', mock_args)
def _build_fake_iso(self, num_sectors=8, disc_key=None):
"""Build a minimal fake ISO binary in a temp file.
Layout:
[0:4] num_unencrypted_regions (uint32 BE = 1)
[4:8] padding (4 bytes, skipped by constructor)
[8:12] unencrypted region start sector (uint32 BE = 0)
[12:16] unencrypted region end sector (uint32 BE = 2)
[16:20] encrypted region end sector (uint32 BE = 6)
[20:24] terminator (0)
[2048:2064] game_id (16 bytes)
[2064:2072] PARAM.SFO marker (b'\\x00PSF')
Sectors 0-1 = unencrypted header
Sectors 2-5 = encrypted payload (zeros)
Sector 6+ = unencrypted tail
"""
import tempfile
if disc_key is None:
disc_key = b'\xaa' * 16
# Region info: constructor reads bytes 0-3, skips 4-7, then
# read_regions() reads bytes 8-11 (start) and 12-15 (end)
region_info = b'\x00\x00\x00\x01' # num_unencrypted_regions (offset 0)
region_info += b'\x00\x00\x00\x00' # padding (offset 4, skipped)
region_info += b'\x00\x00\x00\x00' # unencrypted start = sector 0 (offset 8)
region_info += b'\x00\x00\x00\x02' # unencrypted end = sector 2 (offset 12)
region_info += b'\x00\x00\x00\x06' # encrypted end = sector 6 (offset 16)
region_info += b'\x00\x00\x00\x00' # terminator (offset 20)
# Sector 1 (offset 2048): unencrypted header with game_id and PARAM.SFO marker
sector1 = bytearray(2048)
# game_id at offset 2048+16=2064
sector1[16:32] = b'TEST-GAME0000000'
# PARAM.SFO marker at offset 2048+20=2068
sector1[20:24] = b'\x00PSF'
# Encrypt sectors 2-5 with AES-CBC per-sector IV
from libray.iso import _make_iv
encrypted_payload = bytearray()
for s in range(2, 6):
iv = _make_iv(s)
cipher = AES.new(disc_key, AES.MODE_CBC, iv)
encrypted_payload.extend(cipher.encrypt(b'\x00' * 2048))
# Pad so sector data starts at offset 2048 (PS3 ISO convention)
data = region_info + b'\x00' * (2048 - len(region_info)) + sector1 + encrypted_payload
# Ensure file is at least as large as the last region end (sector 6 = 12288 bytes)
# The constructor uses size to calculate total_sectors, so file must be >= region end
while len(data) < 12288:
data += b'\x00' * 2048
total = len(data)
fd, path = tempfile.mkstemp(suffix='.iso')
with os.fdopen(fd, 'wb') as f:
f.write(data)
return path, total
@mock.patch('argparse.ArgumentParser.parse_args')
def test_decrypt_parallel_pipeline(self, mock_args):
"""Verify the multi-threaded decrypt pipeline processes all sectors."""
disc_key = b'\xbb' * 16
path, size = self._build_fake_iso(disc_key=disc_key)
try:
mock_args.iso = path
mock_args.decryption_key = disc_key.hex()
mock_args.ird = ''
mock_args.output = ''
mock_args.verbose = True
mock_args.quiet = True
mock_args.checksum = False
mock_args.checksum_timeout = 0
mock_args.threads = 4
out_path = path + '.dec'
mock_args.output = out_path
s = iso.ISO(mock_args)
s.decrypt(mock_args)
self.assertTrue(os.path.exists(out_path))
with open(out_path, 'rb') as f:
decrypted = f.read()
# Sectors 2-5 should be decrypted back to zeros
self.assertEqual(len(decrypted), size)
# Encrypted region: start=6144 (region[0].end), end=12288
self.assertEqual(decrypted[6144:12288], b'\x00' * 6144)
finally:
os.unlink(path)
if os.path.exists(path + '.dec'):
os.unlink(path + '.dec')
@mock.patch('argparse.ArgumentParser.parse_args')
def test_decrypt_sequential_fallback(self, mock_args):
"""Verify single-thread decrypt still works (sequential path)."""
disc_key = b'\xcc' * 16
path, size = self._build_fake_iso(disc_key=disc_key)
try:
mock_args.iso = path
mock_args.decryption_key = disc_key.hex()
mock_args.ird = ''
mock_args.output = ''
mock_args.verbose = True
mock_args.quiet = True
mock_args.checksum = False
mock_args.checksum_timeout = 0
mock_args.threads = 1
out_path = path + '.dec'
mock_args.output = out_path
s = iso.ISO(mock_args)
s.decrypt(mock_args)
self.assertTrue(os.path.exists(out_path))
with open(out_path, 'rb') as f:
decrypted = f.read()
self.assertEqual(decrypted[6144:12288], b'\x00' * 6144)
finally:
os.unlink(path)
if os.path.exists(path + '.dec'):
os.unlink(path + '.dec')
@mock.patch('argparse.ArgumentParser.parse_args')
def test_encrypt_parallel_pipeline(self, mock_args):
"""Verify the multi-threaded re-encrypt pipeline works."""
disc_key = b'\xdd' * 16
path, size = self._build_fake_iso(disc_key=disc_key)
try:
# First decrypt to get a plain ISO
mock_args.iso = path
mock_args.decryption_key = disc_key.hex()
mock_args.ird = ''
mock_args.output = path + '.dec'
mock_args.verbose = True
mock_args.quiet = True
mock_args.checksum = False
mock_args.checksum_timeout = 0
mock_args.threads = 4
s = iso.ISO(mock_args)
s.decrypt(mock_args)
# Now re-encrypt with -r
reenc_path = path + '.reenc'
mock_args.iso = path + '.dec'
mock_args.decryption_key = disc_key.hex()
mock_args.reencrypt = True
mock_args.output = reenc_path
mock_args.threads = 4
s2 = iso.ISO(mock_args)
s2.encrypt(mock_args)
self.assertTrue(os.path.exists(reenc_path))
with open(reenc_path, 'rb') as f:
reencrypted = f.read()
with open(path, 'rb') as f:
original = f.read()
self.assertEqual(reencrypted, original)
finally:
os.unlink(path)
if os.path.exists(path + '.dec'):
os.unlink(path + '.dec')
if os.path.exists(path + '.reenc'):
os.unlink(path + '.reenc')