Add Dataset assessment module

Signed-off-by: Maya Anderson <mayaa@il.ibm.com>
This commit is contained in:
Maya Anderson 2023-03-05 23:59:51 +02:00
parent c153635e4d
commit 3f9271b225
9 changed files with 716 additions and 0 deletions

View file

@ -0,0 +1,9 @@
"""
Module providing privacy risk assessment for synthetic data.
The main interface, ``DatasetAttack``, with the assess_privacy() main method assumes the availability of the
training data, holdout data and synthetic data at the time of the privacy evaluation.
It is implemented by two types of abstract classes: ``DatasetAttackPerRecord`` and ``DatasetAttackWhole``, to be
implemented by concrete assessment methods.
"""
from apt.risk.data_assessment import dataset_attack

View file

@ -0,0 +1,72 @@
import abc
import numpy as np
from sklearn.neighbors import NearestNeighbors
from tqdm import tqdm
from apt.utils.datasets import ArrayDataset
class AttackStrategyUtils(abc.ABC):
    """
    Common base type for the utility objects used by the various privacy attack strategies.

    The class declares no members of its own; strategy-specific utilities derive from it.
    """
    pass
class KNNAttackStrategyUtils(AttackStrategyUtils):
    """
    Common utilities for attack strategy based on KNN distances.
    """

    def __init__(self, k: int, use_batches: bool = False, batch_size: int = 0) -> None:
        """
        :param k: How many nearest neighbors to search
        :param use_batches: Use batches with a progress meter or not when finding KNNs for query set
        :param batch_size: if use_batches=True, the size of batch_size should be > 0
        :raises ValueError: if use_batches is True and batch_size is smaller than 1
        """
        if use_batches and batch_size < 1:
            raise ValueError(f"When using batching batch_size should be > 0, and not {batch_size}")
        self.k = k
        self.use_batches = use_batches
        self.batch_size = batch_size

    def fit(self, dataset: ArrayDataset, knn_learner: NearestNeighbors):
        """
        Fit the nearest neighbor learner on the samples of the given dataset.

        :param dataset: dataset whose samples become the search space of the learner
        :param knn_learner: unsupervised learner for implementing neighbor searches
        """
        knn_learner.fit(dataset.get_samples())

    def find_knn(self, query_samples: ArrayDataset, knn_learner: NearestNeighbors, distance_processor=None):
        """
        Nearest neighbor search of the query samples in the data the learner was fitted on.

        :param query_samples: query samples
        :param knn_learner: unsupervised learner for implementing neighbor searches, already fitted
        :param distance_processor: function for processing the distance into another more relevant metric per sample.
            Its input is an array representing distances (the distances returned by NearestNeighbors.kneighbors() ),
            and the output should be another array with distance-based values that enable to compute the final score
        :return:
            distances of the query samples to their nearest neighbors, or a metric based on that distance and
            calculated by the distance_processor function. Every query sample contributes to the result, also when
            batching is used and the number of samples is not a multiple of the batch size.
        """
        samples = query_samples.get_samples()
        if not self.use_batches:
            distances, _ = knn_learner.kneighbors(samples, self.k, return_distance=True)
            return distance_processor(distances) if distance_processor else distances

        per_batch_results = []
        # Step over the batch start offsets so that the trailing, possibly shorter, batch is processed as well
        for start in tqdm(range(0, len(samples), self.batch_size)):
            x_batch = samples[start:start + self.batch_size]
            # The last batch may hold fewer than batch_size samples, so reshape by its actual length
            x_batch = np.reshape(x_batch, [len(x_batch), -1])
            # dist_batch: distance between every query sample in batch to its KNNs among the fitted samples
            dist_batch, _ = knn_learner.kneighbors(x_batch, self.k, return_distance=True)
            per_batch_results.append(distance_processor(dist_batch) if distance_processor else dist_batch)
        return np.concatenate(per_batch_results)

View file

@ -0,0 +1,65 @@
from dataclasses import dataclass
from typing import Optional
import pandas as pd
from apt.risk.data_assessment.dataset_attack_gan_leaks import DatasetAttackGanLeaksConfig, DatasetAttackGanLeaks, \
DatasetAttackScoreGanLeaks
from apt.risk.data_assessment.dataset_attack_holdout import DatasetAttackHoldoutConfig, DatasetAttackHoldout, \
DatasetAttackScoreHoldout
from apt.utils.datasets import ArrayDataset
@dataclass
class DatasetAssessmentManagerConfig:
    """
    Options controlling the behavior of ``DatasetAssessmentManager``.
    """
    # Flag meant to control persisting of assessment reports - TODO confirm where it is consumed
    persist_reports: bool = True
    # Passed on as ``generate_plot`` when the GAN-Leaks privacy score is calculated
    generate_plots: bool = False
