mirror of
https://github.com/IBM/ai-privacy-toolkit.git
synced 2026-05-10 12:32:38 +02:00
using dataset wrapper on anonymizer
This commit is contained in:
parent
9f4d649934
commit
fd9f134336
3 changed files with 64 additions and 47 deletions
|
|
@ -5,6 +5,7 @@ from collections import Counter
|
||||||
|
|
||||||
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
|
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
|
||||||
from sklearn.preprocessing import OneHotEncoder
|
from sklearn.preprocessing import OneHotEncoder
|
||||||
|
from apt.utils.datasets import BaseDataset, Data
|
||||||
|
|
||||||
from typing import Union, Optional
|
from typing import Union, Optional
|
||||||
|
|
||||||
|
|
@ -37,8 +38,7 @@ class Anonymize:
|
||||||
self.categorical_features = categorical_features
|
self.categorical_features = categorical_features
|
||||||
self.is_regression = is_regression
|
self.is_regression = is_regression
|
||||||
|
|
||||||
def anonymize(self, x: Union[np.ndarray, pd.DataFrame], y: Union[np.ndarray, pd.DataFrame]) \
|
def anonymize(self, dataset: BaseDataset) -> Union[np.ndarray, pd.DataFrame]:
|
||||||
-> Union[np.ndarray, pd.DataFrame]:
|
|
||||||
"""
|
"""
|
||||||
Method for performing model-guided anonymization.
|
Method for performing model-guided anonymization.
|
||||||
|
|
||||||
|
|
@ -47,12 +47,12 @@ class Anonymize:
|
||||||
:param y: The predictions of the original model on the training data.
|
:param y: The predictions of the original model on the training data.
|
||||||
:return: An array containing the anonymized training dataset.
|
:return: An array containing the anonymized training dataset.
|
||||||
"""
|
"""
|
||||||
if type(x) == np.ndarray:
|
if type(dataset.x) == np.ndarray:
|
||||||
return self._anonymize_ndarray(x.copy(), y)
|
return self._anonymize_ndarray(dataset.x.copy(), dataset.y)
|
||||||
else: # pandas
|
else: # pandas
|
||||||
if not self.categorical_features:
|
if not self.categorical_features:
|
||||||
raise ValueError('When supplying a pandas dataframe, categorical_features must be defined')
|
raise ValueError('When supplying a pandas dataframe, categorical_features must be defined')
|
||||||
return self._anonymize_pandas(x.copy(), y)
|
return self._anonymize_pandas(dataset.x.copy(), dataset.y)
|
||||||
|
|
||||||
def _anonymize_ndarray(self, x, y):
|
def _anonymize_ndarray(self, x, y):
|
||||||
if x.shape[0] != y.shape[0]:
|
if x.shape[0] != y.shape[0]:
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,8 @@ import ssl
|
||||||
from os import path, mkdir
|
from os import path, mkdir
|
||||||
from six.moves.urllib.request import urlretrieve
|
from six.moves.urllib.request import urlretrieve
|
||||||
|
|
||||||
|
from apt.utils.datasets import BaseDataset, Data
|
||||||
|
|
||||||
|
|
||||||
def _load_iris(test_set_size: float = 0.3):
|
def _load_iris(test_set_size: float = 0.3):
|
||||||
iris = datasets.load_iris()
|
iris = datasets.load_iris()
|
||||||
|
|
@ -14,8 +16,10 @@ def _load_iris(test_set_size: float = 0.3):
|
||||||
# Split training and test sets
|
# Split training and test sets
|
||||||
x_train, x_test, y_train, y_test = model_selection.train_test_split(data, labels, test_size=test_set_size,
|
x_train, x_test, y_train, y_test = model_selection.train_test_split(data, labels, test_size=test_set_size,
|
||||||
random_state=18, stratify=labels)
|
random_state=18, stratify=labels)
|
||||||
|
train_dataset = BaseDataset(x_train, y_train)
|
||||||
return (x_train, y_train), (x_test, y_test)
|
test_dataset = BaseDataset(x_test, y_test)
|
||||||
|
dataset = Data(train_dataset, test_dataset)
|
||||||
|
return dataset
|
||||||
|
|
||||||
|
|
||||||
def get_iris_dataset():
|
def get_iris_dataset():
|
||||||
|
|
@ -37,7 +41,10 @@ def _load_diabetes(test_set_size: float = 0.3):
|
||||||
x_train, x_test, y_train, y_test = model_selection.train_test_split(data, labels, test_size=test_set_size,
|
x_train, x_test, y_train, y_test = model_selection.train_test_split(data, labels, test_size=test_set_size,
|
||||||
random_state=18)
|
random_state=18)
|
||||||
|
|
||||||
return (x_train, y_train), (x_test, y_test)
|
train_dataset = BaseDataset(x_train, y_train)
|
||||||
|
test_dataset = BaseDataset(x_test, y_test)
|
||||||
|
dataset = Data(train_dataset, test_dataset)
|
||||||
|
return dataset
|
||||||
|
|
||||||
|
|
||||||
def get_diabetes_dataset():
|
def get_diabetes_dataset():
|
||||||
|
|
@ -97,7 +104,10 @@ def get_german_credit_dataset(test_set: float = 0.3):
|
||||||
x_test.reset_index(drop=True, inplace=True)
|
x_test.reset_index(drop=True, inplace=True)
|
||||||
y_test.reset_index(drop=True, inplace=True)
|
y_test.reset_index(drop=True, inplace=True)
|
||||||
|
|
||||||
return (x_train, y_train), (x_test, y_test)
|
train_dataset = BaseDataset(x_train, y_train)
|
||||||
|
test_dataset = BaseDataset(x_test, y_test)
|
||||||
|
dataset = Data(train_dataset, test_dataset)
|
||||||
|
return dataset
|
||||||
|
|
||||||
|
|
||||||
def _modify_german_dataset(data):
|
def _modify_german_dataset(data):
|
||||||
|
|
@ -156,8 +166,10 @@ def get_adult_dataset():
|
||||||
y_train = train.loc[:, 'label']
|
y_train = train.loc[:, 'label']
|
||||||
x_test = test.drop(['label'], axis=1)
|
x_test = test.drop(['label'], axis=1)
|
||||||
y_test = test.loc[:, 'label']
|
y_test = test.loc[:, 'label']
|
||||||
|
train_dataset = BaseDataset(x_train, y_train)
|
||||||
return (x_train, y_train), (x_test, y_test)
|
test_dataset = BaseDataset(x_test, y_test)
|
||||||
|
dataset = Data(train_dataset, test_dataset)
|
||||||
|
return dataset
|
||||||
|
|
||||||
|
|
||||||
def _modify_adult_dataset(data):
|
def _modify_adult_dataset(data):
|
||||||
|
|
@ -315,5 +327,10 @@ def get_nursery_dataset(raw: bool = True, test_set: float = 0.2, transform_socia
|
||||||
y_train = train.loc[:, "label"]
|
y_train = train.loc[:, "label"]
|
||||||
x_test = test.drop(["label"], axis=1)
|
x_test = test.drop(["label"], axis=1)
|
||||||
y_test = test.loc[:, "label"]
|
y_test = test.loc[:, "label"]
|
||||||
|
x_train = x_train.astype(str)
|
||||||
|
x_test = x_test.astype(str)
|
||||||
|
|
||||||
return (x_train, y_train), (x_test, y_test)
|
train_dataset = BaseDataset(x_train, y_train)
|
||||||
|
test_dataset = BaseDataset(x_test, y_test)
|
||||||
|
dataset = Data(train_dataset, test_dataset)
|
||||||
|
return dataset
|
||||||
|
|
|
||||||
|
|
@ -7,29 +7,29 @@ from apt.anonymization import Anonymize
|
||||||
from apt.utils.dataset_utils import get_iris_dataset, get_adult_dataset, get_nursery_dataset
|
from apt.utils.dataset_utils import get_iris_dataset, get_adult_dataset, get_nursery_dataset
|
||||||
from sklearn.datasets import load_diabetes
|
from sklearn.datasets import load_diabetes
|
||||||
from sklearn.model_selection import train_test_split
|
from sklearn.model_selection import train_test_split
|
||||||
|
from apt.utils.datasets import BaseDataset, Data
|
||||||
|
|
||||||
def test_anonymize_ndarray_iris():
|
def test_anonymize_ndarray_iris():
|
||||||
(x_train, y_train), _ = get_iris_dataset()
|
dataset = get_iris_dataset()
|
||||||
model = DecisionTreeClassifier()
|
model = DecisionTreeClassifier()
|
||||||
model.fit(x_train, y_train)
|
model.fit(dataset.get_train_samples(), dataset.get_train_labels())
|
||||||
pred = model.predict(x_train)
|
pred = model.predict(dataset.get_train_samples())
|
||||||
|
|
||||||
k = 10
|
k = 10
|
||||||
QI = [0, 2]
|
QI = [0, 2]
|
||||||
anonymizer = Anonymize(k, QI)
|
anonymizer = Anonymize(k, QI)
|
||||||
anon = anonymizer.anonymize(x_train, pred)
|
anon = anonymizer.anonymize(BaseDataset(dataset.get_train_samples(), pred))
|
||||||
assert(len(np.unique(anon[:, QI], axis=0)) < len(np.unique(x_train[:, QI], axis=0)))
|
assert(len(np.unique(anon[:, QI], axis=0)) < len(np.unique(dataset.get_train_samples()[:, QI], axis=0)))
|
||||||
_, counts_elements = np.unique(anon[:, QI], return_counts=True)
|
_, counts_elements = np.unique(anon[:, QI], return_counts=True)
|
||||||
assert (np.min(counts_elements) >= k)
|
assert (np.min(counts_elements) >= k)
|
||||||
assert ((np.delete(anon, QI, axis=1) == np.delete(x_train, QI, axis=1)).all())
|
assert ((np.delete(anon, QI, axis=1) == np.delete(dataset.get_train_samples(), QI, axis=1)).all())
|
||||||
|
|
||||||
|
|
||||||
def test_anonymize_pandas_adult():
|
def test_anonymize_pandas_adult():
|
||||||
(x_train, y_train), _ = get_adult_dataset()
|
dataset = get_adult_dataset()
|
||||||
encoded = OneHotEncoder().fit_transform(x_train)
|
encoded = OneHotEncoder().fit_transform(dataset.get_train_samples())
|
||||||
model = DecisionTreeClassifier()
|
model = DecisionTreeClassifier()
|
||||||
model.fit(encoded, y_train)
|
model.fit(encoded, dataset.get_train_labels())
|
||||||
pred = model.predict(encoded)
|
pred = model.predict(encoded)
|
||||||
|
|
||||||
k = 100
|
k = 100
|
||||||
|
|
@ -38,51 +38,51 @@ def test_anonymize_pandas_adult():
|
||||||
categorical_features = ['workclass', 'marital-status', 'occupation', 'relationship', 'race', 'sex',
|
categorical_features = ['workclass', 'marital-status', 'occupation', 'relationship', 'race', 'sex',
|
||||||
'native-country']
|
'native-country']
|
||||||
anonymizer = Anonymize(k, QI, categorical_features=categorical_features)
|
anonymizer = Anonymize(k, QI, categorical_features=categorical_features)
|
||||||
anon = anonymizer.anonymize(x_train, pred)
|
anon = anonymizer.anonymize(BaseDataset(dataset.get_train_samples(), pred))
|
||||||
|
|
||||||
assert(anon.loc[:, QI].drop_duplicates().shape[0] < x_train.loc[:, QI].drop_duplicates().shape[0])
|
assert(anon.loc[:, QI].drop_duplicates().shape[0] < dataset.get_train_samples().loc[:, QI].drop_duplicates().shape[0])
|
||||||
assert (anon.loc[:, QI].value_counts().min() >= k)
|
assert (anon.loc[:, QI].value_counts().min() >= k)
|
||||||
assert (anon.drop(QI, axis=1).equals(x_train.drop(QI, axis=1)))
|
assert (anon.drop(QI, axis=1).equals(dataset.get_train_samples().drop(QI, axis=1)))
|
||||||
|
|
||||||
|
|
||||||
def test_anonymize_pandas_nursery():
|
def test_anonymize_pandas_nursery():
|
||||||
(x_train, y_train), _ = get_nursery_dataset()
|
dataset = get_nursery_dataset()
|
||||||
x_train = x_train.astype(str)
|
encoded = OneHotEncoder().fit_transform(dataset.get_train_samples())
|
||||||
encoded = OneHotEncoder().fit_transform(x_train)
|
|
||||||
model = DecisionTreeClassifier()
|
model = DecisionTreeClassifier()
|
||||||
model.fit(encoded, y_train)
|
model.fit(encoded, dataset.get_train_labels())
|
||||||
pred = model.predict(encoded)
|
pred = model.predict(encoded)
|
||||||
|
|
||||||
k = 100
|
k = 100
|
||||||
QI = ["finance", "social", "health"]
|
QI = ["finance", "social", "health"]
|
||||||
categorical_features = ["parents", "has_nurs", "form", "housing", "finance", "social", "health", 'children']
|
categorical_features = ["parents", "has_nurs", "form", "housing", "finance", "social", "health", 'children']
|
||||||
anonymizer = Anonymize(k, QI, categorical_features=categorical_features)
|
anonymizer = Anonymize(k, QI, categorical_features=categorical_features)
|
||||||
anon = anonymizer.anonymize(x_train, pred)
|
anon = anonymizer.anonymize(BaseDataset(dataset.get_train_samples(), pred))
|
||||||
|
|
||||||
assert(anon.loc[:, QI].drop_duplicates().shape[0] < x_train.loc[:, QI].drop_duplicates().shape[0])
|
assert(anon.loc[:, QI].drop_duplicates().shape[0] < dataset.get_train_samples().loc[:, QI].drop_duplicates().shape[0])
|
||||||
assert (anon.loc[:, QI].value_counts().min() >= k)
|
assert (anon.loc[:, QI].value_counts().min() >= k)
|
||||||
assert (anon.drop(QI, axis=1).equals(x_train.drop(QI, axis=1)))
|
assert (anon.drop(QI, axis=1).equals(dataset.get_train_samples().drop(QI, axis=1)))
|
||||||
|
|
||||||
|
|
||||||
def test_regression():
|
def test_regression():
|
||||||
|
|
||||||
dataset = load_diabetes()
|
x_train, x_test, y_train, y_test = train_test_split(load_diabetes().data, load_diabetes().target, test_size=0.5, random_state=14)
|
||||||
x_train, x_test, y_train, y_test = train_test_split(dataset.data, dataset.target, test_size=0.5, random_state=14)
|
train_dataset = BaseDataset(x_train, y_train)
|
||||||
|
test_dataset = BaseDataset(x_test, y_test)
|
||||||
|
dataset = Data(train_dataset, test_dataset)
|
||||||
model = DecisionTreeRegressor(random_state=10, min_samples_split=2)
|
model = DecisionTreeRegressor(random_state=10, min_samples_split=2)
|
||||||
model.fit(x_train, y_train)
|
model.fit(dataset.get_train_samples(), dataset.get_train_labels())
|
||||||
pred = model.predict(x_train)
|
pred = model.predict(dataset.get_train_samples())
|
||||||
k = 10
|
k = 10
|
||||||
QI = [0, 2, 5, 8]
|
QI = [0, 2, 5, 8]
|
||||||
anonymizer = Anonymize(k, QI, is_regression=True)
|
anonymizer = Anonymize(k, QI, is_regression=True)
|
||||||
anon = anonymizer.anonymize(x_train, pred)
|
anon = anonymizer.anonymize(BaseDataset(dataset.get_train_samples(), pred))
|
||||||
print('Base model accuracy (R2 score): ', model.score(x_test, y_test))
|
print('Base model accuracy (R2 score): ', model.score(dataset.get_test_samples(), dataset.get_test_labels()))
|
||||||
model.fit(anon, y_train)
|
model.fit(anon, dataset.get_train_labels())
|
||||||
print('Base model accuracy (R2 score) after anonymization: ', model.score(x_test, y_test))
|
print('Base model accuracy (R2 score) after anonymization: ', model.score(dataset.get_test_samples(), dataset.get_test_labels()))
|
||||||
assert(len(np.unique(anon[:, QI], axis=0)) < len(np.unique(x_train[:, QI], axis=0)))
|
assert(len(np.unique(anon[:, QI], axis=0)) < len(np.unique(dataset.get_train_samples()[:, QI], axis=0)))
|
||||||
_, counts_elements = np.unique(anon[:, QI], return_counts=True)
|
_, counts_elements = np.unique(anon[:, QI], return_counts=True)
|
||||||
assert (np.min(counts_elements) >= k)
|
assert (np.min(counts_elements) >= k)
|
||||||
assert ((np.delete(anon, QI, axis=1) == np.delete(x_train, QI, axis=1)).all())
|
assert ((np.delete(anon, QI, axis=1) == np.delete(dataset.get_train_samples(), QI, axis=1)).all())
|
||||||
|
|
||||||
|
|
||||||
def test_errors():
|
def test_errors():
|
||||||
|
|
@ -93,9 +93,9 @@ def test_errors():
|
||||||
with pytest.raises(ValueError):
|
with pytest.raises(ValueError):
|
||||||
Anonymize(2, None)
|
Anonymize(2, None)
|
||||||
anonymizer = Anonymize(10, [0, 2])
|
anonymizer = Anonymize(10, [0, 2])
|
||||||
(x_train, y_train), (x_test, y_test) = get_iris_dataset()
|
dataset = get_iris_dataset()
|
||||||
with pytest.raises(ValueError):
|
with pytest.raises(ValueError):
|
||||||
anonymizer.anonymize(x_train, y_test)
|
anonymizer.anonymize(BaseDataset(dataset.get_train_samples(), dataset.get_test_labels()))
|
||||||
(x_train, y_train), _ = get_adult_dataset()
|
dataset = get_adult_dataset()
|
||||||
with pytest.raises(ValueError):
|
with pytest.raises(ValueError):
|
||||||
anonymizer.anonymize(x_train, y_train)
|
anonymizer.anonymize(BaseDataset(dataset.get_train_samples(), dataset.get_train_labels()))
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue