Address review comments, add additional distribution comparison tests and make them externally configurable too, in addition to the alpha becoming configurable.

Signed-off-by: Maya Anderson <mayaa@il.ibm.com>
This commit is contained in:
Maya Anderson 2023-09-20 19:44:54 +03:00
parent 0ee0bf05d6
commit 34de3ff93b
7 changed files with 234 additions and 165 deletions

View file

@ -22,12 +22,14 @@ class DistributionValidationResult:
"""Holds the result of the validation of distributions similarities.
Attributes:
distributions_valid (bool): False if there are columns whose distribution is different between the datasets
distributions_validated : False if distribution validation failed for some reason, and no conclusion was drawn
distributions_valid: False if there are columns whose distribution is different between the datasets
member_column_distribution_diff (list): Columns whose distribution is different between the member and the
synthetic datasets
non_member_column_distribution_diff (list): Columns whose distribution is different between the non-member and
the synthetic datasets
"""
distributions_validated: bool
distributions_valid: bool
member_column_distribution_diff: list
non_member_column_distribution_diff: list
@ -38,23 +40,50 @@ class KNNAttackStrategyUtils(AttackStrategyUtils):
Common utilities for attack strategy based on KNN distances.
"""
def __init__(self, use_batches: bool = False, batch_size: int = 10) -> None:
def __init__(self, use_batches: bool = False, batch_size: int = 10, distribution_comparison_alpha: float = 0.05,
distribution_comparison_numeric_test: str = 'KS',
distribution_comparison_categorical_test: str = 'CHI') -> None:
"""
:param use_batches: Use batches with a progress meter or not when finding KNNs for query set
:param batch_size: if use_batches=True, the size of batch_size should be > 0
:param batch_size: if use_batches is True, the size of batch_size should be > 0
:param distribution_comparison_alpha: the significance level of the statistical distribution test pvalue.
If p-value is less than alpha, then we reject the null hypothesis that the
observed samples are drawn from the same distribution and we claim that
the distributions are different.
:param distribution_comparison_numeric_test: Type of test to compare distributions of numeric columns. Can be:
'KS' for the two-sample Kolmogorov-Smirnov test for goodness of fit,
'CVM' for the two-sample Cramér-von Mises test for goodness of fit,
'AD' for the Anderson-Darling test for 2-samples,
'ES' for the Epps-Singleton (ES) test statistic. The default is 'KS'
:param distribution_comparison_categorical_test: Type of test to compare distributions of categorical columns.
Can be:
'CHI' for the one-way chi-square test,
'AD' for The Anderson-Darling test for 2-samples,
'ES' for the Epps-Singleton (ES) test statistic.
The default is 'CHI'.
"""
self.use_batches = use_batches
self.batch_size = batch_size
if use_batches:
if batch_size < 1:
raise ValueError(f"When using batching batch_size should be > 0, and not {batch_size}")
self.distribution_comparison_alpha = distribution_comparison_alpha
self.distribution_comparison_numeric_test = distribution_comparison_numeric_test
self.distribution_comparison_categorical_test = distribution_comparison_categorical_test
def fit(self, knn_learner: NearestNeighbors, dataset: ArrayDataset):
    """
    Train the given nearest-neighbour learner on the samples held by ``dataset``.

    :param knn_learner: The KNN model to fit; its ``fit`` method is called on the samples.
    :param dataset: The training set; only ``dataset.get_samples()`` is used.
    """
    training_samples = dataset.get_samples()
    knn_learner.fit(training_samples)
def find_knn(self, knn_learner: NearestNeighbors, query_samples: ArrayDataset, distance_processor=None):
"""
Nearest neighbor search function.
:param query_samples: query samples, to which nearest neighbors are to be found
:param knn_learner: unsupervised learner for implementing neighbor searches, after it was fitted
:param distance_processor: function for processing the distance into another more relevant metric per sample.
@ -89,21 +118,36 @@ class KNNAttackStrategyUtils(AttackStrategyUtils):
return np.concatenate(distances)
@staticmethod
def _column_statistical_test(df1_column_samples, df2_column_samples, column, is_categorical, is_numeric, test_type,
alpha, differing_columns):
if is_categorical(column):
try:
result = stats.chisquare(f_obs=df1_column_samples, f_exp=df1_column_samples)
except ValueError as e:
if str(e).startswith('For each axis slice, the sum of'):
print('Column', column, e)
else:
raise
def _column_statistical_test(df1_column_samples, df2_column_samples, column, is_categorical, is_numeric,
numeric_test_type, categorical_test_type, alpha, differing_columns):
if is_categorical:
test_type = categorical_test_type
if test_type == 'CHI':
try:
result = stats.chisquare(f_obs=df1_column_samples, f_exp=df1_column_samples)
except ValueError as e:
if str(e).startswith('For each axis slice, the sum of'):
print('Column', column, ' the observed and expected sums are not the same,'
'so cannot run distribution comparison test')
raise e
else:
raise
elif test_type == 'AD':
result = stats.anderson_ksamp([df1_column_samples, df2_column_samples], midrank=True)
elif test_type == 'ES':
result = stats.epps_singleton_2samp(df1_column_samples, df2_column_samples)
else:
raise ValueError('Unknown test type', test_type)
elif is_numeric:
test_type = numeric_test_type
if test_type == 'KS':
result = stats.ks_2samp(df1_column_samples, df2_column_samples)
elif test_type == 'CVM':
result = stats.cramervonmises_2samp(df1_column_samples, df1_column_samples)
elif test_type == 'AD':
result = stats.anderson_ksamp([df1_column_samples, df2_column_samples], midrank=True)
elif test_type == 'ES':
result = stats.epps_singleton_2samp(df1_column_samples, df2_column_samples)
else:
raise ValueError('Unknown test type', test_type)
else:
@ -120,40 +164,42 @@ class KNNAttackStrategyUtils(AttackStrategyUtils):
# Accept H0, similar distributions
print(f'Accept H0, similar distributions in column {column}')
@staticmethod
def _columns_different_distributions(df1: ArrayDataset, df2: ArrayDataset,
categorical_features: list = [],
alpha=0.05, test_type='KS') -> list:
def _columns_different_distributions(self, df1: ArrayDataset, df2: ArrayDataset,
categorical_features: list = []) -> list:
differing_columns = []
df1_samples = df1.get_samples()
df2_samples = df2.get_samples()
if df1.is_pandas:
def is_categorical(col_name):
col_name in categorical_features or is_categorical_dtype(df1_samples.dtypes[col_name])
def is_numeric(col_name):
is_numeric_dtype(df1_samples.dtypes[col_name])
for name, _ in df1_samples.items():
is_categorical = name in categorical_features or is_categorical_dtype(df1_samples.dtypes[name])
is_numeric = is_numeric_dtype(df1_samples.dtypes[name])
KNNAttackStrategyUtils._column_statistical_test(df1_samples[name], df2_samples[name], name,
is_categorical, is_numeric(df1_samples.dtypes[name]),
test_type, alpha, differing_columns)
is_categorical, is_numeric,
self.distribution_comparison_numeric_test,
self.distribution_comparison_categorical_test,
self.distribution_comparison_alpha,
differing_columns)
else:
is_df1_numeric_dtype = np.issubdtype(df1_samples.dtype, int) or np.issubdtype(df1_samples.dtype, float)
def is_categorical(col_name):
col_name in categorical_features
is_numeric = np.issubdtype(df1_samples.dtype, int) or np.issubdtype(df1_samples.dtype, float)
for i, column in enumerate(df1_samples.T):
is_categorical = i in categorical_features
KNNAttackStrategyUtils._column_statistical_test(df1_samples[:, i], df2_samples[:, i], i,
is_categorical, is_df1_numeric_dtype, test_type, alpha,
differing_columns)
is_categorical, is_numeric,
self.distribution_comparison_numeric_test,
self.distribution_comparison_categorical_test,
self.distribution_comparison_alpha, differing_columns)
return differing_columns
def validate_distributions(self, original_data_members: ArrayDataset, original_data_non_members: ArrayDataset,
synthetic_data: ArrayDataset, categorical_features: list = None):
"""
Validate column distributions are similar between the datasets.
One advantage of the ES test compared to the KS test is that it does not assume a continuous distribution.
In [1], the authors conclude that the test also has a higher power than the KS test in many examples. They
recommend the use of the ES test for discrete samples as well as continuous samples with at least 25
observations each, whereas AD is recommended for smaller sample sizes in the continuous case.
:param original_data_members: A container for the training original samples and labels
:param original_data_non_members: A container for the holdout original samples and labels
:param synthetic_data: A container for the synthetic samples and labels
@ -161,17 +207,27 @@ class KNNAttackStrategyUtils(AttackStrategyUtils):
:return:
DistributionValidationResult
"""
member_column_distribution_diff = self._columns_different_distributions(synthetic_data,
original_data_members,
categorical_features)
non_member_column_distribution_diff = self._columns_different_distributions(synthetic_data,
original_data_non_members,
try:
member_column_distribution_diff = self._columns_different_distributions(synthetic_data,
original_data_members,
categorical_features)
if not member_column_distribution_diff and not non_member_column_distribution_diff:
return DistributionValidationResult(distributions_valid=True,
non_member_column_distribution_diff = self._columns_different_distributions(synthetic_data,
original_data_non_members,
categorical_features)
except (ValueError, np.linalg.LinAlgError) as e:
print("Failed to validate distributions", e)
return DistributionValidationResult(distributions_validated=True,
distributions_valid=False,
member_column_distribution_diff=[],
non_member_column_distribution_diff=[])
return DistributionValidationResult(distributions_valid=False,
if not member_column_distribution_diff and not non_member_column_distribution_diff:
return DistributionValidationResult(distributions_validated=True,
distributions_valid=True,
member_column_distribution_diff=[],
non_member_column_distribution_diff=[])
return DistributionValidationResult(distributions_validated=True,
distributions_valid=False,
member_column_distribution_diff=member_column_distribution_diff,
non_member_column_distribution_diff=non_member_column_distribution_diff)

View file

@ -19,7 +19,15 @@ from data_assessment.dataset_attack_membership_classification import DatasetAtta
@dataclass
class DatasetAssessmentManagerConfig:
    """
    Options for DatasetAssessmentManager.

    :param persist_reports: whether to save assessment results to the filesystem.
    :param timestamp_reports: only relevant when persist_reports is True: whether to create a separate report
                              for each timestamp, or to write to the same report files on every run.
    :param generate_plots: whether to generate and visualize plots as part of the assessment.
    """
    persist_reports: bool = False
    timestamp_reports: bool = False
    generate_plots: bool = False
@ -47,6 +55,7 @@ class DatasetAssessmentManager:
only samples are used in the assessment
:param synthetic_data: A container for the synthetic samples and labels, only samples are used in the assessment
:param dataset_name: A name to identify this dataset, optional
:param categorical_features: A list of categorical feature names or numbers
:return:
a list of dataset attack risk scores
@ -84,10 +93,16 @@ class DatasetAssessmentManager:
return self.attack_scores
def dump_all_scores_to_files(self):
"""
Save assessment results to filesystem.
"""
if self.config.persist_reports:
time_str = time.strftime("%Y%m%d-%H%M%S")
for i, (attack_name, attack_scores) in enumerate(self.attack_scores.items()):
results_log_file = f"{time_str}_{attack_name}_results.log.csv"
if self.config.timestamp_reports:
results_log_file = f"{time_str}_{attack_name}_results.log.csv"
else:
results_log_file = f"{attack_name}_results.log.csv"
run_results_df = (pd.DataFrame(attack_scores).drop('result', axis=1, errors='ignore').
drop('distributions_validation_result', axis=1, errors='ignore'))
run_results_df.to_csv(results_log_file, header=True, encoding='utf-8', index=False, mode='w') # Overwrite
run_results_df.to_csv(results_log_file, header=True, encoding='utf-8', index=False, mode='w')

View file

@ -61,7 +61,8 @@ class DatasetAttackMembershipClassification(DatasetAttackMembership):
from the members dataset (training) as opposed to the distinguishability of the synthetic dataset from the
non-members dataset (test).
The privacy risk measure is calculated as the ratio of the area under the receiver operating characteristic curve (AUC ROC) of
the members dataset to AU ROC of the non-members dataset.
the members dataset to AUC ROC of the non-members dataset. It can be 0.0 or higher, with higher scores meaning
higher privacy risk and worse privacy.
"""
SHORT_NAME = 'MembershipClassification'

View file

@ -9,10 +9,6 @@ from typing import Callable
import numpy as np
from sklearn.neighbors import NearestNeighbors
from sklearn.neighbors import KernelDensity
from sklearn.model_selection import GridSearchCV
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split
from apt.risk.data_assessment.attack_strategy_utils import KNNAttackStrategyUtils, DistributionValidationResult
from apt.risk.data_assessment.dataset_attack import DatasetAttackMembership, Config
@ -35,6 +31,10 @@ class DatasetAttackConfigMembershipKnnProbabilities(Config):
distance_params: Additional keyword arguments for the distance computation function, see 'metric_params' in
sklearn.neighbors.NearestNeighbors documentation.
generate_plot: Generate or not an AUC ROC curve and persist it in a file
distribution_comparison_alpha: the significance level of the statistical distribution test p-value.
If p-value is less than alpha, then we reject the null hypothesis that the
observed samples are drawn from the same distribution, and we claim that the
distributions are different.
"""
k: int = 5
use_batches: bool = False
@ -42,6 +42,7 @@ class DatasetAttackConfigMembershipKnnProbabilities(Config):
compute_distance: Callable = None
distance_params: dict = None
generate_plot: bool = False
distribution_comparison_alpha: float = 0.05
@dataclass
@ -81,8 +82,7 @@ class DatasetAttackMembershipKnnProbabilities(DatasetAttackMembership):
synthetic_data: ArrayDataset,
config: DatasetAttackConfigMembershipKnnProbabilities = DatasetAttackConfigMembershipKnnProbabilities(),
dataset_name: str = DEFAULT_DATASET_NAME,
categorical_features: list = None,
add_reference: bool = False, reference_synthetic_data: ArrayDataset = None):
categorical_features: list = None, **kwargs):
"""
:param original_data_members: A container for the training original samples and labels
:param original_data_non_members: A container for the holdout original samples and labels
@ -90,7 +90,8 @@ class DatasetAttackMembershipKnnProbabilities(DatasetAttackMembership):
:param config: Configuration parameters to guide the attack, optional
:param dataset_name: A name to identify this dataset, optional
"""
attack_strategy_utils = KNNAttackStrategyUtils(config.use_batches, config.batch_size)
attack_strategy_utils = KNNAttackStrategyUtils(config.use_batches, config.batch_size,
config.distribution_comparison_alpha, **kwargs)
super().__init__(original_data_members, original_data_non_members, synthetic_data, config, dataset_name,
categorical_features, attack_strategy_utils)
if config.compute_distance:
@ -99,36 +100,6 @@ class DatasetAttackMembershipKnnProbabilities(DatasetAttackMembership):
else:
self.knn_learner = NearestNeighbors(n_neighbors=config.k, algorithm='auto')
self.has_reference = add_reference
if not add_reference:
return
if reference_synthetic_data:
self.synthetic_data_ref = reference_synthetic_data
else:
# Y not used, but needed for ArrayDataset
X_non_members, X_reference = \
train_test_split(original_data_non_members.get_samples(), test_size=0.5, random_state=7)
# ref_filename = "ref_data.csv"
# test_filename = "test_data.csv"
# if os.path.exists(ref_filename) and os.path.exists(test_filename):
# x_synth_ref = np.genfromtxt(ref_filename, delimiter=",")
# X_non_members = np.genfromtxt(test_filename, delimiter=",")
# else:
x_synth_ref = self.generate_synth_data(len(X_reference), n_components=10, original_data=X_reference)
# np.savetxt(ref_filename, x_synth_ref, delimiter=",")
# np.savetxt(test_filename, X_non_members, delimiter=",")
self.original_data_non_members = ArrayDataset(X_non_members)
self.synthetic_data_ref = ArrayDataset(x_synth_ref)
if config.compute_distance:
self.knn_learner_ref = NearestNeighbors(n_neighbors=config.k, algorithm='auto',
metric=config.compute_distance,
metric_params=config.distance_params)
else:
self.knn_learner_ref = NearestNeighbors(n_neighbors=config.k, algorithm='auto')
def short_name(self):
    """
    Return the short identifier of this attack.

    :return: the value of the ``SHORT_NAME`` attribute of this object
    """
    attack_name = self.SHORT_NAME
    return attack_name
@ -143,11 +114,15 @@ class DatasetAttackMembershipKnnProbabilities(DatasetAttackMembership):
it is more likely that the query sample was used to train the generative model. This probability is approximated
by the Parzen window density estimation in ``probability_per_sample()``, computed from the NN distances from the
query samples to the synthetic data samples.
Before running the assessment, there is a validation that the distribution of the synthetic data is similar to
that of the original data members and to that of the original data non-members.
:return:
Privacy score of the attack together with the attack result with the probabilities of member and
non-member samples to be generated by the synthetic data generator based on the NN distances from the
query samples to the synthetic data samples
The result also contains the distribution validation result and a warning if the distributions are not
similar.
"""
distributions_validation_result = self.attack_strategy_utils.validate_distributions(
self.original_data_members, self.original_data_non_members, self.synthetic_data, self.categorical_features)
@ -161,34 +136,10 @@ class DatasetAttackMembershipKnnProbabilities(DatasetAttackMembership):
# non-members query
non_member_distances = self.attack_strategy_utils.find_knn(self.knn_learner, self.original_data_non_members)
if self.has_reference:
self.attack_strategy_utils.fit(self.knn_learner_ref, self.synthetic_data_ref)
# members query
member_distances_ref = self.attack_strategy_utils.find_knn(self.knn_learner_ref,
self.original_data_members)
# non-members query
non_member_distances_ref = self.attack_strategy_utils.find_knn(self.knn_learner_ref,
self.original_data_non_members)
assert (len(member_distances) == len(member_distances_ref))
assert (len(non_member_distances) == len(non_member_distances_ref))
num_pos_samples = len(member_distances)
num_neg_samples = len(non_member_distances)
member_proba_calibrate = self.probability_per_sample(member_distances[:num_pos_samples] -
member_distances_ref[:num_pos_samples])
non_member_proba_calibrate = self.probability_per_sample(non_member_distances[:num_neg_samples] -
non_member_distances_ref[:num_neg_samples])
result = DatasetAttackResultMembership(member_probabilities=member_proba_calibrate,
non_member_probabilities=non_member_proba_calibrate)
else:
member_proba = self.probability_per_sample(member_distances)
non_member_proba = self.probability_per_sample(non_member_distances)
result = DatasetAttackResultMembership(member_probabilities=member_proba,
non_member_probabilities=non_member_proba)
member_proba = self.probability_per_sample(member_distances)
non_member_proba = self.probability_per_sample(non_member_distances)
result = DatasetAttackResultMembership(member_probabilities=member_proba,
non_member_probabilities=non_member_proba)
score = self.calculate_privacy_score(result, self.config.generate_plot)
score.distributions_validation_result = distributions_validation_result
@ -227,22 +178,3 @@ class DatasetAttackMembershipKnnProbabilities(DatasetAttackMembership):
numpy array of size (n,)
"""
return np.average(np.exp(-distances), axis=1)
@staticmethod
def generate_synth_data(n_samples, n_components, original_data):
    """
    Simple KDE synthetic data generator.

    Reduces ``original_data`` with PCA, estimates the kernel density of the reduced data using a Gaussian kernel
    (bandwidth chosen by a 5-fold grid search over 20 log-spaced values between 0.1 and 10), draws samples from
    that density and maps them back through the inverse PCA transform.

    :param n_samples: number of synthetic samples to draw
    :param n_components: number of PCA components to keep
    :param original_data: the data whose distribution is estimated
    :return: the sampled data after the inverse PCA transform
    """
    reducer = PCA(n_components=n_components, whiten=False)
    reduced = reducer.fit_transform(original_data)
    bandwidth_grid = {'bandwidth': np.logspace(-1, 1, 20)}
    search = GridSearchCV(KernelDensity(), bandwidth_grid, cv=5)
    search.fit(reduced)
    # Fixed random_state keeps the generated samples reproducible.
    samples = search.best_estimator_.sample(n_samples, random_state=0)
    return reducer.inverse_transform(samples)

View file

@ -30,11 +30,18 @@ class DatasetAttackConfigWholeDatasetKnnDistance(Config):
See 'metric' parameter in sklearn.neighbors.NearestNeighbors documentation.
distance_params: Additional keyword arguments for the distance computation function, see 'metric_params' in
sklearn.neighbors.NearestNeighbors documentation.
distribution_comparison_alpha: the significance level of the statistical distribution test p-value.
If p-value is less than alpha, then we reject the null hypothesis that the
observed samples are drawn from the same distribution, and we claim that the
distributions are different.
"""
use_batches: bool = False
batch_size: int = 10
compute_distance: callable = None
distance_params: dict = None
# Significance level of the statistical distribution test p-value.
distribution_comparison_alpha: float = 0.05
# No trailing comma after the default: with one, the default becomes the tuple ('KS',) rather than the string 'KS'.
distribution_comparison_numeric_test: str = 'KS'
distribution_comparison_categorical_test: str = 'CHI'
@dataclass
@ -68,7 +75,7 @@ class DatasetAttackWholeDatasetKnnDistance(DatasetAttack):
def __init__(self, original_data_members: ArrayDataset, original_data_non_members: ArrayDataset,
synthetic_data: ArrayDataset,
config: DatasetAttackConfigWholeDatasetKnnDistance = DatasetAttackConfigWholeDatasetKnnDistance(),
dataset_name: str = DEFAULT_DATASET_NAME, categorical_features: list = None):
dataset_name: str = DEFAULT_DATASET_NAME, categorical_features: list = None, **kwargs):
"""
:param original_data_members: A container for the training original samples and labels
:param original_data_non_members: A container for the holdout original samples and labels
@ -76,7 +83,8 @@ class DatasetAttackWholeDatasetKnnDistance(DatasetAttack):
:param config: Configuration parameters to guide the assessment process, optional
:param dataset_name: A name to identify this dataset, optional
"""
attack_strategy_utils = KNNAttackStrategyUtils(config.use_batches, config.batch_size)
attack_strategy_utils = KNNAttackStrategyUtils(config.use_batches, config.batch_size,
config.distribution_comparison_alpha, **kwargs)
super().__init__(original_data_members, original_data_non_members, synthetic_data, config, dataset_name,
categorical_features, attack_strategy_utils)
if config.compute_distance:
@ -95,8 +103,12 @@ class DatasetAttackWholeDatasetKnnDistance(DatasetAttack):
"""
Calculate the share of synthetic records closer to the training than the holdout dataset, based on the
DCR computed by 'calculate_distances()'.
Before running the assessment, there is a validation that the distribution of the synthetic data is similar to
that of the original data members and to that of the original data non-members.
:return:
score of the attack, based on the NN distances from the query samples to the synthetic data samples
score of the attack, based on the NN distances from the query samples to the synthetic data samples.
The result also contains the distribution validation result and a warning if the distributions are not
similar.
"""
distributions_validation_result = self.attack_strategy_utils.validate_distributions(
self.original_data_members, self.original_data_non_members, self.synthetic_data, self.categorical_features)

View file

@ -21,6 +21,7 @@ MIN_SHARE = 0.5
MIN_ROC_AUC = 0.0
MIN_PRECISION = 0.0
NUM_SYNTH_SAMPLES = 100
NUM_SYNTH_COMPONENTS = 4
iris_dataset_np = get_iris_dataset_np()
@ -28,7 +29,7 @@ diabetes_dataset_np = get_diabetes_dataset_np()
nursery_dataset_pd = get_nursery_dataset_pd()
adult_dataset_pd = get_adult_dataset_pd()
mgr = DatasetAssessmentManager(DatasetAssessmentManagerConfig(persist_reports=True, generate_plots=False))
mgr = DatasetAssessmentManager(DatasetAssessmentManagerConfig(persist_reports=False, generate_plots=False))
def teardown_function():
@ -36,10 +37,10 @@ def teardown_function():
mgr.dump_all_scores_to_files()
anon_testdata = [('iris_np', iris_dataset_np, 'np', k, mgr) for k in range(2, 10, 4)] \
+ [('diabetes_np', diabetes_dataset_np, 'np', k, mgr) for k in range(2, 10, 4)] \
+ [('nursery_pd', nursery_dataset_pd, 'pd', k, mgr) for k in range(2, 10, 4)] \
+ [('adult_pd', adult_dataset_pd, 'pd', k, mgr) for k in range(2, 10, 4)]
anon_testdata = ([('iris_np', iris_dataset_np, 'np', k, mgr) for k in range(2, 10, 4)]
+ [('diabetes_np', diabetes_dataset_np, 'np', k, mgr) for k in range(2, 10, 4)]
+ [('nursery_pd', nursery_dataset_pd, 'pd', k, mgr) for k in range(2, 10, 4)]
+ [('adult_pd', adult_dataset_pd, 'pd', k, mgr) for k in range(2, 10, 4)])
@pytest.mark.parametrize("name, data, dataset_type, k, mgr", anon_testdata)
@ -97,13 +98,12 @@ def test_risk_kde(name, data, dataset_type, mgr):
else:
raise ValueError('Pandas dataset missing a preprocessing step')
num_synth_samples = x_train.shape[0] # required by the chi test
synth_data = ArrayDataset(
kde(num_synth_samples, n_components=num_synth_components, original_data=encoded))
kde(NUM_SYNTH_SAMPLES, n_components=num_synth_components, original_data=encoded))
original_data_members = ArrayDataset(encoded, y_train)
original_data_non_members = ArrayDataset(encoded_test, y_test)
dataset_name = 'kde' + str(num_synth_samples) + name
dataset_name = 'kde' + str(NUM_SYNTH_SAMPLES) + name
assess_privacy_and_validate_result(mgr, original_data_members, original_data_non_members, synth_data, dataset_name,
categorical_features)

View file

@ -4,6 +4,10 @@ from apt.anonymization import Anonymize
from apt.risk.data_assessment.dataset_assessment_manager import DatasetAssessmentManager, DatasetAssessmentManagerConfig
from apt.utils.dataset_utils import get_iris_dataset_np, get_nursery_dataset_pd
from apt.utils.datasets import ArrayDataset
from data_assessment.dataset_attack_membership_classification import DatasetAttackScoreMembershipClassification
from data_assessment.dataset_attack_membership_knn_probabilities import DatasetAttackScoreMembershipKnnProbabilities, \
DatasetAttackConfigMembershipKnnProbabilities, DatasetAttackMembershipKnnProbabilities
from data_assessment.dataset_attack_whole_dataset_knn_distance import DatasetAttackScoreWholeDatasetKnnDistance
from tests.test_data_assessment import kde, preprocess_nursery_x_data
NUM_SYNTH_SAMPLES = 10
@ -28,10 +32,10 @@ def teardown_function():
mgr.dump_all_scores_to_files()
anon_testdata = [('iris_np', iris_dataset_np, 'np', mgr1)] \
+ [('nursery_pd', nursery_dataset_pd, 'pd', mgr2)] \
+ [('iris_np', iris_dataset_np, 'np', mgr3)] \
+ [('nursery_pd', nursery_dataset_pd, 'pd', mgr4)]
anon_testdata = ([('iris_np', iris_dataset_np, 'np', mgr1)]
+ [('nursery_pd', nursery_dataset_pd, 'pd', mgr2)]
+ [('iris_np', iris_dataset_np, 'np', mgr3)]
+ [('nursery_pd', nursery_dataset_pd, 'pd', mgr4)])
@pytest.mark.parametrize("name, data, dataset_type, mgr", anon_testdata)
@ -44,9 +48,10 @@ def test_risk_anonymization(name, data, dataset_type, mgr):
preprocessed_x_test = x_test
QI = [0, 2]
anonymizer = Anonymize(ANON_K, QI, train_only_QI=True)
categorical_features = []
elif "nursery" in name:
preprocessed_x_train, preprocessed_x_test = preprocess_nursery_x_data(x_train, x_test)
QI = list(range(15, 27))
preprocessed_x_train, preprocessed_x_test, categorical_features = preprocess_nursery_x_data(x_train, x_test)
QI = list(range(15, 20))
anonymizer = Anonymize(ANON_K, QI, train_only_QI=True)
else:
raise ValueError('Pandas dataset missing a preprocessing step')
@ -57,11 +62,12 @@ def test_risk_anonymization(name, data, dataset_type, mgr):
dataset_name = f'anon_k{ANON_K}_{name}'
assess_privacy_and_validate_result(mgr, original_data_members, original_data_non_members, anonymized_data,
dataset_name)
dataset_name, categorical_features)
assess_privacy_and_validate_result(mgr, original_data_members=original_data_members,
original_data_non_members=original_data_non_members,
synth_data=anonymized_data, dataset_name=None)
synth_data=anonymized_data, dataset_name=None,
categorical_features=categorical_features)
testdata = [('iris_np', iris_dataset_np, 'np', mgr4),
@ -72,38 +78,85 @@ testdata = [('iris_np', iris_dataset_np, 'np', mgr4),
@pytest.mark.parametrize("name, data, dataset_type, mgr", testdata)
def test_risk_kde(name, data, dataset_type, mgr):
    """
    Assess KDE-generated synthetic data twice: once under an explicit dataset name and once with
    ``dataset_name=None``.
    """
    members, non_members, synthetic, categorical = encode_and_generate_synthetic_data(dataset_type, name, data)
    named_run = 'kde' + str(NUM_SYNTH_SAMPLES) + name
    for dataset_name in (named_run, None):
        assess_privacy_and_validate_result(mgr, members, non_members, synthetic, dataset_name, categorical)
testdata_knn_options = [('iris_np', iris_dataset_np, 'np'),
('nursery_pd', nursery_dataset_pd, 'pd')]
@pytest.mark.parametrize("name, data, dataset_type", testdata_knn_options)
def test_risk_kde_knn_options(name, data, dataset_type):
    """
    Run the KNN-probabilities membership attack for every combination of numeric and categorical
    distribution comparison test, with a non-default alpha, and check the resulting scores.
    """
    members, non_members, synthetic, categorical = encode_and_generate_synthetic_data(dataset_type, name, data)
    dataset_name = 'kde' + str(NUM_SYNTH_SAMPLES) + name

    attack_config = DatasetAttackConfigMembershipKnnProbabilities(use_batches=True, generate_plot=False,
                                                                  distribution_comparison_alpha=0.1)
    # Numeric test varies in the outer position, categorical test in the inner one.
    test_combinations = [(numeric_test, categorical_test)
                         for numeric_test in ['KS', 'CVM', 'AD', 'ES']
                         for categorical_test in ['CHI', 'AD', 'ES']]
    for numeric_test, categorical_test in test_combinations:
        attack = DatasetAttackMembershipKnnProbabilities(
            members, non_members, synthetic, attack_config, dataset_name, categorical,
            distribution_comparison_numeric_test=numeric_test,
            distribution_comparison_categorical_test=categorical_test)
        score = attack.assess_privacy()
        assert score.roc_auc_score > MIN_ROC_AUC
        assert score.average_precision_score > MIN_PRECISION
def encode_and_generate_synthetic_data(dataset_type, name, data):
(x_train, y_train), (x_test, y_test) = data
if dataset_type == 'np':
encoded = x_train
encoded_test = x_test
num_synth_components = NUM_SYNTH_COMPONENTS
categorical_features = []
elif "nursery" in name:
encoded, encoded_test = preprocess_nursery_x_data(x_train, x_test)
encoded, encoded_test, categorical_features = preprocess_nursery_x_data(x_train, x_test)
num_synth_components = 10
else:
raise ValueError('Pandas dataset missing a preprocessing step')
synth_data = ArrayDataset(
synthetic_data = ArrayDataset(
kde(NUM_SYNTH_SAMPLES, n_components=num_synth_components, original_data=encoded))
original_data_members = ArrayDataset(encoded, y_train)
original_data_non_members = ArrayDataset(encoded_test, y_test)
dataset_name = 'kde' + str(NUM_SYNTH_SAMPLES) + name
assess_privacy_and_validate_result(mgr, original_data_members, original_data_non_members, synth_data, dataset_name)
assess_privacy_and_validate_result(mgr, original_data_members=original_data_members,
original_data_non_members=original_data_non_members,
synth_data=synth_data, dataset_name=None)
return original_data_members, original_data_non_members, synthetic_data, categorical_features
def assess_privacy_and_validate_result(mgr, original_data_members, original_data_non_members, synth_data,
dataset_name):
if dataset_name:
[score_g, score_h] = mgr.assess(original_data_members, original_data_non_members, synth_data,
dataset_name)
else:
[score_g, score_h] = mgr.assess(original_data_members, original_data_non_members, synth_data)
assert (score_g.roc_auc_score > MIN_ROC_AUC)
assert (score_g.average_precision_score > MIN_PRECISION)
assert (score_h.share > MIN_SHARE)
def assess_privacy_and_validate_result(mgr, original_data_members, original_data_non_members, synth_data, dataset_name,
                                       categorical_features):
    """
    Run all assessments through ``mgr`` and check the first score recorded for each known assessment type.

    :param mgr: the assessment manager; its ``assess`` result is iterated with ``.items()``
    :param original_data_members: A container for the training original samples and labels
    :param original_data_non_members: A container for the holdout original samples and labels
    :param synth_data: A container for the synthetic samples and labels
    :param dataset_name: A name to identify this dataset, may be None
    :param categorical_features: A list of categorical feature names or numbers
    """
    scores_by_attack = mgr.assess(original_data_members, original_data_non_members, synth_data, dataset_name,
                                  categorical_features)

    for assessment_type, scores in scores_by_attack.items():
        if assessment_type == 'MembershipKnnProbabilities':
            knn_score: DatasetAttackScoreMembershipKnnProbabilities = scores[0]
            assert knn_score.roc_auc_score > MIN_ROC_AUC
            assert knn_score.average_precision_score > MIN_PRECISION
        elif assessment_type == 'WholeDatasetKnnDistance':
            distance_score: DatasetAttackScoreWholeDatasetKnnDistance = scores[0]
            assert distance_score.share > MIN_SHARE
        elif assessment_type == 'MembershipClassification':
            classification_score: DatasetAttackScoreMembershipClassification = scores[0]
            assert classification_score.synthetic_data_quality_warning is False
            assert 0 <= classification_score.normalized_ratio <= 1