class DatasetAssessmentManager:
    """
    The main class for running dataset assessment attacks.

    Each call to assess() runs the GAN-Leaks attack and the holdout attack and appends the resulting scores to
    ``gan_leaks_attack_scores`` and ``holdout_attack_scores``. Both lists belong to the instance, so separate
    managers do not share results.
    """

    def __init__(self, config: Optional[DatasetAssessmentManagerConfig] = None) -> None:
        """
        :param config: Configuration parameters to guide the assessment process, optional. When omitted (or None),
            a ``DatasetAssessmentManagerConfig`` with default values is created for this manager.
        """
        self.config = config if config is not None else DatasetAssessmentManagerConfig()
        # Score collections are created per instance rather than as class attributes shared by all managers
        self.gan_leaks_attack_scores = []
        self.holdout_attack_scores = []

    def assess(self, original_data_members: ArrayDataset, original_data_non_members: ArrayDataset,
               synthetic_data: ArrayDataset, dataset_name: str) -> (
            DatasetAttackScoreGanLeaks, DatasetAttackScoreHoldout):
        """
        Run the GAN-Leaks and the holdout attacks on the given datasets and record their scores.

        :param original_data_members: A container for the training original samples and labels
        :param original_data_non_members: A container for the holdout original samples and labels
        :param synthetic_data: A container for the synthetic samples and labels
        :param dataset_name: A name to identify this dataset
        :return:
            a tuple of the GAN-Leaks attack score and the holdout attack score
        """
        config_gl = DatasetAttackGanLeaksConfig(use_batches=False)
        attack_gl = DatasetAttackGanLeaks(original_data_members,
                                          original_data_non_members,
                                          synthetic_data,
                                          dataset_name,
                                          config_gl)
        result = attack_gl.assess_privacy()
        score_g = attack_gl.calculate_privacy_score(result, generate_plot=self.config.generate_plots)
        self.gan_leaks_attack_scores.append(score_g)

        config_h = DatasetAttackHoldoutConfig(use_batches=False)
        attack_h = DatasetAttackHoldout(original_data_members, original_data_non_members, synthetic_data,
                                        dataset_name,
                                        config_h)
        score_h = attack_h.assess_privacy()
        self.holdout_attack_scores.append(score_h)
        return score_g, score_h

    def dump_all_scores_to_files(self):
        """
        Write the collected scores of both attacks to CSV files, if ``config.persist_reports`` is set.
        """
        # Persisting the reports is controlled by persist_reports; generate_plots only concerns the ROC plots
        if not self.config.persist_reports:
            return
        results_log_file = "_results.log.csv"
        self.dump_scores_to_file(self.gan_leaks_attack_scores, "gan_leaks" + results_log_file, True)
        self.dump_scores_to_file(self.holdout_attack_scores, "holdout" + results_log_file, True)

    @staticmethod
    def dump_scores_to_file(attack_scores, filename, header: bool):
        """
        Write the given scores to a CSV file, replacing the file if it already exists.

        :param attack_scores: the scores to write, converted to a pandas DataFrame
        :param filename: path of the CSV file to write
        :param header: whether to write the column names
        """
        run_results_df = pd.DataFrame(attack_scores)
        run_results_df.to_csv(filename, header=header, encoding='utf-8', index=False, mode='w')  # Overwrite

View file

@ -0,0 +1,133 @@
"""
This module defines the interface for privacy risk assessment of synthetic datasets.
"""
import abc
from typing import Optional, Union
import matplotlib.pyplot as plt
import numpy as np
from sklearn import metrics
from sklearn.metrics import RocCurveDisplay
from apt.risk.data_assessment.attack_strategy_utils import AttackStrategyUtils
from apt.risk.data_assessment.dataset_attack_result import DatasetAttackScore, DatasetAttackResultPerRecord, \
DatasetAttackResult
from apt.utils.datasets import ArrayDataset
class Config:
    """
    Root type of all dataset attack configurations; it defines no settings itself.
    """
    pass
