Build the DT on all features for anonymization (#23)

* add param to build the DT on all features and not just on QI
* one-hot encoding only for categorical features
This commit is contained in:
olasaadi 2022-03-07 20:12:55 +02:00 committed by GitHub
parent c47819a031
commit d53818644e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 79 additions and 17 deletions

View file

@ -3,6 +3,9 @@ import pandas as pd
from scipy.spatial import distance
from collections import Counter
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
from sklearn.preprocessing import OneHotEncoder
@ -29,10 +32,13 @@ class Anonymize:
is_regression : Bool, optional
Whether the model is a regression model or not (if False, assumes
a classification model). Default is False.
train_only_QI : Bool, optional
Whether to train the decision tree only on the quasi-identifier
features. Default is False, i.e., the tree is trained on all features.
"""
def __init__(self, k: int, quasi_identifiers: Union[np.ndarray, list], categorical_features: Optional[list] = None,
             is_regression=False, train_only_QI=False):
    """Initialize the anonymizer.

    :param k: The minimum group size required for k-anonymity (must be >= 2).
    :param quasi_identifiers: The features (indices or column names) to anonymize.
    :param categorical_features: The categorical feature names; required for pandas input.
    :param is_regression: Whether the model is a regression model (False assumes classification).
    :param train_only_QI: Whether to build the decision tree only on the quasi-identifier
        features. Default is False, i.e., train on all features.
    """
    if k < 2:
        raise ValueError("k should be a positive integer with a value of 2 or higher")
    if quasi_identifiers is None or len(quasi_identifiers) < 1:
        # NOTE(review): the body of this branch is elided by the diff hunk boundary at this
        # point; a descriptive error is restored here — confirm against the original file.
        raise ValueError("quasi_identifiers must contain at least one feature")
    # NOTE(review): `self.k = k` is not visible in the shown hunks but is presumably
    # assigned in the elided context lines — confirm against the original file.
    self.k = k
    self.quasi_identifiers = quasi_identifiers
    self.categorical_features = categorical_features
    self.is_regression = is_regression
    self.train_only_QI = train_only_QI
def anonymize(self, x: Union[np.ndarray, pd.DataFrame], y: Union[np.ndarray, pd.DataFrame]) \
        -> Union[np.ndarray, pd.DataFrame]:
    """Anonymize the training dataset to satisfy k-anonymity on the quasi-identifiers.

    :param x: The training samples (numpy array or pandas DataFrame).
    :param y: The training labels/predictions aligned row-wise with ``x``.
    :raises ValueError: If ``x`` is a DataFrame and ``categorical_features`` was not set.
    :return: An array containing the anonymized training dataset.
    """
    if isinstance(x, np.ndarray):
        # ndarray path: features are addressed by column index.
        self.features = list(range(x.shape[1]))
        return self._anonymize_ndarray(x.copy(), y)
    # pandas path: features are addressed by column name, and the categorical
    # columns must be declared so they can be one-hot encoded before training.
    self.features = x.columns
    if not self.categorical_features:
        raise ValueError('When supplying a pandas dataframe, categorical_features must be defined')
    return self._anonymize_pandas(x.copy(), y)
@ -63,6 +72,9 @@ class Anonymize:
# NOTE(review): diff-view fragment — the remainder of this method (decision-tree
# training and record generalization) lies beyond this hunk and is not visible here.
def _anonymize_ndarray(self, x, y):
# Samples and labels must align row-wise.
if x.shape[0] != y.shape[0]:
raise ValueError("x and y should have same number of rows")
# By default the DT is trained on all features.
x_anonymizer_train = x
if self.train_only_QI:
# build DT just on QI features
x_anonymizer_train = x[:, self.quasi_identifiers]
# Non-numeric dtypes ('iufc' = int/uint/float/complex) need encoding first.
if x.dtype.kind not in 'iufc':
x_prepared = self._modify_categorical_features(x_anonymizer_train)
@ -79,6 +91,9 @@ class Anonymize:
# NOTE(review): diff-view fragment — the remainder of this method (decision-tree
# training and record generalization) lies beyond this hunk and is not visible here.
def _anonymize_pandas(self, x, y):
# Samples and labels must align row-wise.
if x.shape[0] != y.shape[0]:
raise ValueError("x and y should have same number of rows")
# By default the DT is trained on all features.
x_anonymizer_train = x
if self.train_only_QI:
# build DT just on QI features
x_anonymizer_train = x.loc[:, self.quasi_identifiers]
# need to one-hot encode before training the decision tree
x_prepared = self._modify_categorical_features(x_anonymizer_train)
@ -169,6 +184,21 @@ class Anonymize:
return x
def _modify_categorical_features(self, x):
    """Prepare the data for decision-tree training: impute numeric features and
    one-hot encode the categorical ones.

    The pre-change body (a bare ``OneHotEncoder().fit_transform`` with an early
    return) was dead diff residue that made the pipeline below unreachable; it
    has been removed.

    :param x: The (possibly QI-restricted) training samples.
    :return: The transformed feature matrix produced by the ColumnTransformer.
    """
    # Restrict the feature set when the tree is trained only on quasi-identifiers.
    used_features = self.features
    if self.train_only_QI:
        used_features = self.quasi_identifiers
    # NOTE(review): `x.columns` assumes a pandas DataFrame, yet the ndarray path
    # also reaches this method for non-numeric dtypes — confirm against callers.
    numeric_features = [f for f in x.columns if f in used_features and f not in self.categorical_features]
    categorical_features = [f for f in self.categorical_features if f in used_features]
    # Numeric columns: constant-impute missing values with 0.
    numeric_transformer = Pipeline(
        steps=[('imputer', SimpleImputer(strategy='constant', fill_value=0))]
    )
    # NOTE(review): `sparse=` was renamed `sparse_output=` in scikit-learn 1.2 —
    # confirm the pinned scikit-learn version supports this spelling.
    categorical_transformer = OneHotEncoder(handle_unknown="ignore", sparse=False)
    preprocessor = ColumnTransformer(
        transformers=[
            ("num", numeric_transformer, numeric_features),
            ("cat", categorical_transformer, categorical_features),
        ]
    )
    return preprocessor.fit_transform(x)

View file

@ -1,5 +1,8 @@
import pytest
import numpy as np
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
from sklearn.preprocessing import OneHotEncoder
@ -17,7 +20,7 @@ def test_anonymize_ndarray_iris():
# NOTE(review): diff-view fragment — the head of this test (data loading, model
# fitting) precedes this hunk. Both the pre-change and post-change constructor
# calls appear below because the +/- markers were stripped; only the
# train_only_QI=True call survives the commit.
k = 10
QI = [0, 2]
anonymizer = Anonymize(k, QI)
anonymizer = Anonymize(k, QI, train_only_QI=True)
anon = anonymizer.anonymize(x_train, pred)
# Anonymization must strictly reduce the number of distinct QI combinations.
assert(len(np.unique(anon[:, QI], axis=0)) < len(np.unique(x_train[:, QI], axis=0)))
_, counts_elements = np.unique(anon[:, QI], return_counts=True)
@ -27,16 +30,31 @@ def test_anonymize_ndarray_iris():
def test_anonymize_pandas_adult():
(x_train, y_train), _ = get_adult_dataset()
# NOTE(review): diff residue — the next four lines are the PRE-change encoding/
# model block; the commit replaces them with the ColumnTransformer block below.
encoded = OneHotEncoder().fit_transform(x_train)
model = DecisionTreeClassifier()
model.fit(encoded, y_train)
pred = model.predict(encoded)
k = 100
features = ['age', 'workclass', 'education-num', 'marital-status', 'occupation',
'relationship', 'race', 'sex', 'capital-gain', 'capital-loss', 'hours-per-week', 'native-country']
QI = ['age', 'workclass', 'education-num', 'marital-status', 'occupation', 'relationship', 'race', 'sex',
'native-country']
categorical_features = ['workclass', 'marital-status', 'occupation', 'relationship', 'race', 'sex',
'native-country']
# prepare data for DT
numeric_features = [f for f in features if f not in categorical_features]
numeric_transformer = Pipeline(
steps=[('imputer', SimpleImputer(strategy='constant', fill_value=0))]
)
categorical_transformer = OneHotEncoder(handle_unknown="ignore", sparse=False)
preprocessor = ColumnTransformer(
transformers=[
("num", numeric_transformer, numeric_features),
("cat", categorical_transformer, categorical_features),
]
)
encoded = preprocessor.fit_transform(x_train)
model = DecisionTreeClassifier()
model.fit(encoded, y_train)
pred = model.predict(encoded)
anonymizer = Anonymize(k, QI, categorical_features=categorical_features)
anon = anonymizer.anonymize(x_train, pred)
# NOTE(review): the test's assertions continue past this hunk boundary and are
# not visible in this fragment.
@ -48,15 +66,29 @@ def test_anonymize_pandas_adult():
def test_anonymize_pandas_nursery():
(x_train, y_train), _ = get_nursery_dataset()
x_train = x_train.astype(str)
# NOTE(review): diff residue — this bare OneHotEncoder line is the PRE-change
# encoding, replaced by the ColumnTransformer block below in the commit.
encoded = OneHotEncoder().fit_transform(x_train)
k = 100
features = ["parents", "has_nurs", "form", "children", "housing", "finance", "social", "health"]
QI = ["finance", "social", "health"]
categorical_features = ["parents", "has_nurs", "form", "housing", "finance", "social", "health", 'children']
# prepare data for DT
numeric_features = [f for f in features if f not in categorical_features]
numeric_transformer = Pipeline(
steps=[('imputer', SimpleImputer(strategy='constant', fill_value=0))]
)
categorical_transformer = OneHotEncoder(handle_unknown="ignore", sparse=False)
preprocessor = ColumnTransformer(
transformers=[
("num", numeric_transformer, numeric_features),
("cat", categorical_transformer, categorical_features),
]
)
encoded = preprocessor.fit_transform(x_train)
model = DecisionTreeClassifier()
model.fit(encoded, y_train)
pred = model.predict(encoded)
# NOTE(review): diff residue — the next four lines duplicate earlier PRE-change
# definitions; only the train_only_QI=True constructor call survives the commit.
k = 100
QI = ["finance", "social", "health"]
categorical_features = ["parents", "has_nurs", "form", "housing", "finance", "social", "health", 'children']
anonymizer = Anonymize(k, QI, categorical_features=categorical_features)
anonymizer = Anonymize(k, QI, categorical_features=categorical_features, train_only_QI=True)
anon = anonymizer.anonymize(x_train, pred)
# Anonymization must strictly reduce the number of distinct QI combinations.
assert(anon.loc[:, QI].drop_duplicates().shape[0] < x_train.loc[:, QI].drop_duplicates().shape[0])
@ -74,7 +106,7 @@ def test_regression():
# NOTE(review): diff-view fragment — the head (data loading, model fit) and tail
# of this test lie outside this hunk. Both constructor calls appear because the
# +/- markers were stripped; only the train_only_QI=True call survives the commit.
pred = model.predict(x_train)
k = 10
QI = [0, 2, 5, 8]
anonymizer = Anonymize(k, QI, is_regression=True)
anonymizer = Anonymize(k, QI, is_regression=True, train_only_QI=True)
anon = anonymizer.anonymize(x_train, pred)
print('Base model accuracy (R2 score): ', model.score(x_test, y_test))
model.fit(anon, y_train)