diff --git a/apt/anonymization/anonymizer.py b/apt/anonymization/anonymizer.py index c3bbdb9..9f82c7c 100644 --- a/apt/anonymization/anonymizer.py +++ b/apt/anonymization/anonymizer.py @@ -3,6 +3,9 @@ import pandas as pd from scipy.spatial import distance from collections import Counter +from sklearn.compose import ColumnTransformer +from sklearn.impute import SimpleImputer +from sklearn.pipeline import Pipeline from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor from sklearn.preprocessing import OneHotEncoder @@ -29,10 +32,13 @@ class Anonymize: is_regression : Bool, optional Whether the model is a regression model or not (if False, assumes a classification model). Default is False. + train_only_QI : Bool, optional + Whether to train the anonymizer decision tree only on the quasi-identifier + features. Default is False (train the tree on all features). """ def __init__(self, k: int, quasi_identifiers: Union[np.ndarray, list], categorical_features: Optional[list] = None, - is_regression=False): + is_regression=False, train_only_QI=False): if k < 2: raise ValueError("k should be a positive integer with a value of 2 or higher") if quasi_identifiers is None or len(quasi_identifiers) < 1: @@ -42,6 +48,7 @@ class Anonymize: self.quasi_identifiers = quasi_identifiers self.categorical_features = categorical_features self.is_regression = is_regression + self.train_only_QI = train_only_QI def anonymize(self, x: Union[np.ndarray, pd.DataFrame], y: Union[np.ndarray, pd.DataFrame]) \ -> Union[np.ndarray, pd.DataFrame]: @@ -54,8 +61,10 @@ class Anonymize: :return: An array containing the anonymized training dataset. 
""" if type(x) == np.ndarray: + self.features = [i for i in range(x.shape[1])] return self._anonymize_ndarray(x.copy(), y) else: # pandas + self.features = x.columns if not self.categorical_features: raise ValueError('When supplying a pandas dataframe, categorical_features must be defined') return self._anonymize_pandas(x.copy(), y) @@ -63,7 +72,10 @@ class Anonymize: def _anonymize_ndarray(self, x, y): if x.shape[0] != y.shape[0]: raise ValueError("x and y should have same number of rows") - x_anonymizer_train = x[:, self.quasi_identifiers] + x_anonymizer_train = x + if self.train_only_QI: + # build DT just on QI features + x_anonymizer_train = x[:, self.quasi_identifiers] if x.dtype.kind not in 'iufc': x_prepared = self._modify_categorical_features(x_anonymizer_train) else: @@ -79,7 +91,10 @@ class Anonymize: def _anonymize_pandas(self, x, y): if x.shape[0] != y.shape[0]: raise ValueError("x and y should have same number of rows") - x_anonymizer_train = x.loc[:, self.quasi_identifiers] + x_anonymizer_train = x + if self.train_only_QI: + # build DT just on QI features + x_anonymizer_train = x.loc[:, self.quasi_identifiers] # need to one-hot encode before training the decision tree x_prepared = self._modify_categorical_features(x_anonymizer_train) if self.is_regression: @@ -169,6 +184,21 @@ class Anonymize: return x def _modify_categorical_features(self, x): - encoder = OneHotEncoder() - one_hot_encoded = encoder.fit_transform(x) - return one_hot_encoded + # prepare data for DT + used_features = self.features + if self.train_only_QI: + used_features = self.quasi_identifiers + numeric_features = [f for f in x.columns if f in used_features and f not in self.categorical_features] + categorical_features = [f for f in self.categorical_features if f in used_features] + numeric_transformer = Pipeline( + steps=[('imputer', SimpleImputer(strategy='constant', fill_value=0))] + ) + categorical_transformer = OneHotEncoder(handle_unknown="ignore", sparse=False) + preprocessor = 
ColumnTransformer( + transformers=[ + ("num", numeric_transformer, numeric_features), + ("cat", categorical_transformer, categorical_features), + ] + ) + encoded = preprocessor.fit_transform(x) + return encoded diff --git a/tests/test_anonymizer.py b/tests/test_anonymizer.py index 466c129..000eefa 100644 --- a/tests/test_anonymizer.py +++ b/tests/test_anonymizer.py @@ -1,5 +1,8 @@ import pytest import numpy as np +from sklearn.compose import ColumnTransformer +from sklearn.impute import SimpleImputer +from sklearn.pipeline import Pipeline from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor from sklearn.preprocessing import OneHotEncoder @@ -17,7 +20,7 @@ def test_anonymize_ndarray_iris(): k = 10 QI = [0, 2] - anonymizer = Anonymize(k, QI) + anonymizer = Anonymize(k, QI, train_only_QI=True) anon = anonymizer.anonymize(x_train, pred) assert(len(np.unique(anon[:, QI], axis=0)) < len(np.unique(x_train[:, QI], axis=0))) _, counts_elements = np.unique(anon[:, QI], return_counts=True) @@ -27,16 +30,31 @@ def test_anonymize_ndarray_iris(): def test_anonymize_pandas_adult(): (x_train, y_train), _ = get_adult_dataset() - encoded = OneHotEncoder().fit_transform(x_train) - model = DecisionTreeClassifier() - model.fit(encoded, y_train) - pred = model.predict(encoded) k = 100 + features = ['age', 'workclass', 'education-num', 'marital-status', 'occupation', + 'relationship', 'race', 'sex', 'capital-gain', 'capital-loss', 'hours-per-week', 'native-country'] QI = ['age', 'workclass', 'education-num', 'marital-status', 'occupation', 'relationship', 'race', 'sex', 'native-country'] categorical_features = ['workclass', 'marital-status', 'occupation', 'relationship', 'race', 'sex', 'native-country'] + # prepare data for DT + numeric_features = [f for f in features if f not in categorical_features] + numeric_transformer = Pipeline( + steps=[('imputer', SimpleImputer(strategy='constant', fill_value=0))] + ) + categorical_transformer = 
OneHotEncoder(handle_unknown="ignore", sparse=False) + preprocessor = ColumnTransformer( + transformers=[ + ("num", numeric_transformer, numeric_features), + ("cat", categorical_transformer, categorical_features), + ] + ) + encoded = preprocessor.fit_transform(x_train) + model = DecisionTreeClassifier() + model.fit(encoded, y_train) + pred = model.predict(encoded) + anonymizer = Anonymize(k, QI, categorical_features=categorical_features) anon = anonymizer.anonymize(x_train, pred) @@ -48,15 +66,29 @@ def test_anonymize_pandas_adult(): def test_anonymize_pandas_nursery(): (x_train, y_train), _ = get_nursery_dataset() x_train = x_train.astype(str) - encoded = OneHotEncoder().fit_transform(x_train) + + k = 100 + features = ["parents", "has_nurs", "form", "children", "housing", "finance", "social", "health"] + QI = ["finance", "social", "health"] + categorical_features = ["parents", "has_nurs", "form", "housing", "finance", "social", "health", 'children'] + # prepare data for DT + numeric_features = [f for f in features if f not in categorical_features] + numeric_transformer = Pipeline( + steps=[('imputer', SimpleImputer(strategy='constant', fill_value=0))] + ) + categorical_transformer = OneHotEncoder(handle_unknown="ignore", sparse=False) + preprocessor = ColumnTransformer( + transformers=[ + ("num", numeric_transformer, numeric_features), + ("cat", categorical_transformer, categorical_features), + ] + ) + encoded = preprocessor.fit_transform(x_train) model = DecisionTreeClassifier() model.fit(encoded, y_train) pred = model.predict(encoded) - k = 100 - QI = ["finance", "social", "health"] - categorical_features = ["parents", "has_nurs", "form", "housing", "finance", "social", "health", 'children'] - anonymizer = Anonymize(k, QI, categorical_features=categorical_features) + anonymizer = Anonymize(k, QI, categorical_features=categorical_features, train_only_QI=True) anon = anonymizer.anonymize(x_train, pred) assert(anon.loc[:, QI].drop_duplicates().shape[0] < 
x_train.loc[:, QI].drop_duplicates().shape[0]) @@ -74,7 +106,7 @@ def test_regression(): pred = model.predict(x_train) k = 10 QI = [0, 2, 5, 8] - anonymizer = Anonymize(k, QI, is_regression=True) + anonymizer = Anonymize(k, QI, is_regression=True, train_only_QI=True) anon = anonymizer.anonymize(x_train, pred) print('Base model accuracy (R2 score): ', model.score(x_test, y_test)) model.fit(anon, y_train)