Initial support+test for pytorch multi-label binary classifier

Signed-off-by: abigailt <abigailt@il.ibm.com>
This commit is contained in:
abigailt 2024-02-19 14:16:03 +02:00
parent f197199e54
commit 79534b69db
4 changed files with 193 additions and 14 deletions

View file

@ -1,6 +1,6 @@
from apt.utils.models.model import Model, BlackboxClassifier, ModelOutputType, ScoringMethod, \
BlackboxClassifierPredictions, BlackboxClassifierPredictFunction, get_nb_classes, is_one_hot, \
check_correct_model_output
check_correct_model_output, is_multi_label, is_multi_label_binary
from apt.utils.models.sklearn_model import SklearnModel, SklearnClassifier, SklearnRegressor
from apt.utils.models.keras_model import KerasClassifier, KerasRegressor
from apt.utils.models.xgboost_model import XGBoostClassifier

View file

@ -3,14 +3,14 @@ import os
import shutil
import logging
from typing import Optional, Tuple
from typing import Optional, Tuple, Union, List, Callable
import numpy as np
import torch
from torch.utils.data import DataLoader, TensorDataset
from art.utils import check_and_transform_label_format
from apt.utils.datasets.datasets import PytorchData
from apt.utils.models import Model, ModelOutputType
from apt.utils.models import Model, ModelOutputType, is_multi_label, is_multi_label_binary
from apt.utils.datasets import OUTPUT_DATA_ARRAY_TYPE
from art.estimators.classification.pytorch import PyTorchClassifier as ArtPyTorchClassifier
@ -30,16 +30,45 @@ class PyTorchClassifierWrapper(ArtPyTorchClassifier):
Extension for Pytorch ART model
"""
def __init__(
self,
model: "torch.nn.Module",
loss: "torch.nn.modules.loss._Loss",
input_shape: Tuple[int, ...],
nb_classes: int,
optimizer: Optional["torch.optim.Optimizer"] = None, # type: ignore
use_amp: bool = False,
opt_level: str = "O1",
loss_scale: Optional[Union[float, str]] = "dynamic",
channels_first: bool = True,
clip_values: Optional["CLIP_VALUES_TYPE"] = None,
preprocessing_defences: Union["Preprocessor", List["Preprocessor"], None] = None,
postprocessing_defences: Union["Postprocessor", List["Postprocessor"], None] = None,
preprocessing: "PREPROCESSING_TYPE" = (0.0, 1.0),
device_type: str = "gpu",
):
super().__init__(model, loss, input_shape, nb_classes, optimizer, use_amp, opt_level, loss_scale,
channels_first, clip_values, preprocessing_defences, postprocessing_defences, preprocessing,
device_type)
self._is_single_binary = None
self._is_multi_label = None
self._is_multi_label_binary = None
def get_step_correct(self, outputs, targets) -> int:
"""
Get number of correctly classified labels.
"""
# here everything is torch tensors
if len(outputs) != len(targets):
raise ValueError("outputs and targets should be the same length.")
if self.nb_classes > 1:
return int(torch.sum(torch.argmax(outputs, axis=-1) == targets).item())
if self._is_single_binary:
return int(torch.sum(torch.round(outputs) == targets).item())
elif self._is_multi_label:
if self._is_multi_label_binary:
outputs = torch.round(outputs)
return int(torch.sum(targets == outputs).item())
else:
return int(torch.sum(torch.round(outputs, axis=-1) == targets).item())
return int(torch.sum(torch.argmax(outputs, axis=-1) == targets).item())
def _eval(self, loader: DataLoader):
"""
@ -93,6 +122,10 @@ class PyTorchClassifierWrapper(ArtPyTorchClassifier):
:param kwargs: Dictionary of framework-specific arguments. This parameter is not currently
supported for PyTorch and providing it takes no effect.
"""
self._is_single_binary = self.nb_classes == 2 and (len(y.shape) == 1 or y.shape[1] == 1)
self._is_multi_label = is_multi_label(y)
self._is_multi_label_binary = is_multi_label_binary(y)
# Put the model in the training mode
self._model.train()
@ -400,24 +433,43 @@ class PyTorchClassifier(PyTorchModel):
"""
return self._art_model.predict(x.get_samples(), **kwargs)
def score(self, test_data: PytorchData, **kwargs):
def score(self, test_data: PytorchData, binary_threshold: Optional[float] = 0.5,
apply_non_linearity: Optional[Callable] = None, **kwargs):
"""
Score the model using test data.
:param test_data: Test data.
:type test_data: `PytorchData`
:param binary_threshold: The threshold to use on binary classification probabilities to assign the positive
class.
:type binary_threshold: float, optional. Default is 0.5.
:param apply_non_linearity: A non-linear function to apply to the result of the 'predict' method, in case the
model outputs logits (e.g., sigmoid).
:type apply_non_linearity: Callable, should be possible to apply directly to the numpy output of the 'predict'
method, optional.
:return: the score as float (between 0 and 1)
"""
# numpy arrays
y = test_data.get_labels()
predicted = self.predict(test_data)
if apply_non_linearity:
predicted = apply_non_linearity(predicted)
# binary classification, single column of probabilities
if self._art_model.nb_classes == 2 and (len(predicted.shape) == 1 or predicted.shape[1] == 1):
if len(predicted.shape) > 1:
y = check_and_transform_label_format(y, self._art_model.nb_classes, return_one_hot=False)
return np.count_nonzero(y == (predicted > 0.5)) / predicted.shape[0]
return np.count_nonzero(y == (predicted > binary_threshold)) / predicted.shape[0]
# multi column
else:
y = check_and_transform_label_format(y, self._art_model.nb_classes)
return np.count_nonzero(np.argmax(y, axis=1) == np.argmax(predicted, axis=1)) / predicted.shape[0]
if not is_multi_label(y):
y = check_and_transform_label_format(y, self._art_model.nb_classes)
return np.count_nonzero(np.argmax(y, axis=1) == np.argmax(predicted, axis=1)) / predicted.shape[0]
else:
if is_multi_label_binary(y):
predicted[predicted < binary_threshold] = 0
predicted[predicted >= binary_threshold] = 1
return np.count_nonzero(y == predicted) / (predicted.shape[0] * y.shape[1])
def load_checkpoint_state_dict_by_path(self, model_name: str, path: str = None):
"""

View file

@ -179,7 +179,7 @@ def test_blackbox_classifier_predictions_multi_label_cat():
def test_blackbox_classifier_predictions_multi_label_binary():
(x_train, y_train), (x_test, y_test) = dataset_utils.get_iris_dataset_np()
# make multi-label categorical
# make multi-label binary
y_train = np.column_stack((y_train, y_train, y_train))
y_train[y_train > 1] = 1
pred_train = y_train.copy().astype(float)

View file

@ -1,10 +1,14 @@
import numpy as np
from torch import nn, optim
from torch import nn, optim, sigmoid, where
from torch.nn import functional
# from torch.utils.data import DataLoader, TensorDataset
from scipy.special import expit
from apt.utils.datasets.datasets import PytorchData
from apt.utils.models import ModelOutputType
from apt.utils.models.pytorch_model import PyTorchClassifier
from art.utils import load_nursery
from apt.utils import dataset_utils
class pytorch_model(nn.Module):
@ -39,7 +43,7 @@ class pytorch_model(nn.Module):
return self.classifier(out)
def test_nursery_pytorch_state_dict():
def test_pytorch_nursery_state_dict():
(x_train, y_train), (x_test, y_test), _, _ = load_nursery(test_set=0.5)
# reduce size of training set to make attack slightly better
train_set_size = 500
@ -67,7 +71,7 @@ def test_nursery_pytorch_state_dict():
assert (0 <= score <= 1)
def test_nursery_pytorch_save_entire_model():
def test_pytorch_nursery_save_entire_model():
(x_train, y_train), (x_test, y_test), _, _ = load_nursery(test_set=0.5)
# reduce size of training set to make attack slightly better
@ -94,3 +98,126 @@ def test_nursery_pytorch_save_entire_model():
score = art_model.score(PytorchData(x_test.astype(np.float32), y_test))
print('best model accuracy: ', score)
assert (0 <= score <= 1)
# def test_pytorch_predictions_multi_label_cat():
# # This kind of model requires special training and will not be supported using the 'fit' method.
# class multi_label_cat_model(nn.Module):
#
# def __init__(self, num_classes, num_features):
# super(multi_label_cat_model, self).__init__()
#
# self.fc1 = nn.Sequential(
# nn.Linear(num_features, 256),
# nn.Tanh(), )
#
# self.classifier1 = nn.Linear(256, num_classes)
# self.classifier2 = nn.Linear(256, num_classes)
# self.classifier3 = nn.Linear(256, num_classes)
#
# def forward(self, x):
# out1 = self.classifier1(self.fc1(x))
# out2 = self.classifier2(self.fc1(x))
# out3 = self.classifier3(self.fc1(x))
# return out1, out2, out3
#
# (x_train, y_train), (x_test, y_test) = dataset_utils.get_iris_dataset_np()
#
# # make multi-label categorical
# y_train = np.column_stack((y_train, y_train, y_train))
# y_test = np.column_stack((y_test, y_test, y_test))
# test = PytorchData(x_test, y_test)
#
# model = multi_label_cat_model(3, 4)
# criterion = nn.CrossEntropyLoss()
# optimizer = optim.Adam(model.parameters(), lr=0.01)
#
# # train model
# train_dataset = TensorDataset(from_numpy(x_train.astype(np.float32)), from_numpy(y_train.astype(np.float32)))
# train_loader = DataLoader(train_dataset, batch_size=100, shuffle=True)
#
# for epoch in range(5):
# # Train for one epoch
# for inputs, targets in train_loader:
# # Zero the parameter gradients
# optimizer.zero_grad()
#
# # Perform prediction
# model_outputs = model(inputs)[-1]
#
# # Form the loss function
# loss = 0
# for i, o in enumerate(model_outputs):
# loss += criterion(o, targets[i])
#
# loss.backward()
#
# optimizer.step()
#
# art_model = PyTorchClassifier(model=model, output_type=ModelOutputType.CLASSIFIER_LOGITS, loss=criterion,
# optimizer=optimizer, input_shape=(24,),
# nb_classes=3)
#
# pred = art_model.predict(test)
# assert (pred.shape[0] == x_test.shape[0])
#
# score = art_model.score(test, apply_non_linearity=expit)
# assert (score == 1.0)
def test_pytorch_predictions_multi_label_binary():
class multi_label_binary_model(nn.Module):
def __init__(self, num_labels, num_features):
super(multi_label_binary_model, self).__init__()
self.fc1 = nn.Sequential(
nn.Linear(num_features, 256),
nn.Tanh(), )
self.classifier1 = nn.Linear(256, num_labels)
def forward(self, x):
return self.classifier1(self.fc1(x))
# missing sigmoid on each output
class FocalLoss(nn.Module):
def __init__(self, gamma=2, alpha=0.5):
super(FocalLoss, self).__init__()
self.gamma = gamma
self.alpha = alpha
def forward(self, input, target):
bce_loss = functional.binary_cross_entropy_with_logits(input, target, reduction='none')
p = sigmoid(input)
p = where(target >= 0.5, p, 1-p)
modulating_factor = (1 - p)**self.gamma
alpha = self.alpha * target + (1 - self.alpha) * (1 - target)
focal_loss = alpha * modulating_factor * bce_loss
return focal_loss.mean()
(x_train, y_train), (x_test, y_test) = dataset_utils.get_iris_dataset_np()
# make multi-label binary
y_train = np.column_stack((y_train, y_train, y_train))
y_train[y_train > 1] = 1
y_test = np.column_stack((y_test, y_test, y_test))
y_test[y_test > 1] = 1
test = PytorchData(x_test.astype(np.float32), y_test)
model = multi_label_binary_model(3, 4)
criterion = FocalLoss()
optimizer = optim.RMSprop(model.parameters(), lr=0.01)
art_model = PyTorchClassifier(model=model, output_type=ModelOutputType.CLASSIFIER_LOGITS, loss=criterion,
optimizer=optimizer, input_shape=(24,),
nb_classes=3)
art_model.fit(PytorchData(x_train.astype(np.float32), y_train.astype(np.float32)), save_entire_model=False,
nb_epochs=10)
pred = art_model.predict(test)
assert (pred.shape[0] == x_test.shape[0])
score = art_model.score(test, apply_non_linearity=expit)
assert (score == 1.0)