ai-privacy-toolkit/apt/anonymization/anonymizer.py

146 lines
6.6 KiB
Python
Raw Normal View History

2021-04-28 14:00:19 +03:00
import numpy as np
import pandas as pd
from scipy.spatial import distance
from collections import Counter
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
from sklearn.preprocessing import OneHotEncoder
from apt.utils.datasets import ArrayDataset, DATA_PANDAS_NUMPY_TYPE
2021-04-28 14:00:19 +03:00
from typing import Union, Optional
class Anonymize:
    """
    Class for performing tailored, model-guided anonymization of training datasets for ML models.

    A decision tree is trained on the quasi-identifier columns using the supplied labels, with
    ``min_samples_leaf=k`` so that every leaf ("cell") holds at least ``k`` training records. The
    quasi-identifier values of each record are then replaced by a representative value of its cell.

    Based on the implementation described in: https://arxiv.org/abs/2007.13086
    """

    # Marker found in ``tree_.feature`` for nodes that do not split on any feature (leaf nodes).
    _LEAF_FEATURE = -2

    def __init__(self, k: int, quasi_identifiers: Union[np.ndarray, list], categorical_features: Optional[list] = None,
                 is_regression=False):
        """
        :param k: The privacy parameter that determines the number of records that will be indistinguishable from each
                  other (when looking at the quasi identifiers). Should be at least 2.
        :param quasi_identifiers: The indexes of features that need to be minimized.
        :param categorical_features: The list of categorical features indexes.
        :param is_regression: Boolean param indicating that this is a regression problem (a regression tree is
                              trained instead of a classification tree).
        :raises ValueError: If ``k`` is smaller than 2 or ``quasi_identifiers`` is None or empty.
        """
        if k < 2:
            raise ValueError("k should be a positive integer with a value of 2 or higher")
        if quasi_identifiers is None or len(quasi_identifiers) < 1:
            raise ValueError("The list of quasi-identifiers cannot be empty")

        self.k = k
        self.quasi_identifiers = quasi_identifiers
        self.categorical_features = categorical_features
        self.is_regression = is_regression

    def anonymize(self, dataset: "ArrayDataset") -> "DATA_PANDAS_NUMPY_TYPE":
        """
        Method for performing model-guided anonymization.

        :param dataset: Data wrapper containing the training data for the model and the predictions of the
                        original model on the training data.
        :return: The anonymized training dataset: a pandas DataFrame when ``dataset.is_pandas`` is true,
                 otherwise the anonymized array.
        :raises ValueError: If no feature names are given and the data has no columns.
        """
        samples = dataset.get_samples()
        if dataset.features_names is not None:
            self._features = dataset.features_names
        # if features is None, use numbers instead of names
        elif samples.shape[1] != 0:
            # one default name per *column* (axis 1); axis 0 is the number of records
            self._features = list(range(samples.shape[1]))
        else:
            raise ValueError('No data provided')

        # work on a copy: the anonymization below overwrites values in place
        transformed = self._anonymize(samples.copy(), dataset.get_labels())
        if dataset.is_pandas:
            return pd.DataFrame(transformed, columns=self._features)
        return transformed

    def _anonymize(self, x, y):
        """
        Train the anonymizer tree on the quasi-identifiers of ``x`` and anonymize ``x`` in place.

        :param x: The data to anonymize (modified in place).
        :param y: The labels used to guide the tree; must have the same number of rows as ``x``.
        :return: ``x`` with its quasi-identifier values replaced by cell representatives.
        :raises ValueError: If ``x`` and ``y`` differ in number of rows, or if ``x`` is non-numeric and no
                            categorical features were defined.
        """
        if x.shape[0] != y.shape[0]:
            raise ValueError("x and y should have same number of rows")

        x_anonymizer_train = x[:, self.quasi_identifiers]
        if x.dtype.kind not in 'iufc':
            if not self.categorical_features:
                raise ValueError('when supplying an array with non-numeric data, categorical_features must be defined')
            # note: every quasi-identifier column is 1-hot encoded here, not only the categorical ones
            x_prepared = self._modify_categorical_features(x_anonymizer_train)
        else:
            x_prepared = x_anonymizer_train

        # min_samples_leaf=k guarantees that each leaf (cell) contains at least k training records
        tree_class = DecisionTreeRegressor if self.is_regression else DecisionTreeClassifier
        self.anonymizer = tree_class(random_state=10, min_samples_split=2, min_samples_leaf=self.k)
        self.anonymizer.fit(x_prepared, y)

        cells_by_id = self._calculate_cells(x, x_prepared)
        return self._anonymize_data(x, x_prepared, cells_by_id)

    def _calculate_cells(self, x, x_anonymizer_train):
        """
        Build one cell per leaf of the trained tree and compute its representative values.

        :param x: The original data.
        :param x_anonymizer_train: Only the quasi-identifiers of ``x`` (1-hot encoded if non-numeric).
        :return: Dictionary mapping leaf node id to its cell dictionary
                 (keys: 'label', 'hist', 'id', 'representative').
        """
        tree = self.anonymizer.tree_
        cells_by_id = {}
        leaves = []
        for node, feature in enumerate(tree.feature):
            if feature != self._LEAF_FEATURE:
                continue
            leaves.append(node)
            # NOTE(review): depending on the scikit-learn version, tree_.value may hold per-class
            # fractions rather than counts, in which case int() truncates them — confirm before
            # relying on 'hist'.
            hist = [int(i) for i in tree.value[node][0]]
            # TODO we may change the method for choosing representative for cell
            # ('label' is currently a constant placeholder)
            cells_by_id[int(node)] = {'label': 1, 'hist': hist, 'id': int(node)}
        self.nodes = leaves
        self._find_representatives(x, x_anonymizer_train, cells_by_id.values())
        return cells_by_id

    def _find_representatives(self, x, x_anonymizer_train, cells):
        """
        Fill in ``cell['representative']`` for every cell.

        :param x: The original data.
        :param x_anonymizer_train: Only the quasi-identifiers of ``x`` (1-hot encoded if non-numeric).
        :param cells: Iterable of cell dictionaries; each is updated in place.
        """
        node_ids = self._find_sample_nodes(x_anonymizer_train)

        # group the row indexes by leaf once, instead of re-scanning all samples for every cell
        rows_by_node = {}
        for index, node_id in enumerate(node_ids):
            rows_by_node.setdefault(node_id, []).append(index)

        for cell in cells:
            # TODO: should we filter only those with majority label? (using hist)
            indexes = rows_by_node.get(cell['id'], [])
            cell['representative'] = self._representative_for_rows(x[indexes])

    def _representative_for_rows(self, rows):
        """
        Compute the representative value of every quasi-identifier over the rows of a single cell.

        :param rows: The rows of the original data that fall into the cell.
        :return: Dictionary mapping quasi-identifier index to its representative value: the most common
                 value for categorical features, otherwise the actual value closest to the median.
        """
        representative = {}
        for feature in self.quasi_identifiers:
            values = rows[:, feature]
            if self.categorical_features and feature in self.categorical_features:
                # find most common value
                representative[feature] = Counter(values).most_common(1)[0][0]
            else:
                representative[feature] = self._closest_to_median(values)
        return representative

    @staticmethod
    def _closest_to_median(values):
        """
        Return the element of ``values`` that is closest to their median.

        An existing value is returned rather than the median itself. When several values are equally
        close, the first one encountered is returned.

        :param values: 1-D array of numeric values (must not be empty).
        :return: The element of ``values`` with the smallest absolute distance from the median.
        """
        numeric = np.asarray(values, dtype=float)
        # for scalars the euclidean distance is simply the absolute difference
        distances = np.abs(numeric - np.median(numeric))
        return values[int(np.argmin(distances))]

    def _find_sample_nodes(self, samples):
        """
        Find the leaf of the trained tree that each sample falls into.

        :param samples: The prepared quasi-identifier data, in the same form used to train the tree.
        :return: List with the leaf node id of every sample, in order.
        """
        # apply() returns the leaf index directly, avoiding a dense (samples x nodes) decision path matrix
        return self.anonymizer.apply(samples).tolist()

    def _find_sample_cells(self, samples, cells_by_id):
        """
        Find the cell that each sample falls into.

        :param samples: The prepared quasi-identifier data, in the same form used to train the tree.
        :param cells_by_id: Dictionary mapping leaf node id to cell.
        :return: List with the cell of every sample, in order.
        """
        return [cells_by_id[node_id] for node_id in self._find_sample_nodes(samples)]

    def _anonymize_data(self, x, x_anonymizer_train, cells_by_id):
        """
        Overwrite the quasi-identifier values of every row with the representative of its cell.

        :param x: The original data (modified in place).
        :param x_anonymizer_train: Only the quasi-identifiers of ``x`` (1-hot encoded if non-numeric).
        :param cells_by_id: Dictionary mapping leaf node id to cell.
        :return: ``x`` after the replacement.
        """
        cells = self._find_sample_cells(x_anonymizer_train, cells_by_id)
        for row, cell in zip(x, cells):
            for feature, value in cell['representative'].items():
                row[feature] = value
        return x

    def _modify_categorical_features(self, x):
        """
        1-hot encode all columns of ``x``.

        :param x: The quasi-identifier columns of the data.
        :return: The 1-hot encoded data as returned by ``OneHotEncoder.fit_transform``.
        """
        encoder = OneHotEncoder()
        return encoder.fit_transform(x)