anonymizer works with numpy and return numpy/pandas as original dataset

This commit is contained in:
olasaadi 2022-03-19 18:06:10 +02:00
parent 7b788b9018
commit 3263f92bee
3 changed files with 44 additions and 45 deletions

View file

@ -5,7 +5,7 @@ from collections import Counter
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
from sklearn.preprocessing import OneHotEncoder
from apt.utils.datasets import ArrayDataset, DATA_ARRAY_TYPE
from apt.utils.datasets import ArrayDataset, DATA_PANDAS_NUMPY_TYPE
from typing import Union, Optional
@ -17,7 +17,7 @@ class Anonymize:
Based on the implementation described in: https://arxiv.org/abs/2007.13086
"""
def __init__(self, k: int, quasi_identifiers: Union[np.ndarray, list], categorical_features: Optional[list] = None,
def __init__(self, k: int, quasi_identifiers: Union[np.ndarray, list], features = None, categorical_features: Optional[list] = None,
is_regression=False):
"""
:param k: The privacy parameter that determines the number of records that will be indistinguishable from each
@ -37,8 +37,9 @@ class Anonymize:
self.quasi_identifiers = quasi_identifiers
self.categorical_features = categorical_features
self.is_regression = is_regression
self.features = features
def anonymize(self, dataset: ArrayDataset) -> DATA_ARRAY_TYPE:
def anonymize(self, dataset: ArrayDataset) -> DATA_PANDAS_NUMPY_TYPE:
"""
Method for performing model-guided anonymization.
@ -47,18 +48,32 @@ class Anonymize:
contain both numeric and categorical data.
:return: An array containing the anonymized training dataset.
"""
if type(dataset.get_samples()) == np.ndarray:
return self._anonymize_ndarray(dataset.get_samples().copy(), dataset.get_labels())
else: # pandas
if not self.categorical_features:
raise ValueError('When supplying a pandas dataframe, categorical_features must be defined')
return self._anonymize_pandas(dataset.get_samples().copy(), dataset.get_labels())
if self.features:
self._features = self.features
# if features is None, use numbers instead of names
elif dataset.get_samples().shape[0] != 0:
self._features = [i for i in range(dataset.get_samples().shape[0])]
else:
self._features = None
if self.quasi_identifiers and self.features:
self.quasi_identifiers = [i for i,v in enumerate(self.features) if v in self.quasi_identifiers]
if self.categorical_features and self.features:
self.categorical_features = [i for i,v in enumerate(self.features) if v in self.categorical_features]
transformed = self._anonymize_ndarray(dataset.get_samples().copy(), dataset.get_labels())
if dataset.is_numpy:
return transformed
else:
return pd.DataFrame(transformed, columns=self._features)
def _anonymize_ndarray(self, x, y):
if x.shape[0] != y.shape[0]:
raise ValueError("x and y should have same number of rows")
x_anonymizer_train = x[:, self.quasi_identifiers]
if x.dtype.kind not in 'iufc':
if not self.categorical_features:
raise ValueError('When supplying a pandas dataframe, categorical_features must be defined')
x_prepared = self._modify_categorical_features(x_anonymizer_train)
else:
x_prepared = x_anonymizer_train
@ -71,22 +86,6 @@ class Anonymize:
cells_by_id = self._calculate_cells(x, x_prepared)
return self._anonymize_data_numpy(x, x_prepared, cells_by_id)
def _anonymize_pandas(self, x, y):
if x.shape[0] != y.shape[0]:
raise ValueError("x and y should have same number of rows")
x_anonymizer_train = x.loc[:, self.quasi_identifiers]
# need to one-hot encode before training the decision tree
x_prepared = self._modify_categorical_features(x_anonymizer_train)
if self.is_regression:
self.anonymizer = DecisionTreeRegressor(random_state=10, min_samples_split=2, min_samples_leaf=self.k)
else:
self.anonymizer = DecisionTreeClassifier(random_state=10, min_samples_split=2, min_samples_leaf=self.k)
if len(y.shape) > 1:
y = np.argmax(y, axis=1)
self.anonymizer.fit(x_prepared, y)
cells_by_id = self._calculate_cells(x, x_prepared)
return self._anonymize_data_pandas(x, x_prepared, cells_by_id)
def _calculate_cells(self, x, x_anonymizer_train):
# x is original data, x_anonymizer_train is only QIs + 1-hot encoded
cells_by_id = {}
@ -155,16 +154,6 @@ class Anonymize:
row[feature] = cell['representative'][feature]
return x
def _anonymize_data_pandas(self, x, x_anonymizer_train, cells_by_id):
cells = self._find_sample_cells(x_anonymizer_train, cells_by_id)
index = 0
for i, row in x.iterrows():
cell = cells[index]
index += 1
for feature in cell['representative']:
x.at[i, feature] = cell['representative'][feature]
return x
def _modify_categorical_features(self, x):
encoder = OneHotEncoder()
one_hot_encoded = encoder.fit_transform(x)