class DatasetAttack(abc.ABC):
    """
    The interface for performing privacy risk assessment for synthetic datasets.
    """

    def __init__(self, original_data_members: ArrayDataset, original_data_non_members: ArrayDataset,
                 synthetic_data: ArrayDataset, dataset_name: str, attack_strategy_utils: AttackStrategyUtils,
                 config: Optional[Config] = None) -> None:
        """
        :param original_data_members: A container for the training original samples and labels
        :param original_data_non_members: A container for the holdout original samples and labels
        :param synthetic_data: A container for the synthetic samples and labels
        :param dataset_name: A name to identify the dataset under attack
        :param attack_strategy_utils: Utils for use with the attack strategy
        :param config: Configuration parameters to guide the assessment process such as which attack
            frameworks to use, optional. When omitted (or None), a new ``Config`` is created for this attack.
        """
        self.original_data_members = original_data_members
        self.original_data_non_members = original_data_non_members
        self.synthetic_data = synthetic_data
        self.dataset_name = dataset_name
        self.attack_strategy_utils = attack_strategy_utils
        # A default argument is evaluated only once, so the fallback configuration is created per instance here
        self.config = config if config is not None else Config()

    @abc.abstractmethod
    def assess_privacy(self) -> Union[DatasetAttackScore, DatasetAttackResult]:
        """
        Assess the privacy of the dataset
        :return:
            result: Union[DatasetAttackScore, DatasetAttackResult] can be either the final privacy attack score,
            or an intermediate attack result, which can be translated into a privacy score if needed
        """
        ...
class DatasetAttackPerRecord(DatasetAttack):
    """
    An abstract base class for performing privacy risk assessment for synthetic datasets on a per-record level.
    """

    @abc.abstractmethod
    def assess_privacy(self) -> DatasetAttackResultPerRecord:
        """
        Assess the privacy of the dataset
        :return:
            result: DatasetAttackResultPerRecord
        """
        ...

    @abc.abstractmethod
    def calculate_privacy_score(self, dataset_attack_result: DatasetAttackResultPerRecord,
                                generate_plot=False) -> DatasetAttackScore:
        """
        Calculate dataset privacy score based on the result of the privacy assessment
        :param dataset_attack_result: the per-record result of the privacy assessment
        :param generate_plot: whether a plot should be generated along with the score
        :return:
            result: DatasetAttackScore
        """
        ...

    @staticmethod
    def _labels_and_scores(pos_probabilities, neg_probabilities):
        """
        Build the ground-truth labels and the matching scores: negatives (label 0) first, then positives (label 1).
        """
        labels = np.concatenate((np.zeros((len(neg_probabilities),)), np.ones((len(pos_probabilities),))))
        scores = np.concatenate((neg_probabilities, pos_probabilities))
        return labels, scores

    def plot_roc_curve(self, pos_probabilities, neg_probabilities, name_prefix=""):
        """
        Plot the ROC curve and save it to '<name_prefix><dataset_name>_roc_curve.png'
        :param pos_probabilities: probability estimates of the positive samples, the training data
        :param neg_probabilities: probability estimates of the negative samples, the hold-out data
        :param name_prefix: name prefix for the ROC curve plot
        """
        labels, scores = self._labels_and_scores(pos_probabilities, neg_probabilities)
        # from_predictions() already draws the curve on a new figure, so no further plot() call is needed;
        # calling plot() again would open a second figure and leave the first one dangling
        roc_display = RocCurveDisplay.from_predictions(labels, scores)
        axes, figure = roc_display.ax_, roc_display.figure_
        axes.plot([0, 1], [0, 1], color="navy", linewidth=2, linestyle="--", label='No skills')
        axes.set_title('ROC curve')
        try:
            figure.savefig(f'{name_prefix}{self.dataset_name}_roc_curve.png')
        finally:
            # Release the figure, otherwise figures accumulate across repeated assessments
            plt.close(figure)

    @staticmethod
    def calculate_roc_score(pos_probabilities, neg_probabilities):
        """
        Calculate the ROC curve and the scores derived from it
        :param pos_probabilities: probability estimates of the positive samples, the training data
        :param neg_probabilities: probability estimates of the negative samples, the hold-out data
        :return:
            fpr: False Positive rate
            tpr: True Positive rate
            threshold: threshold
            auc: area under the Receiver Operating Characteristic Curve
            ap: average precision score
        """
        labels, scores = DatasetAttackPerRecord._labels_and_scores(pos_probabilities, neg_probabilities)
        fpr, tpr, threshold = metrics.roc_curve(labels, scores, pos_label=1)
        auc = metrics.roc_auc_score(labels, scores)
        ap = metrics.average_precision_score(labels, scores)
        return fpr, tpr, threshold, auc, ap
class DatasetAttackWhole(DatasetAttack):
    """
    Abstract base class of privacy risk assessments that evaluate a synthetic dataset as a whole.
    """

    @abc.abstractmethod
    def assess_privacy(self) -> DatasetAttackScore:
        """
        Assess the privacy of the dataset
        :return:
            result: DatasetAttackScore
        """
        pass

