From 076503b248bbd1ea333ebefcd48b2cdfc843352d Mon Sep 17 00:00:00 2001 From: abigailt Date: Tue, 12 Mar 2024 13:25:31 +0200 Subject: [PATCH] Working example of anonymization with pytorch multi-output binary model Signed-off-by: abigailt --- apt/utils/datasets/datasets.py | 2 +- tests/test_anonymizer.py | 78 ++++++++++++++++++++++++++++++++-- 2 files changed, 76 insertions(+), 4 deletions(-) diff --git a/apt/utils/datasets/datasets.py b/apt/utils/datasets/datasets.py index cafdd45..da133df 100644 --- a/apt/utils/datasets/datasets.py +++ b/apt/utils/datasets/datasets.py @@ -233,7 +233,7 @@ class ArrayDataset(Dataset): raise ValueError("The supplied features are not the same as in the data features") self.features_names = x.columns.to_list() - if self._y is not None and len(self._x) != len(self._y): + if self._y is not None and self._x.shape[0] != self._y.shape[0]: raise ValueError("Non equivalent lengths of x and y") def get_samples(self) -> OUTPUT_DATA_ARRAY_TYPE: diff --git a/tests/test_anonymizer.py b/tests/test_anonymizer.py index d0a518d..83ba2dd 100644 --- a/tests/test_anonymizer.py +++ b/tests/test_anonymizer.py @@ -6,11 +6,17 @@ from sklearn.impute import SimpleImputer from sklearn.pipeline import Pipeline from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor from sklearn.preprocessing import OneHotEncoder - -from apt.anonymization import Anonymize -from apt.utils.dataset_utils import get_iris_dataset_np, get_adult_dataset_pd, get_nursery_dataset_pd from sklearn.datasets import load_diabetes from sklearn.model_selection import train_test_split +from torch import nn, optim, sigmoid, where +from torch.nn import functional +from scipy.special import expit + +from apt.utils.datasets.datasets import PytorchData +from apt.utils.models import ModelOutputType +from apt.utils.models.pytorch_model import PyTorchClassifier +from apt.anonymization import Anonymize +from apt.utils.dataset_utils import get_iris_dataset_np, get_adult_dataset_pd, get_nursery_dataset_pd from apt.utils.datasets import ArrayDataset @@ -187,6 +193,72 @@ def test_anonymize_pandas_one_hot(): assert ((np.min(anonymized_slice, axis=1) == 0).all()) +def test_anonymize_pytorch_multi_label_binary(): + class multi_label_binary_model(nn.Module): + def __init__(self, num_labels, num_features): + super(multi_label_binary_model, self).__init__() + + self.fc1 = nn.Sequential( + nn.Linear(num_features, 256), + nn.Tanh(), ) + + self.classifier1 = nn.Linear(256, num_labels) + + def forward(self, x): + return self.classifier1(self.fc1(x)) + # missing sigmoid on each output + + class FocalLoss(nn.Module): + def __init__(self, gamma=2, alpha=0.5): + super(FocalLoss, self).__init__() + self.gamma = gamma + self.alpha = alpha + + def forward(self, input, target): + bce_loss = functional.binary_cross_entropy_with_logits(input, target, reduction='none') + + p = sigmoid(input) + p = where(target >= 0.5, p, 1-p) + + modulating_factor = (1 - p)**self.gamma + alpha = self.alpha * target + (1 - self.alpha) * (1 - target) + focal_loss = alpha * modulating_factor * bce_loss + + return focal_loss.mean() + + (x_train, y_train), _ = get_iris_dataset_np() + + # make multi-label binary + y_train = np.column_stack((y_train, y_train, y_train)) + y_train[y_train > 1] = 1 + + model = multi_label_binary_model(3, 4) + criterion = FocalLoss() + optimizer = optim.RMSprop(model.parameters(), lr=0.01) + + art_model = PyTorchClassifier(model=model, + output_type=ModelOutputType.CLASSIFIER_MULTI_OUTPUT_BINARY_LOGITS, + loss=criterion, + optimizer=optimizer, + input_shape=(24,), + nb_classes=3) + art_model.fit(PytorchData(x_train.astype(np.float32), y_train.astype(np.float32)), save_entire_model=False, + nb_epochs=10) + pred = art_model.predict(PytorchData(x_train.astype(np.float32), y_train.astype(np.float32))) + pred = expit(pred) + pred[pred < 0.5] = 0 + pred[pred >= 0.5] = 1 + + k = 10 + QI = [0, 2] + anonymizer = Anonymize(k, QI, train_only_QI=True) + anon = anonymizer.anonymize(ArrayDataset(x_train, pred)) + assert (len(np.unique(anon[:, QI], axis=0)) < len(np.unique(x_train[:, QI], axis=0))) + _, counts_elements = np.unique(anon[:, QI], return_counts=True) + assert (np.min(counts_elements) >= k) + assert ((np.delete(anon, QI, axis=1) == np.delete(x_train, QI, axis=1)).all()) + + def test_errors(): with pytest.raises(ValueError): Anonymize(1, [0, 2])