diff --git a/apt/utils/models/__init__.py b/apt/utils/models/__init__.py index ff582b4..202ec90 100644 --- a/apt/utils/models/__init__.py +++ b/apt/utils/models/__init__.py @@ -1,6 +1,6 @@ from apt.utils.models.model import Model, BlackboxClassifier, ModelOutputType, ScoringMethod, \ BlackboxClassifierPredictions, BlackboxClassifierPredictFunction, get_nb_classes, is_one_hot, \ - check_correct_model_output + check_correct_model_output, is_multi_label, is_multi_label_binary from apt.utils.models.sklearn_model import SklearnModel, SklearnClassifier, SklearnRegressor from apt.utils.models.keras_model import KerasClassifier, KerasRegressor from apt.utils.models.xgboost_model import XGBoostClassifier diff --git a/apt/utils/models/pytorch_model.py b/apt/utils/models/pytorch_model.py index f234311..a7db4a9 100644 --- a/apt/utils/models/pytorch_model.py +++ b/apt/utils/models/pytorch_model.py @@ -3,14 +3,14 @@ import os import shutil import logging -from typing import Optional, Tuple +from typing import Optional, Tuple, Union, List, Callable import numpy as np import torch from torch.utils.data import DataLoader, TensorDataset from art.utils import check_and_transform_label_format from apt.utils.datasets.datasets import PytorchData -from apt.utils.models import Model, ModelOutputType +from apt.utils.models import Model, ModelOutputType, is_multi_label, is_multi_label_binary from apt.utils.datasets import OUTPUT_DATA_ARRAY_TYPE from art.estimators.classification.pytorch import PyTorchClassifier as ArtPyTorchClassifier @@ -30,16 +30,45 @@ class PyTorchClassifierWrapper(ArtPyTorchClassifier): Extension for Pytorch ART model """ + def __init__( + self, + model: "torch.nn.Module", + loss: "torch.nn.modules.loss._Loss", + input_shape: Tuple[int, ...], + nb_classes: int, + optimizer: Optional["torch.optim.Optimizer"] = None, # type: ignore + use_amp: bool = False, + opt_level: str = "O1", + loss_scale: Optional[Union[float, str]] = "dynamic", + channels_first: bool = True, + clip_values: Optional["CLIP_VALUES_TYPE"] = None, + preprocessing_defences: Union["Preprocessor", List["Preprocessor"], None] = None, + postprocessing_defences: Union["Postprocessor", List["Postprocessor"], None] = None, + preprocessing: "PREPROCESSING_TYPE" = (0.0, 1.0), + device_type: str = "gpu", + ): + super().__init__(model, loss, input_shape, nb_classes, optimizer, use_amp, opt_level, loss_scale, + channels_first, clip_values, preprocessing_defences, postprocessing_defences, preprocessing, + device_type) + self._is_single_binary = None + self._is_multi_label = None + self._is_multi_label_binary = None + def get_step_correct(self, outputs, targets) -> int: """ Get number of correctly classified labels. """ + # here everything is torch tensors if len(outputs) != len(targets): raise ValueError("outputs and targets should be the same length.") - if self.nb_classes > 1: - return int(torch.sum(torch.argmax(outputs, axis=-1) == targets).item()) + if self._is_single_binary: + return int(torch.sum(torch.round(outputs) == targets).item()) + elif self._is_multi_label: + if self._is_multi_label_binary: + outputs = torch.round(outputs) + return int(torch.sum(targets == outputs).item()) else: - return int(torch.sum(torch.round(outputs, axis=-1) == targets).item()) + return int(torch.sum(torch.argmax(outputs, axis=-1) == targets).item()) def _eval(self, loader: DataLoader): """ @@ -93,6 +122,10 @@ class PyTorchClassifierWrapper(ArtPyTorchClassifier): :param kwargs: Dictionary of framework-specific arguments. This parameter is not currently supported for PyTorch and providing it takes no effect. """ + self._is_single_binary = self.nb_classes == 2 and (len(y.shape) == 1 or y.shape[1] == 1) + self._is_multi_label = is_multi_label(y) + self._is_multi_label_binary = is_multi_label_binary(y) + # Put the model in the training mode self._model.train() @@ -400,24 +433,43 @@ class PyTorchClassifier(PyTorchModel): """ return self._art_model.predict(x.get_samples(), **kwargs) - def score(self, test_data: PytorchData, **kwargs): + def score(self, test_data: PytorchData, binary_threshold: Optional[float] = 0.5, + apply_non_linearity: Optional[Callable] = None, **kwargs): """ Score the model using test data. :param test_data: Test data. :type test_data: `PytorchData` + :param binary_threshold: The threshold to use on binary classification probabilities to assign the positive + class. + :type binary_threshold: float, optional. Default is 0.5. + :param apply_non_linearity: A non-linear function to apply to the result of the 'predict' method, in case the + model outputs logits (e.g., sigmoid). + :type apply_non_linearity: Callable, should be possible to apply directly to the numpy output of the 'predict' + method, optional. :return: the score as float (between 0 and 1) """ + # numpy arrays y = test_data.get_labels() predicted = self.predict(test_data) + if apply_non_linearity: + predicted = apply_non_linearity(predicted) # binary classification, single column of probabilities if self._art_model.nb_classes == 2 and (len(predicted.shape) == 1 or predicted.shape[1] == 1): if len(predicted.shape) > 1: y = check_and_transform_label_format(y, self._art_model.nb_classes, return_one_hot=False) - return np.count_nonzero(y == (predicted > 0.5)) / predicted.shape[0] + return np.count_nonzero(y == (predicted > binary_threshold)) / predicted.shape[0] + # multi column else: - y = check_and_transform_label_format(y, self._art_model.nb_classes) - return np.count_nonzero(np.argmax(y, axis=1) == np.argmax(predicted, axis=1)) / predicted.shape[0] + if not is_multi_label(y): + y = check_and_transform_label_format(y, self._art_model.nb_classes) + return np.count_nonzero(np.argmax(y, axis=1) == np.argmax(predicted, axis=1)) / predicted.shape[0] + else: + if is_multi_label_binary(y): + predicted[predicted < binary_threshold] = 0 + predicted[predicted >= binary_threshold] = 1 + return np.count_nonzero(y == predicted) / (predicted.shape[0] * y.shape[1]) + def load_checkpoint_state_dict_by_path(self, model_name: str, path: str = None): """ diff --git a/tests/test_model.py b/tests/test_model.py index b7b2909..20f2ba9 100644 --- a/tests/test_model.py +++ b/tests/test_model.py @@ -179,7 +179,7 @@ def test_blackbox_classifier_predictions_multi_label_cat(): def test_blackbox_classifier_predictions_multi_label_binary(): (x_train, y_train), (x_test, y_test) = dataset_utils.get_iris_dataset_np() - # make multi-label categorical + # make multi-label binary y_train = np.column_stack((y_train, y_train, y_train)) y_train[y_train > 1] = 1 pred_train = y_train.copy().astype(float) diff --git a/tests/test_pytorch.py b/tests/test_pytorch.py index c16735d..c138a2d 100644 --- a/tests/test_pytorch.py +++ b/tests/test_pytorch.py @@ -1,10 +1,14 @@ import numpy as np -from torch import nn, optim +from torch import nn, optim, sigmoid, where +from torch.nn import functional +# from torch.utils.data import DataLoader, TensorDataset +from scipy.special import expit from apt.utils.datasets.datasets import PytorchData from apt.utils.models import ModelOutputType from apt.utils.models.pytorch_model import PyTorchClassifier from art.utils import load_nursery +from apt.utils import dataset_utils class pytorch_model(nn.Module): @@ -39,7 +43,7 @@ class pytorch_model(nn.Module): return self.classifier(out) -def test_nursery_pytorch_state_dict(): +def test_pytorch_nursery_state_dict(): (x_train, y_train), (x_test, y_test), _, _ = load_nursery(test_set=0.5) # reduce size of training set to make attack slightly better train_set_size = 500 @@ -67,7 +71,7 @@ def test_nursery_pytorch_state_dict(): assert (0 <= score <= 1) -def test_nursery_pytorch_save_entire_model(): +def test_pytorch_nursery_save_entire_model(): (x_train, y_train), (x_test, y_test), _, _ = load_nursery(test_set=0.5) # reduce size of training set to make attack slightly better @@ -94,3 +98,126 @@ def test_nursery_pytorch_save_entire_model(): score = art_model.score(PytorchData(x_test.astype(np.float32), y_test)) print('best model accuracy: ', score) assert (0 <= score <= 1) + + +# def test_pytorch_predictions_multi_label_cat(): +# # This kind of model requires special training and will not be supported using the 'fit' method. +# class multi_label_cat_model(nn.Module): +# +# def __init__(self, num_classes, num_features): +# super(multi_label_cat_model, self).__init__() +# +# self.fc1 = nn.Sequential( +# nn.Linear(num_features, 256), +# nn.Tanh(), ) +# +# self.classifier1 = nn.Linear(256, num_classes) +# self.classifier2 = nn.Linear(256, num_classes) +# self.classifier3 = nn.Linear(256, num_classes) +# +# def forward(self, x): +# out1 = self.classifier1(self.fc1(x)) +# out2 = self.classifier2(self.fc1(x)) +# out3 = self.classifier3(self.fc1(x)) +# return out1, out2, out3 +# +# (x_train, y_train), (x_test, y_test) = dataset_utils.get_iris_dataset_np() +# +# # make multi-label categorical +# y_train = np.column_stack((y_train, y_train, y_train)) +# y_test = np.column_stack((y_test, y_test, y_test)) +# test = PytorchData(x_test, y_test) +# +# model = multi_label_cat_model(3, 4) +# criterion = nn.CrossEntropyLoss() +# optimizer = optim.Adam(model.parameters(), lr=0.01) +# +# # train model +# train_dataset = TensorDataset(from_numpy(x_train.astype(np.float32)), from_numpy(y_train.astype(np.float32))) +# train_loader = DataLoader(train_dataset, batch_size=100, shuffle=True) +# +# for epoch in range(5): +# # Train for one epoch +# for inputs, targets in train_loader: +# # Zero the parameter gradients +# optimizer.zero_grad() +# +# # Perform prediction +# model_outputs = model(inputs)[-1] +# +# # Form the loss function +# loss = 0 +# for i, o in enumerate(model_outputs): +# loss += criterion(o, targets[i]) +# +# loss.backward() +# +# optimizer.step() +# +# art_model = PyTorchClassifier(model=model, output_type=ModelOutputType.CLASSIFIER_LOGITS, loss=criterion, +# optimizer=optimizer, input_shape=(24,), +# nb_classes=3) +# +# pred = art_model.predict(test) +# assert (pred.shape[0] == x_test.shape[0]) +# +# score = art_model.score(test, apply_non_linearity=expit) +# assert (score == 1.0) + + +def test_pytorch_predictions_multi_label_binary(): + class multi_label_binary_model(nn.Module): + def __init__(self, num_labels, num_features): + super(multi_label_binary_model, self).__init__() + + self.fc1 = nn.Sequential( + nn.Linear(num_features, 256), + nn.Tanh(), ) + + self.classifier1 = nn.Linear(256, num_labels) + + def forward(self, x): + return self.classifier1(self.fc1(x)) + # missing sigmoid on each output + + class FocalLoss(nn.Module): + def __init__(self, gamma=2, alpha=0.5): + super(FocalLoss, self).__init__() + self.gamma = gamma + self.alpha = alpha + + def forward(self, input, target): + bce_loss = functional.binary_cross_entropy_with_logits(input, target, reduction='none') + + p = sigmoid(input) + p = where(target >= 0.5, p, 1-p) + + modulating_factor = (1 - p)**self.gamma + alpha = self.alpha * target + (1 - self.alpha) * (1 - target) + focal_loss = alpha * modulating_factor * bce_loss + + return focal_loss.mean() + + (x_train, y_train), (x_test, y_test) = dataset_utils.get_iris_dataset_np() + + # make multi-label binary + y_train = np.column_stack((y_train, y_train, y_train)) + y_train[y_train > 1] = 1 + y_test = np.column_stack((y_test, y_test, y_test)) + y_test[y_test > 1] = 1 + test = PytorchData(x_test.astype(np.float32), y_test) + + model = multi_label_binary_model(3, 4) + criterion = FocalLoss() + optimizer = optim.RMSprop(model.parameters(), lr=0.01) + + art_model = PyTorchClassifier(model=model, output_type=ModelOutputType.CLASSIFIER_LOGITS, loss=criterion, + optimizer=optimizer, input_shape=(24,), + nb_classes=3) + art_model.fit(PytorchData(x_train.astype(np.float32), y_train.astype(np.float32)), save_entire_model=False, + nb_epochs=10) + pred = art_model.predict(test) + assert (pred.shape[0] == x_test.shape[0]) + + score = art_model.score(test, apply_non_linearity=expit) + assert (score == 1.0)