View file

@ -0,0 +1,127 @@
"""
This module implements privacy risk assessment of synthetic datasets based on the paper:
"GAN-Leaks: A Taxonomy of Membership Inference Attacks against Generative Models" by D. Chen, N. Yu, Y. Zhang, M. Fritz
published in Proceedings of the 2020 ACM SIGSAC Conference on Computer and Communications Security, 34362, 2020.
https://doi.org/10.1145/3372297.3417238 and its implementation in https://github.com/DingfanChen/GAN-Leaks.
"""
from dataclasses import dataclass
from typing import Optional
import numpy as np
from sklearn.neighbors import NearestNeighbors
from apt.risk.data_assessment.attack_strategy_utils import KNNAttackStrategyUtils
from apt.risk.data_assessment.dataset_attack import DatasetAttackPerRecord, Config
from apt.risk.data_assessment.dataset_attack_result import DatasetAttackScore, DatasetAttackResultPerRecord
from apt.utils.datasets import ArrayDataset
@dataclass
class DatasetAttackGanLeaksConfig(Config):
    """Settings of the DatasetAttackGanLeaks attack.

    Attributes:
        k: Number of nearest neighbors to search
        use_batches: Divide query samples into batches or not.
        batch_size: Query sample batch size.
        compute_distance: A callable function, which takes two arrays representing 1D vectors as inputs and must
            return one value indicating the distance between those vectors.
            See sklearn.neighbors.NearestNeighbors documentation.
        distance_params: Additional keyword arguments for the distance computation function.
    """
    k: int = 1
    use_batches: bool = False
    batch_size: int = 10
    compute_distance: callable = None
    distance_params: dict = None
@dataclass
class DatasetAttackScoreGanLeaks(DatasetAttackScore):
    """Privacy score produced by DatasetAttackGanLeaks.

    Attributes
    ----------
    roc_auc_score : area under the ROC curve of the attack
    average_precision_score : average precision score of the attack
    assessment_type : assessment type is 'GANLeaks', to be used in reports
    """
    roc_auc_score: float
    average_precision_score: float
    assessment_type: str = 'GANLeaks'
class DatasetAttackGanLeaks(DatasetAttackPerRecord):
    """
    Privacy risk assessment for synthetic datasets based Black-Box MIA attack using distances of
    members (training set) and non-members (holdout set) from their nearest neighbors in the synthetic dataset.
    The area under the receiver operating characteristic curve (AUCROC) gives the privacy risk measure.
    """

    def __init__(self, original_data_members: ArrayDataset, original_data_non_members: ArrayDataset,
                 synthetic_data: ArrayDataset, dataset_name: str,
                 config: Optional[DatasetAttackGanLeaksConfig] = None):
        """
        :param original_data_members: A container for the training original samples and labels
        :param original_data_non_members: A container for the holdout original samples and labels
        :param synthetic_data: A container for the synthetic samples and labels
        :param dataset_name: A name to identify this dataset
        :param config: Configuration parameters to guide the assessment process such as which attack
            frameworks to use, optional. When omitted (or None), a ``DatasetAttackGanLeaksConfig`` with default
            values is created for this attack.
        """
        # Create the fallback configuration per instance: None is a legal argument and must not reach config.k
        if config is None:
            config = DatasetAttackGanLeaksConfig()
        attack_strategy_utils = KNNAttackStrategyUtils(config.k, config.use_batches, config.batch_size)
        super().__init__(original_data_members, original_data_non_members, synthetic_data, dataset_name,
                         attack_strategy_utils, config)
        if config.compute_distance:
            self.nn_obj = NearestNeighbors(n_neighbors=config.k, algorithm='auto', metric=config.compute_distance,
                                           metric_params=config.distance_params)
        else:
            self.nn_obj = NearestNeighbors(n_neighbors=config.k, algorithm='auto')

    def assess_privacy(self) -> DatasetAttackResultPerRecord:
        """
        Calculate probabilities of positive and negative samples to be generated by the synthetic data generator
        :return:
            result of the attack, based on the NN distances from the query samples to the synthetic data samples
        """
        # nearest neighbor search
        self.attack_strategy_utils.fit(self.synthetic_data, self.nn_obj)

        # positive query
        pos_proba = self.attack_strategy_utils.find_knn(self.original_data_members, self.nn_obj,
                                                        self.probability_per_sample)
        # negative query
        neg_proba = self.attack_strategy_utils.find_knn(self.original_data_non_members, self.nn_obj,
                                                        self.probability_per_sample)

        return DatasetAttackResultPerRecord(self.dataset_name, positive_probabilities=pos_proba,
                                            negative_probabilities=neg_proba)

    def calculate_privacy_score(self, dataset_attack_result: DatasetAttackResultPerRecord,
                                generate_plot=False) -> DatasetAttackScore:
        """
        Calculate the attack score from the probabilities of positive and negative samples
        :param dataset_attack_result: attack result containing probabilities of positive and negative samples to be
            generated by the synthetic data generator
        :param generate_plot: generate AUC ROC curve plot and persist it
        :return:
            score of the attack, based on distance-based probabilities
        """
        pos_proba = dataset_attack_result.positive_probabilities
        neg_proba = dataset_attack_result.negative_probabilities
        # Only the aggregate scores are reported; the curve points themselves are not needed here
        _, _, _, auc, ap = self.calculate_roc_score(pos_proba, neg_proba)
        score = DatasetAttackScoreGanLeaks(self.dataset_name, roc_auc_score=auc, average_precision_score=ap)
        if generate_plot:
            self.plot_roc_curve(pos_proba, neg_proba)
        return score

    @staticmethod
    def probability_per_sample(distances: np.ndarray):
        """
        Turn the distances of every query sample to its KNNs in the synthetic data into one value per sample:
        the average over the neighbors of exp(-distance).
        :param distances: distance between every query sample in batch to its KNNs among synthetic samples
        :return:
            probability estimates of the query samples being generated and so being part of the synthetic set
        """
        return np.average(np.exp(-distances), axis=1)

View file

@ -0,0 +1,115 @@
"""
This module implements privacy risk assessment of synthetic datasets based on the paper
"Holdout-Based Fidelity and Privacy Assessment of Mixed-Type Synthetic Data" by M. Platzer and T. Reutterer.
and on a variation of its reference implementation in https://github.com/mostly-ai/paper-fidelity-accuracy.
"""
import logging
from dataclasses import dataclass
from typing import Optional
import numpy as np
from sklearn.neighbors import NearestNeighbors
from apt.risk.data_assessment.attack_strategy_utils import KNNAttackStrategyUtils
from apt.risk.data_assessment.dataset_attack import DatasetAttackWhole, Config
from apt.risk.data_assessment.dataset_attack_result import DatasetAttackScore
from apt.utils.datasets import ArrayDataset
logger = logging.getLogger(__name__)
@dataclass
class DatasetAttackHoldoutConfig(Config):
    """Settings of the DatasetAttackHoldout attack.

    Attributes:
        k: Number of nearest neighbors to search
        use_batches: Divide query samples into batches or not.
        batch_size: Query sample batch size.
        compute_distance: A callable function, which takes two arrays representing 1D vectors as inputs and must
            return one value indicating the distance between those vectors.
        distance_params: Additional keyword arguments for the distance computation function.
    """
    k: int = 1
    use_batches: bool = False
    batch_size: int = 10
    compute_distance: callable = None
    distance_params: dict = None
@dataclass
class DatasetAttackScoreHoldout(DatasetAttackScore):
    """Privacy score produced by DatasetAttackHoldout.

    Attributes
    ----------
    share : the share of synthetic records closer to the training than the holdout dataset
    assessment_type : assessment type is 'Holdout', to be used in reports
    """
    share: float
    assessment_type: str = 'Holdout'
class DatasetAttackHoldout(DatasetAttackWhole):
    """
    Privacy risk assessment for synthetic datasets based on distances of synthetic data records from
    members (training set) and non-members (holdout set). The privacy risk measure is the share of synthetic
    records closer to the training than the holdout dataset.
    """

    def __init__(self, original_data_members: ArrayDataset, original_data_non_members: ArrayDataset,
                 synthetic_data: ArrayDataset, dataset_name: str,
                 config: Optional[DatasetAttackHoldoutConfig] = None):
        """
        :param original_data_members: A container for the training original samples and labels
        :param original_data_non_members: A container for the holdout original samples and labels
        :param synthetic_data: A container for the synthetic samples and labels
        :param dataset_name: A name to identify this dataset
        :param config: Configuration parameters to guide the assessment process such as which attack
            frameworks to use, optional. When omitted (or None), a ``DatasetAttackHoldoutConfig`` with default
            values is created for this attack.
        """
        # Create the fallback configuration per instance: None is a legal argument and must not reach config.k
        if config is None:
            config = DatasetAttackHoldoutConfig()
        attack_strategy_utils = KNNAttackStrategyUtils(config.k, config.use_batches, config.batch_size)
        super().__init__(original_data_members, original_data_non_members, synthetic_data, dataset_name,
                         attack_strategy_utils, config)
        if config.compute_distance:
            self.nn_obj_members = NearestNeighbors(n_neighbors=config.k, algorithm='auto',
                                                   metric=config.compute_distance,
                                                   metric_params=config.distance_params)
            self.nn_obj_non_members = NearestNeighbors(n_neighbors=config.k, algorithm='auto',
                                                       metric=config.compute_distance,
                                                       metric_params=config.distance_params)
        else:
            self.nn_obj_members = NearestNeighbors(n_neighbors=config.k, algorithm='auto')
            self.nn_obj_non_members = NearestNeighbors(n_neighbors=config.k, algorithm='auto')

    def assess_privacy(self) -> DatasetAttackScoreHoldout:
        """
        Calculate the share of synthetic records closer to the training than the holdout dataset
        :return:
            score of the attack, based on the NN distances from the synthetic data samples to the member and
            non-member samples
        :raises ValueError: if the numbers of member and non-member distances differ
        """
        member_distances, non_member_distances = self.calculate_distances()
        # Both arrays hold one entry per synthetic record, so they have to line up for the comparison below
        if len(member_distances) != len(non_member_distances):
            raise ValueError(f"Mismatching number of distances to members ({len(member_distances)}) "
                             f"and to non-members ({len(non_member_distances)})")

        # Ties are attributed according to the relative sizes of the member and non-member datasets,
        # which are not the lengths of the distance arrays (those equal the synthetic data size)
        n_members = len(self.original_data_members.get_samples())
        n_non_members = len(self.original_data_non_members.get_samples())
        closer_to_members = np.mean(member_distances < non_member_distances)
        ties = np.mean(member_distances == non_member_distances)
        share = closer_to_members + (n_members / (n_members + n_non_members)) * ties
        return DatasetAttackScoreHoldout(self.dataset_name, share=share)

    def calculate_distances(self):
        """
        Calculate the distances of the synthetic samples to their KNNs among the member samples and among
        the non-member samples.
        :return:
            pos_distances: distances of each synthetic data member from its nearest training samples
            neg_distances: distances of each synthetic data member from its nearest validation samples
        """
        # nearest neighbor search
        self.attack_strategy_utils.fit(self.original_data_members, self.nn_obj_members)
        self.attack_strategy_utils.fit(self.original_data_non_members, self.nn_obj_non_members)

        # distances of the synthetic data from the positive and negative samples (members and non-members)
        pos_distances = self.attack_strategy_utils.find_knn(self.synthetic_data, self.nn_obj_members)
        neg_distances = self.attack_strategy_utils.find_knn(self.synthetic_data, self.nn_obj_non_members)
        return pos_distances, neg_distances

View file

@ -0,0 +1,19 @@
from dataclasses import dataclass
import numpy as np
@dataclass
class DatasetAttackResult:
    """
    Base record for the result of a dataset attack, identified by the name of the attacked dataset.
    """
    dataset_name: str
@dataclass
class DatasetAttackResultPerRecord(DatasetAttackResult):
    """
    Attack result holding one array of probabilities for the positive samples and one for the negative samples.
    """
    positive_probabilities: np.ndarray
    negative_probabilities: np.ndarray
@dataclass
class DatasetAttackScore:
    """
    Base record for the score of a dataset attack, identified by the name of the attacked dataset.
    """
    dataset_name: str

View file

@ -3,6 +3,7 @@ pandas~=1.1.0
scipy==1.4.1
scikit-learn>=0.22.2
torch>=1.8.0
tqdm>=4.64.1
adversarial-robustness-toolbox>=1.11.0
# testing

View file

@ -0,0 +1,175 @@
import numpy as np
import pytest
from sklearn.compose import ColumnTransformer
from sklearn.decomposition import PCA
from sklearn.impute import SimpleImputer
from sklearn.model_selection import GridSearchCV
from sklearn.neighbors import KernelDensity
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
from apt.anonymization import Anonymize
from apt.risk.data_assessment.dataset_assessment_manager import DatasetAssessmentManager, DatasetAssessmentManagerConfig
from apt.utils.dataset_utils import get_iris_dataset_np, get_diabetes_dataset_np, get_adult_dataset_pd, \
get_nursery_dataset_pd
from apt.utils.datasets import ArrayDataset
# Number of synthetic records requested from the KDE-based generator in test_risk_kde
NUM_SYNTH_SAMPLES = 40000
# Number of PCA components used by the KDE-based generator for the numpy datasets
NUM_SYNTH_COMPONENTS = 4
# The datasets are loaded once, when the module is imported, and shared by the parametrized test cases
iris_dataset_np = get_iris_dataset_np()
diabetes_dataset_np = get_diabetes_dataset_np()
nursery_dataset_pd = get_nursery_dataset_pd()
adult_dataset_pd = get_adult_dataset_pd()
# One manager is shared by all test cases; every assessment appends its scores to it
mgr = DatasetAssessmentManager(DatasetAssessmentManagerConfig(persist_reports=True, generate_plots=False))
def teardown_function():
    """Ask the shared manager to dump the scores collected so far (pytest per-test teardown hook)."""
    mgr.dump_all_scores_to_files()
# Anonymization test cases: each dataset is paired with every k of range(2, 10, 4), dataset by dataset
anon_testdata = [
    (case_name, dataset, kind, k, mgr)
    for case_name, dataset, kind in (
        ('iris_np', iris_dataset_np, 'np'),
        ('diabetes_np', diabetes_dataset_np, 'np'),
        ('nursery_pd', nursery_dataset_pd, 'pd'),
        ('adult_pd', adult_dataset_pd, 'pd'),
    )
    for k in range(2, 10, 4)
]
@pytest.mark.parametrize("name, data, dataset_type, k, mgr", anon_testdata)
def test_risk_anonymization(name, data, dataset_type, k, mgr):
    """
    Assess the privacy risk of a k-anonymized version of the training data, used in place of synthetic data.
    """
    (x_train, y_train), (x_test, y_test) = data

    if dataset_type == 'np':
        original_data_members = ArrayDataset(x_train, y_train)
        QI = [0, 2]
        anonymizer = Anonymize(k, QI, train_only_QI=True)
        anonymized_data = ArrayDataset(anonymizer.anonymize(original_data_members))
        original_data_non_members = ArrayDataset(x_test, y_test)
    else:
        # Pandas datasets have to be encoded first; "adult" takes precedence over "nursery" in the name
        is_adult = "adult" in name
        if not is_adult and "nursery" not in name:
            raise ValueError('Pandas dataset missing a preprocessing step')
        preprocess = preprocess_adult_x_data if is_adult else preprocess_nursery_x_data
        encoded, encoded_test = preprocess(x_train, x_test)
        QI = list(range(15, 27))
        if is_adult:
            anonymizer = Anonymize(k, QI)
        else:
            anonymizer = Anonymize(k, QI, train_only_QI=True)
        # The anonymizer gets its own dataset object, separate from the one handed to the assessment
        anonymized_data = ArrayDataset(anonymizer.anonymize(ArrayDataset(encoded, y_train)))
        original_data_members = ArrayDataset(encoded, y_train)
        original_data_non_members = ArrayDataset(encoded_test, y_test)

    score_g, score_h = mgr.assess(original_data_members, original_data_non_members, anonymized_data,
                                  f'anon_k{k}_{name}')
    assert (score_g.roc_auc_score > 0.5)
    assert (score_g.average_precision_score > 0.5)
    assert (score_h.share > 0.5)
# Test cases of the KDE-based synthetic data test: one per dataset, all sharing the same manager
testdata = [
    (case_name, dataset, kind, mgr)
    for case_name, dataset, kind in (
        ('iris_np', iris_dataset_np, 'np'),
        ('diabetes_np', diabetes_dataset_np, 'np'),
        ('nursery_pd', nursery_dataset_pd, 'pd'),
        ('adult_pd', adult_dataset_pd, 'pd'),
    )
]
@pytest.mark.parametrize("name, data, dataset_type, mgr", testdata)
def test_risk_kde(name, data, dataset_type, mgr):
    """
    Assess the privacy risk of synthetic data sampled from a kernel density estimate of the training data.
    """
    (x_train, y_train), (x_test, y_test) = data
    original_data_members = ArrayDataset(x_train, y_train)
    original_data_non_members = ArrayDataset(x_test, y_test)

    if dataset_type == 'np':
        generated = kde(NUM_SYNTH_SAMPLES, n_components=NUM_SYNTH_COMPONENTS,
                        original_data=original_data_members.get_samples())
        synth_data = ArrayDataset(generated)
    else:
        # Pandas datasets are encoded first and the encoded data replaces the raw member/non-member datasets
        if "adult" in name:
            preprocess = preprocess_adult_x_data
        elif "nursery" in name:
            preprocess = preprocess_nursery_x_data
        else:
            raise ValueError('Pandas dataset missing a preprocessing step')
        encoded, encoded_test = preprocess(x_train, x_test)
        num_synth_components = 10
        generated = kde(NUM_SYNTH_SAMPLES, n_components=num_synth_components, original_data=encoded)
        synth_data = ArrayDataset(generated)
        original_data_members = ArrayDataset(encoded, y_train)
        original_data_non_members = ArrayDataset(encoded_test, y_test)

    score_g, score_h = mgr.assess(original_data_members, original_data_non_members, synth_data,
                                  f'kde{NUM_SYNTH_SAMPLES}{name}')
    assert (score_g.roc_auc_score > 0.5)
    assert (score_g.average_precision_score > 0.5)
    assert (score_h.share > 0.5)
def kde(n_samples, n_components, original_data):
    """
    Simple synthetic data generator: projects the data with PCA, estimates the kernel density of the projection
    (the bandwidth is chosen by a cross-validated grid search), samples from that density and maps the samples
    back to the original feature space.

    :param n_samples: number of synthetic samples to generate
    :param n_components: number of PCA components to project the data on
    :param original_data: the data the generator is fitted on
    :return: the generated samples
    """
    projector = PCA(n_components=n_components, whiten=False)
    projected = projector.fit_transform(original_data)

    bandwidth_grid = {'bandwidth': np.logspace(-1, 1, 20)}
    search = GridSearchCV(KernelDensity(), bandwidth_grid, cv=5)
    search.fit(projected)

    # Fixed random_state keeps the generated samples reproducible
    sampled = search.best_estimator_.sample(n_samples, random_state=0)
    return projector.inverse_transform(sampled)
def preprocess_adult_x_data(x_train, x_test):
    """
    Encode the features of the adult dataset: numeric features get missing values filled with 0 and
    categorical features are one-hot encoded.

    The encoder is fitted on the training data only and then applied to the test data, so that both encoded
    arrays share the same columns.

    :param x_train: training features
    :param x_test: test features
    :return: the encoded training features and the encoded test features
    """
    features = ['age', 'workclass', 'education-num', 'marital-status', 'occupation', 'relationship', 'race', 'sex',
                'capital-gain', 'capital-loss', 'hours-per-week', 'native-country']
    categorical_features = ['workclass', 'marital-status', 'occupation', 'relationship', 'race', 'sex',
                            'native-country']
    numeric_features = [f for f in features if f not in categorical_features]
    numeric_transformer = Pipeline(
        steps=[('imputer', SimpleImputer(strategy='constant', fill_value=0))]
    )
    # Categories that appear only in the test data are ignored, i.e. encoded as all zeros
    categorical_transformer = OneHotEncoder(handle_unknown="ignore", sparse=False)
    preprocessor = ColumnTransformer(
        transformers=[
            ("num", numeric_transformer, numeric_features),
            ("cat", categorical_transformer, categorical_features),
        ]
    )
    encoded = preprocessor.fit_transform(x_train)
    # Re-fitting on the test data could yield different one-hot columns than those of the training data
    encoded_test = preprocessor.transform(x_test)
    return encoded, encoded_test
def preprocess_nursery_x_data(x_train, x_test):
    """
    Encode the features of the nursery dataset: all of its features are treated as categorical and are
    one-hot encoded.

    Both the training and the test data are cast to strings, the encoder is fitted on the training data only and
    then applied to the test data, so that both encoded arrays share the same columns.

    :param x_train: training features
    :param x_test: test features
    :return: the encoded training features and the encoded test features
    """
    # The test data needs the same string cast as the training data, otherwise its values would not match
    # the categories learned from the training data
    x_train = x_train.astype(str)
    x_test = x_test.astype(str)
    features = ["parents", "has_nurs", "form", "children", "housing", "finance", "social", "health"]
    # QI = ["finance", "social", "health"]
    categorical_features = ["parents", "has_nurs", "form", "housing", "finance", "social", "health", 'children']
    # Every feature is categorical, so this list ends up empty
    numeric_features = [f for f in features if f not in categorical_features]
    numeric_transformer = Pipeline(
        steps=[('imputer', SimpleImputer(strategy='constant', fill_value=0))]
    )
    # Categories that appear only in the test data are ignored, i.e. encoded as all zeros
    categorical_transformer = OneHotEncoder(handle_unknown="ignore", sparse=False)
    preprocessor = ColumnTransformer(
        transformers=[
            ("num", numeric_transformer, numeric_features),
            ("cat", categorical_transformer, categorical_features),
        ]
    )
    encoded = preprocessor.fit_transform(x_train)
    # Re-fitting on the test data could yield different one-hot columns than those of the training data
    encoded_test = preprocessor.transform(x_test)
    return encoded, encoded_test