Add column distribution comparison, and a third method for dataset assessment by membership classification (#84)

* Add column distribution comparison, and a third method for dataset assessment by membership classification

* Address review comments, add additional distribution comparison tests and make them externally configurable too, in addition to the alpha becoming configurable.

Signed-off-by: Maya Anderson <mayaa@il.ibm.com>
This commit is contained in:
andersonm-ibm 2023-09-21 16:43:19 +03:00 committed by GitHub
parent 13a0567183
commit a40484e0c9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 676 additions and 205 deletions

View file

@ -1,33 +1,75 @@
import abc
from dataclasses import dataclass
import numpy as np
from scipy import stats
from sklearn.neighbors import NearestNeighbors
from tqdm import tqdm
from pandas.api.types import is_numeric_dtype, is_categorical_dtype
from apt.utils.datasets import ArrayDataset
class AttackStrategyUtils(abc.ABC):
    """
    Abstract base class for common utilities of various privacy attack strategies.
    """
    pass
@dataclass
class DistributionValidationResult:
    """Holds the result of the validation of distributions similarities.

    Attributes:
        distributions_validated (bool): False if distribution validation failed for some reason, and no conclusion
                                        was drawn
        distributions_valid (bool): False if there are columns whose distribution is different between the datasets
        member_column_distribution_diff (list): Columns whose distribution is different between the member and the
                                                synthetic datasets
        non_member_column_distribution_diff (list): Columns whose distribution is different between the non-member and
                                                    the synthetic datasets
    """
    distributions_validated: bool
    distributions_valid: bool
    member_column_distribution_diff: list
    non_member_column_distribution_diff: list
class KNNAttackStrategyUtils(AttackStrategyUtils):
"""
Common utilities for attack strategy based on KNN distances.
:param use_batches: Use batches with a progress meter or not when finding KNNs for query set.
:param batch_size: if use_batches=True, the size of batch_size should be > 0.
Common utilities for attack strategy based on KNN distances.
"""
def __init__(self, use_batches: bool = False, batch_size: int = 10) -> None:
def __init__(self, use_batches: bool = False, batch_size: int = 10, distribution_comparison_alpha: float = 0.05,
             distribution_comparison_numeric_test: str = 'KS',
             distribution_comparison_categorical_test: str = 'CHI') -> None:
    """
    :param use_batches: Use batches with a progress meter or not when finding KNNs for query set
    :param batch_size: if use_batches is True, the size of batch_size should be > 0
    :param distribution_comparison_alpha: the significance level of the statistical distribution test pvalue.
                                          If p-value is less than alpha, then we reject the null hypothesis that the
                                          observed samples are drawn from the same distribution and we claim that
                                          the distributions are different.
    :param distribution_comparison_numeric_test: Type of test to compare distributions of numeric columns. Can be:
                                          'KS' for the two-sample Kolmogorov-Smirnov test for goodness of fit,
                                          'CVM' for the two-sample Cramér-von Mises test for goodness of fit,
                                          'AD' for the Anderson-Darling test for 2-samples,
                                          'ES' for the Epps-Singleton (ES) test statistic. The default is 'KS'
    :param distribution_comparison_categorical_test: Type of test to compare distributions of categorical columns.
                                          Can be:
                                          'CHI' for the one-way chi-square test,
                                          'AD' for The Anderson-Darling test for 2-samples,
                                          'ES' for the Epps-Singleton (ES) test statistic.
                                          The default is 'CHI'.
    """
    self.use_batches = use_batches
    self.batch_size = batch_size
    if use_batches:
        # batching with a non-positive batch size would silently produce no batches
        if batch_size < 1:
            raise ValueError(f"When using batching batch_size should be > 0, and not {batch_size}")
    self.distribution_comparison_alpha = distribution_comparison_alpha
    self.distribution_comparison_numeric_test = distribution_comparison_numeric_test
    self.distribution_comparison_categorical_test = distribution_comparison_categorical_test
def fit(self, knn_learner: NearestNeighbors, dataset: ArrayDataset):
"""
@ -74,3 +116,118 @@ class KNNAttackStrategyUtils(AttackStrategyUtils):
else:
distances.append(dist_batch)
return np.concatenate(distances)
@staticmethod
def _column_statistical_test(df1_column_samples, df2_column_samples, column, is_categorical, is_numeric,
numeric_test_type, categorical_test_type, alpha, differing_columns):
if is_categorical:
test_type = categorical_test_type
if test_type == 'CHI':
try:
result = stats.chisquare(f_obs=df1_column_samples, f_exp=df1_column_samples)
except ValueError as e:
if str(e).startswith('For each axis slice, the sum of'):
print('Column', column, ' the observed and expected sums are not the same,'
'so cannot run distribution comparison test')
raise e
else:
raise
elif test_type == 'AD':
result = stats.anderson_ksamp([df1_column_samples, df2_column_samples], midrank=True)
elif test_type == 'ES':
result = stats.epps_singleton_2samp(df1_column_samples, df2_column_samples)
else:
raise ValueError('Unknown test type', test_type)
elif is_numeric:
test_type = numeric_test_type
if test_type == 'KS':
result = stats.ks_2samp(df1_column_samples, df2_column_samples)
elif test_type == 'CVM':
result = stats.cramervonmises_2samp(df1_column_samples, df1_column_samples)
elif test_type == 'AD':
result = stats.anderson_ksamp([df1_column_samples, df2_column_samples], midrank=True)
elif test_type == 'ES':
result = stats.epps_singleton_2samp(df1_column_samples, df2_column_samples)
else:
raise ValueError('Unknown test type', test_type)
else:
print(f'Skipping non-numeric and non-categorical column {column}')
return
print(
f"{column}: {test_type} = {result.statistic:.4f} "
f"(p-value = {result.pvalue:.3e}, are equal = {result.pvalue > 0.05})")
if result.pvalue < alpha:
# Reject H0, different distributions
print(f"Distributions differ in column {column}, p-value: {result.pvalue}")
differing_columns.append(column)
else:
# Accept H0, similar distributions
print(f'Accept H0, similar distributions in column {column}')
def _columns_different_distributions(self, df1: ArrayDataset, df2: ArrayDataset,
                                     categorical_features: list = None) -> list:
    """
    Compare the per-column distributions of two datasets using the configured statistical tests.

    :param df1: the first dataset
    :param df2: the second dataset; assumed to have the same columns in the same order as df1 - TODO confirm
    :param categorical_features: categorical column names (pandas) or column indexes (numpy), optional
    :return: list of column identifiers (names or indexes) whose distributions differ
    """
    # Bug fix: a mutable default argument ([]) is shared across calls; use None as the default.
    if categorical_features is None:
        categorical_features = []
    differing_columns = []
    df1_samples = df1.get_samples()
    df2_samples = df2.get_samples()
    if df1.is_pandas:
        # pandas: columns are identified by name, each with its own dtype
        for name, _ in df1_samples.items():
            is_categorical = name in categorical_features or is_categorical_dtype(df1_samples.dtypes[name])
            is_numeric = is_numeric_dtype(df1_samples.dtypes[name])
            KNNAttackStrategyUtils._column_statistical_test(df1_samples[name], df2_samples[name], name,
                                                           is_categorical, is_numeric,
                                                           self.distribution_comparison_numeric_test,
                                                           self.distribution_comparison_categorical_test,
                                                           self.distribution_comparison_alpha,
                                                           differing_columns)
    else:
        # numpy: one dtype for the whole array, columns identified by index
        is_numeric = np.issubdtype(df1_samples.dtype, int) or np.issubdtype(df1_samples.dtype, float)
        for i, column in enumerate(df1_samples.T):
            is_categorical = i in categorical_features
            KNNAttackStrategyUtils._column_statistical_test(df1_samples[:, i], df2_samples[:, i], i,
                                                           is_categorical, is_numeric,
                                                           self.distribution_comparison_numeric_test,
                                                           self.distribution_comparison_categorical_test,
                                                           self.distribution_comparison_alpha, differing_columns)
    return differing_columns
def validate_distributions(self, original_data_members: ArrayDataset, original_data_non_members: ArrayDataset,
                           synthetic_data: ArrayDataset, categorical_features: list = None):
    """
    Validate column distributions are similar between the datasets.

    One advantage of the ES test compared to the KS test is that it does not assume a continuous distribution.
    In [1], the authors conclude that the test also has a higher power than the KS test in many examples. They
    recommend the use of the ES test for discrete samples as well as continuous samples with at least 25
    observations each, whereas AD is recommended for smaller sample sizes in the continuous case.

    :param original_data_members: A container for the training original samples and labels
    :param original_data_non_members: A container for the holdout original samples and labels
    :param synthetic_data: A container for the synthetic samples and labels
    :param categorical_features: a list of categorical features of the datasets
    :return:
        DistributionValidationResult
    """
    try:
        member_column_distribution_diff = self._columns_different_distributions(synthetic_data,
                                                                                original_data_members,
                                                                                categorical_features)
        non_member_column_distribution_diff = self._columns_different_distributions(synthetic_data,
                                                                                    original_data_non_members,
                                                                                    categorical_features)
    except (ValueError, np.linalg.LinAlgError) as e:
        print("Failed to validate distributions", e)
        # Bug fix: validation did not complete, so it must be flagged as not validated.
        # The original returned distributions_validated=True here, contradicting the
        # DistributionValidationResult contract ("False if validation failed").
        return DistributionValidationResult(distributions_validated=False,
                                            distributions_valid=False,
                                            member_column_distribution_diff=[],
                                            non_member_column_distribution_diff=[])
    # Valid only when no column differs in either comparison; the diff lists are empty in that case,
    # so a single return covers both outcomes.
    distributions_valid = not member_column_distribution_diff and not non_member_column_distribution_diff
    return DistributionValidationResult(distributions_validated=True,
                                        distributions_valid=distributions_valid,
                                        member_column_distribution_diff=member_column_distribution_diff,
                                        non_member_column_distribution_diff=non_member_column_distribution_diff)

View file

@ -1,5 +1,7 @@
from __future__ import annotations
import time
from collections import defaultdict
from dataclasses import dataclass
from typing import Optional
@ -11,34 +13,39 @@ from apt.risk.data_assessment.dataset_attack_result import DatasetAttackScore, D
from apt.risk.data_assessment.dataset_attack_whole_dataset_knn_distance import \
DatasetAttackConfigWholeDatasetKnnDistance, DatasetAttackWholeDatasetKnnDistance
from apt.utils.datasets import ArrayDataset
from apt.risk.data_assessment.dataset_attack_membership_classification import \
DatasetAttackConfigMembershipClassification, DatasetAttackMembershipClassification
@dataclass
class DatasetAssessmentManagerConfig:
"""
Configuration for DatasetAssessmentManager.
:param persist_reports: Whether to save assessment results to filesystem.
:param generate_plots: Whether to generate and visualize plots as part of assessment.
:param persist_reports: save assessment results to filesystem, or not.
:param timestamp_reports: if persist_reports is True, then define if create a separate report for each timestamp,
or append to the same reports
:param generate_plots: generate and visualize plots as part of assessment, or not..
"""
persist_reports: bool = False
timestamp_reports: bool = False
generate_plots: bool = False
class DatasetAssessmentManager:
"""
The main class for running dataset assessment attacks.
:param config: Configuration parameters to guide the dataset assessment process
"""
attack_scores_per_record_knn_probabilities: list[DatasetAttackScore] = []
attack_scores_whole_dataset_knn_distance: list[DatasetAttackScore] = []
attack_scores = defaultdict(list)
def __init__(self, config: Optional[DatasetAssessmentManagerConfig] = DatasetAssessmentManagerConfig) -> None:
"""
:param config: Configuration parameters to guide the dataset assessment process
"""
self.config = config
def assess(self, original_data_members: ArrayDataset, original_data_non_members: ArrayDataset,
synthetic_data: ArrayDataset, dataset_name: str = DEFAULT_DATASET_NAME) -> list[DatasetAttackScore]:
synthetic_data: ArrayDataset, dataset_name: str = DEFAULT_DATASET_NAME, categorical_features: list = [])\
-> list[DatasetAttackScore]:
"""
Do dataset privacy risk assessment by running dataset attacks, and return their scores.
@ -48,41 +55,54 @@ class DatasetAssessmentManager:
only samples are used in the assessment
:param synthetic_data: A container for the synthetic samples and labels, only samples are used in the assessment
:param dataset_name: A name to identify this dataset, optional
:param categorical_features: A list of categorical feature names or numbers
:return:
a list of dataset attack risk scores
"""
# Create attacks
config_gl = DatasetAttackConfigMembershipKnnProbabilities(use_batches=False,
generate_plot=self.config.generate_plots)
attack_gl = DatasetAttackMembershipKnnProbabilities(original_data_members,
original_data_non_members,
synthetic_data,
config_gl,
dataset_name)
score_gl = attack_gl.assess_privacy()
self.attack_scores_per_record_knn_probabilities.append(score_gl)
dataset_name, categorical_features)
config_h = DatasetAttackConfigWholeDatasetKnnDistance(use_batches=False)
attack_h = DatasetAttackWholeDatasetKnnDistance(original_data_members, original_data_non_members,
synthetic_data, config_h, dataset_name)
synthetic_data, config_h, dataset_name, categorical_features)
score_h = attack_h.assess_privacy()
self.attack_scores_whole_dataset_knn_distance.append(score_h)
return [score_gl, score_h]
config_mc = DatasetAttackConfigMembershipClassification(classifier_type='LogisticRegression',
# 'RandomForestClassifier',
threshold=0.9)
attack_mc = DatasetAttackMembershipClassification(original_data_members, original_data_non_members,
synthetic_data, config_mc, dataset_name)
attack_list = [
(attack_gl, attack_gl.short_name()), # "MembershipKnnProbabilities"
(attack_h, attack_h.short_name()), # "WholeDatasetKnnDistance"
(attack_mc, attack_mc.short_name()), # "MembershipClassification"
]
for i, (attack, attack_name) in enumerate(attack_list):
print(f"Running {attack_name} attack on {dataset_name}")
score = attack.assess_privacy()
self.attack_scores[attack_name].append(score)
return self.attack_scores
def dump_all_scores_to_files(self):
"""
Save assessment results to filesystem.
"""
Save assessment results to filesystem.
"""
if self.config.persist_reports:
results_log_file = "_results.log.csv"
self._dump_scores_to_file(self.attack_scores_per_record_knn_probabilities,
"per_record_knn_probabilities" + results_log_file, True)
self._dump_scores_to_file(self.attack_scores_whole_dataset_knn_distance,
"whole_dataset_knn_distance" + results_log_file, True)
@staticmethod
def _dump_scores_to_file(attack_scores: list[DatasetAttackScore], filename: str, header: bool):
run_results_df = pd.DataFrame(attack_scores).drop('result', axis=1, errors='ignore') # don't serialize result
run_results_df.to_csv(filename, header=header, encoding='utf-8', index=False, mode='w') # Overwrite
time_str = time.strftime("%Y%m%d-%H%M%S")
for i, (attack_name, attack_scores) in enumerate(self.attack_scores.items()):
if self.config.timestamp_reports:
results_log_file = f"{time_str}_{attack_name}_results.log.csv"
else:
results_log_file = f"{attack_name}_results.log.csv"
run_results_df = (pd.DataFrame(attack_scores).drop('result', axis=1, errors='ignore').
drop('distributions_validation_result', axis=1, errors='ignore'))
run_results_df.to_csv(results_log_file, header=True, encoding='utf-8', index=False, mode='w')

View file

@ -16,59 +16,68 @@ from apt.utils.datasets import ArrayDataset
class Config(abc.ABC):
"""
The base class for dataset attack configurations
The base class for dataset attack configurations
"""
pass
class DatasetAttack(abc.ABC):
"""
The interface for performing privacy attack for risk assessment of synthetic datasets to be used in AI model
training. The original data members (training data) and non-members (the holdout data) should be available.
For reliability, all the datasets should be preprocessed and normalized.
:param original_data_members: A container for the training original samples and labels,
only samples are used in the assessment
:param original_data_non_members: A container for the holdout original samples and labels,
only samples are used in the assessment
:param synthetic_data: A container for the synthetic samples and labels, only samples are used in the assessment
:param config: Configuration parameters to guide the assessment process
:param dataset_name: A name to identify the dataset under attack, optional
:param attack_strategy_utils: Utils for use with the attack strategy, optional
The interface for performing privacy attack for risk assessment of synthetic datasets to be used in AI model
training. The original data members (training data) and non-members (the holdout data) should be available.
For reliability, all the datasets should be preprocessed and normalized.
"""
def __init__(self, original_data_members: ArrayDataset, original_data_non_members: ArrayDataset,
synthetic_data: ArrayDataset, config: Config, dataset_name: str,
categorical_features: list = [],
attack_strategy_utils: Optional[AttackStrategyUtils] = None) -> None:
"""
:param original_data_members: A container for the training original samples and labels,
only samples are used in the assessment
:param original_data_non_members: A container for the holdout original samples and labels,
only samples are used in the assessment
:param synthetic_data: A container for the synthetic samples and labels, only samples are used in the assessment
:param config: Configuration parameters to guide the assessment process
:param dataset_name: A name to identify the dataset under attack, optional
:param categorical_features: The list of categorical features (column names for pandas and column indexes for
numpy), optional
:param attack_strategy_utils: Utils for use with the attack strategy, optional
"""
self.original_data_members = original_data_members
self.original_data_non_members = original_data_non_members
self.synthetic_data = synthetic_data
self.config = config
self.attack_strategy_utils = attack_strategy_utils
self.dataset_name = dataset_name
self.categorical_features = categorical_features
self.attack_strategy_utils = attack_strategy_utils
@abc.abstractmethod
def assess_privacy(self) -> DatasetAttackScore:
"""
Assess the privacy of the dataset.
Assess the privacy of the dataset
:return:
score: DatasetAttackScore the privacy attack risk score
"""
pass
@property
@abc.abstractmethod
def short_name(self):
pass
class DatasetAttackMembership(DatasetAttack):
"""
An abstract base class for performing privacy risk assessment for synthetic datasets on a per-record level.
An abstract base class for performing privacy risk assessment for synthetic datasets on a per-record level.
"""
@abc.abstractmethod
def calculate_privacy_score(self, dataset_attack_result: DatasetAttackResultMembership,
generate_plot: bool = False) -> DatasetAttackScore:
"""
Calculate dataset privacy score based on the result of the privacy attack.
Calculate dataset privacy score based on the result of the privacy attack
:return:
score: DatasetAttackScore
"""
@ -78,12 +87,11 @@ class DatasetAttackMembership(DatasetAttack):
def plot_roc_curve(dataset_name: str, member_probabilities: np.ndarray, non_member_probabilities: np.ndarray,
filename_prefix: str = ""):
"""
Plot ROC curve.
:param dataset_name: dataset name, will become part of the plot filename.
:param member_probabilities: probability estimates of the member samples, the training data.
:param non_member_probabilities: probability estimates of the non-member samples, the hold-out data.
:param filename_prefix: name prefix for the ROC curve plot.
Plot ROC curve
:param dataset_name: dataset name, will become part of the plot filename
:param member_probabilities: probability estimates of the member samples, the training data
:param non_member_probabilities: probability estimates of the non-member samples, the hold-out data
:param filename_prefix: name prefix for the ROC curve plot
"""
labels = np.concatenate((np.zeros((len(non_member_probabilities),)), np.ones((len(member_probabilities),))))
results = np.concatenate((non_member_probabilities, member_probabilities))
@ -95,10 +103,9 @@ class DatasetAttackMembership(DatasetAttack):
@staticmethod
def calculate_metrics(member_probabilities: np.ndarray, non_member_probabilities: np.ndarray):
"""
Calculate attack performance metrics.
:param member_probabilities: probability estimates of the member samples, the training data.
:param non_member_probabilities: probability estimates of the non-member samples, the hold-out data.
Calculate attack performance metrics
:param member_probabilities: probability estimates of the member samples, the training data
:param non_member_probabilities: probability estimates of the non-member samples, the hold-out data
:return:
fpr: False Positive rate
tpr: True Positive rate

View file

@ -0,0 +1,161 @@
from dataclasses import dataclass
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score
from apt.risk.data_assessment.dataset_attack import DatasetAttackMembership, Config
from apt.risk.data_assessment.dataset_attack_result import DatasetAttackScore, DEFAULT_DATASET_NAME
from apt.utils.datasets import ArrayDataset
@dataclass
class DatasetAttackConfigMembershipClassification(Config):
    """Configuration for DatasetAttackMembershipClassification.

    Attributes:
        classifier_type: sklearn classifier type for the member classification.
                         Can be LogisticRegression or RandomForestClassifier
        threshold: a minimum threshold of distinguishability, above which a synthetic_data_quality_warning is raised.
                   A value higher than the threshold means that it is too easy to distinguish between the synthetic
                   data and the training or test data.
    """
    classifier_type: str = 'RandomForestClassifier'
    threshold: float = 0.9
@dataclass
class DatasetAttackScoreMembershipClassification(DatasetAttackScore):
    """DatasetAttackMembershipClassification privacy risk score.
    """
    # NOTE(review): the explicit __init__ below overrides the @dataclass-generated one;
    # confirm the decorator is still needed (e.g. for fields()/asdict-based reporting).
    member_roc_auc_score: float
    non_member_roc_auc_score: float
    normalized_ratio: float
    synthetic_data_quality_warning: bool
    assessment_type: str = 'MembershipClassification'  # to be used in reports

    def __init__(self, dataset_name: str, member_roc_auc_score: float, non_member_roc_auc_score: float,
                 normalized_ratio: float, synthetic_data_quality_warning: bool) -> None:
        """
        :param dataset_name: dataset name to be used in reports
        :param member_roc_auc_score: ROC AUC score of classification between members (training) data and synthetic
                                     data
        :param non_member_roc_auc_score: ROC AUC score of classification between non-members (test) data and
                                         synthetic data, this is the baseline score
        :param normalized_ratio: ratio of the member_roc_auc_score to the non_member_roc_auc_score
        :param synthetic_data_quality_warning: True if either the member_roc_auc_score or the non_member_roc_auc_score
                                               is higher than the threshold. That means that the synthetic data does
                                               not represent the training data sufficiently well, or that the test
                                               data is too far from the synthetic data.
        """
        # The normalized ratio doubles as the overall risk_score; no per-record result is attached.
        super().__init__(dataset_name=dataset_name, risk_score=normalized_ratio, result=None)
        self.member_roc_auc_score = member_roc_auc_score
        self.non_member_roc_auc_score = non_member_roc_auc_score
        self.normalized_ratio = normalized_ratio
        self.synthetic_data_quality_warning = synthetic_data_quality_warning
class DatasetAttackMembershipClassification(DatasetAttackMembership):
    """
    Privacy risk assessment for synthetic datasets that compares the distinguishability of the synthetic dataset
    from the members dataset (training) as opposed to the distinguishability of the synthetic dataset from the
    non-members dataset (test).

    The privacy risk measure is calculated as the ratio of the area under the receiver operating characteristic
    curve (ROC AUC) of the members dataset to the ROC AUC of the non-members dataset. It can be 0.0 or higher,
    with higher scores meaning higher privacy risk and worse privacy.
    """
    SHORT_NAME = 'MembershipClassification'

    def __init__(self, original_data_members: ArrayDataset, original_data_non_members: ArrayDataset,
                 synthetic_data: ArrayDataset,
                 config: DatasetAttackConfigMembershipClassification = DatasetAttackConfigMembershipClassification(),
                 dataset_name: str = DEFAULT_DATASET_NAME, categorical_features: list = None):
        """
        :param original_data_members: A container for the training original samples and labels
        :param original_data_non_members: A container for the holdout original samples and labels
        :param synthetic_data: A container for the synthetic samples and labels
        :param config: Configuration parameters to guide the attack, optional
        :param dataset_name: A name to identify this dataset, optional
        :param categorical_features: The list of categorical features of the datasets, optional
        """
        super().__init__(original_data_members, original_data_non_members, synthetic_data, config, dataset_name,
                         categorical_features)
        # Two independent classifiers: one separates members from synthetic data,
        # the other separates non-members (the baseline) from synthetic data.
        self.member_classifier = self._get_classifier(config.classifier_type)
        self.non_member_classifier = self._get_classifier(config.classifier_type)
        self.threshold = config.threshold

    def short_name(self):
        """Return the short name identifying this attack in reports."""
        return self.SHORT_NAME

    @staticmethod
    def _get_classifier(classifier_type):
        """Instantiate the distinguishability classifier for the given type.

        :param classifier_type: 'LogisticRegression' or 'RandomForestClassifier'
        :raises ValueError: for any other classifier_type
        """
        if classifier_type == 'LogisticRegression':
            return LogisticRegression()
        if classifier_type == 'RandomForestClassifier':
            # shallow forest with a fixed seed for deterministic assessments
            return RandomForestClassifier(max_depth=2, random_state=0)
        raise ValueError('Incorrect classifier type', classifier_type)

    def assess_privacy(self) -> DatasetAttackScoreMembershipClassification:
        """
        Calculate the ratio of the receiver operating characteristic curve (AUC ROC) of the distinguishability of the
        synthetic data from the members dataset to AU ROC of the distinguishability of the synthetic data from the
        non-members dataset.

        :return: the ratio as the privacy risk measure
        """
        member_roc_auc = self._classify_datasets(
            self.original_data_members, self.synthetic_data, self.member_classifier)
        non_member_roc_auc = self._classify_datasets(
            self.original_data_non_members, self.synthetic_data, self.non_member_classifier)
        return self.calculate_privacy_score(member_roc_auc, non_member_roc_auc)

    def _classify_datasets(self, df1: ArrayDataset, df2: ArrayDataset, classifier):
        """
        Split df1 and df2 into train and test parts, fit the classifier to distinguish between df1 train and
        df2 train, and then check how good the classification is on the df1 test and df2 test parts.

        :return: ROC AUC score of the classification between df1 test and df2 test
        """
        # NOTE(review): the [:, 0] indexing below assumes get_samples() yields a numpy array
        # (a pandas DataFrame would need .iloc) - confirm against ArrayDataset.
        df1_train, df1_test = train_test_split(df1.get_samples(), test_size=0.5, random_state=42)
        df2_train, df2_test = train_test_split(df2.get_samples(), test_size=0.5, random_state=42)
        # df1 samples are labeled 1 (positive class), df2 samples 0
        train_x = np.concatenate([df1_train, df2_train])
        train_labels = np.concatenate((np.ones_like(df1_train[:, 0], dtype='int'),
                                       np.zeros_like(df2_train[:, 0], dtype='int')))
        classifier.fit(train_x, train_labels)
        test_x = np.concatenate([df1_test, df2_test])
        test_labels = np.concatenate((np.ones_like(df1_test[:, 0], dtype='int'),
                                      np.zeros_like(df2_test[:, 0], dtype='int')))
        print('Model accuracy: ', classifier.score(test_x, test_labels))
        # probability of the positive class (df1) drives the ROC AUC
        predict_proba = classifier.predict_proba(test_x)
        return roc_auc_score(test_labels, predict_proba[:, 1])

    def calculate_privacy_score(self, member_roc_auc: float, non_member_roc_auc: float) -> (
            DatasetAttackScoreMembershipClassification):
        """
        Compare the distinguishability of the synthetic data from the members dataset (training)
        with the distinguishability of the synthetic data from the non-members dataset (test).

        :param member_roc_auc: ROC AUC of the members-vs-synthetic classifier
        :param non_member_roc_auc: ROC AUC of the non-members-vs-synthetic classifier (the baseline)
        :return: DatasetAttackScoreMembershipClassification with the normalized ratio as the risk score
        """
        member_score, baseline_score = member_roc_auc, non_member_roc_auc
        if 0 < baseline_score <= member_score:
            # members are easier to distinguish than the baseline: excess distinguishability is the risk
            normalized_ratio = member_score / baseline_score - 1.0
        else:
            # baseline is zero or already above the member score: no measurable extra risk
            # (use a float literal for type consistency with the ratio branch)
            normalized_ratio = 0.0
        # either classifier being too accurate means the synthetic data is of questionable quality
        synthetic_data_quality_warning = (member_score >= self.threshold) or (baseline_score >= self.threshold)
        # NOTE: local renamed from the original's reuse of `score` for both an input
        # and the returned result object, which shadowed the AUC value.
        return DatasetAttackScoreMembershipClassification(
            self.dataset_name, member_roc_auc_score=member_score, non_member_roc_auc_score=baseline_score,
            normalized_ratio=normalized_ratio, synthetic_data_quality_warning=synthetic_data_quality_warning)

View file

@ -10,7 +10,7 @@ from typing import Callable
import numpy as np
from sklearn.neighbors import NearestNeighbors
from apt.risk.data_assessment.attack_strategy_utils import KNNAttackStrategyUtils
from apt.risk.data_assessment.attack_strategy_utils import KNNAttackStrategyUtils, DistributionValidationResult
from apt.risk.data_assessment.dataset_attack import DatasetAttackMembership, Config
from apt.risk.data_assessment.dataset_attack_result import DatasetAttackScore, DatasetAttackResultMembership, \
DEFAULT_DATASET_NAME
@ -19,18 +19,22 @@ from apt.utils.datasets import ArrayDataset
@dataclass
class DatasetAttackConfigMembershipKnnProbabilities(Config):
"""
Configuration for DatasetAttackMembershipKnnProbabilities.
"""Configuration for DatasetAttackMembershipKnnProbabilities.
:param k: Number of nearest neighbors to search.
:param use_batches: Divide query samples into batches or not.
:param batch_size: Query sample batch size.
:param compute_distance: A callable function, which takes two arrays representing 1D vectors as inputs and must
return one value indicating the distance between those vectors.
See 'metric' parameter in sklearn.neighbors.NearestNeighbors documentation.
:param distance_params: Additional keyword arguments for the distance computation function, see 'metric_params' in
sklearn.neighbors.NearestNeighbors documentation.
:param generate_plot: Generate or not an AUR ROC curve and persist it in a file.
Attributes:
k: Number of nearest neighbors to search
use_batches: Divide query samples into batches or not.
batch_size: Query sample batch size.
compute_distance: A callable function, which takes two arrays representing 1D vectors as inputs and must return
one value indicating the distance between those vectors.
See 'metric' parameter in sklearn.neighbors.NearestNeighbors documentation.
distance_params: Additional keyword arguments for the distance computation function, see 'metric_params' in
sklearn.neighbors.NearestNeighbors documentation.
generate_plot: Generate or not an AUR ROC curve and persist it in a file
distribution_comparison_alpha: the significance level of the statistical distribution test p-value.
If p-value is less than alpha, then we reject the null hypothesis that the
observed samples are drawn from the same distribution, and we claim that the
distributions are different.
"""
k: int = 5
use_batches: bool = False
@ -38,25 +42,27 @@ class DatasetAttackConfigMembershipKnnProbabilities(Config):
compute_distance: Callable = None
distance_params: dict = None
generate_plot: bool = False
distribution_comparison_alpha: float = 0.05
@dataclass
class DatasetAttackScoreMembershipKnnProbabilities(DatasetAttackScore):
"""
DatasetAttackMembershipKnnProbabilities privacy risk score.
:param dataset_name: dataset name to be used in reports
:param roc_auc_score: the area under the receiver operating characteristic curve (AUC ROC) to evaluate the
attack performance.
:param average_precision_score: the proportion of predicted members that are correctly members.
:param result: the result of the membership inference attack.
"""DatasetAttackMembershipKnnProbabilities privacy risk score.
"""
roc_auc_score: float
average_precision_score: float
distributions_validation_result: DistributionValidationResult
assessment_type: str = 'MembershipKnnProbabilities' # to be used in reports
def __init__(self, dataset_name: str, roc_auc_score: float, average_precision_score: float,
result: DatasetAttackResultMembership) -> None:
"""
dataset_name: dataset name to be used in reports
roc_auc_score: the area under the receiver operating characteristic curve (AUC ROC) to evaluate the attack
performance.
average_precision_score: the proportion of predicted members that are correctly members
result: the result of the membership inference attack
"""
super().__init__(dataset_name=dataset_name, risk_score=roc_auc_score, result=result)
self.roc_auc_score = roc_auc_score
self.average_precision_score = average_precision_score
@ -64,32 +70,39 @@ class DatasetAttackScoreMembershipKnnProbabilities(DatasetAttackScore):
class DatasetAttackMembershipKnnProbabilities(DatasetAttackMembership):
    """
    Privacy risk assessment for synthetic datasets based on Black-Box MIA attack using distances of
    members (training set) and non-members (holdout set) from their nearest neighbors in the synthetic dataset.
    By default, the Euclidean distance is used (L2 norm), but another ``compute_distance()`` method can be provided
    in configuration instead.
    The area under the receiver operating characteristic curve (AUC ROC) gives the privacy risk measure.
    """
    SHORT_NAME = 'MembershipKnnProbabilities'

    def __init__(self, original_data_members: ArrayDataset, original_data_non_members: ArrayDataset,
                 synthetic_data: ArrayDataset,
                 config: DatasetAttackConfigMembershipKnnProbabilities = DatasetAttackConfigMembershipKnnProbabilities(),
                 dataset_name: str = DEFAULT_DATASET_NAME,
                 categorical_features: list = None, **kwargs):
        """
        :param original_data_members: A container for the training original samples and labels
        :param original_data_non_members: A container for the holdout original samples and labels
        :param synthetic_data: A container for the synthetic samples and labels
        :param config: Configuration parameters to guide the attack, optional
        :param dataset_name: A name to identify this dataset, optional
        :param categorical_features: The categorical features/columns of the datasets, optional
        :param kwargs: Extra options forwarded to KNNAttackStrategyUtils
                       (e.g. distribution_comparison_numeric_test, distribution_comparison_categorical_test)
        """
        # NOTE(review): the default config instance is created once at def time and is
        # shared across calls; safe as long as the config is treated as read-only.
        attack_strategy_utils = KNNAttackStrategyUtils(config.use_batches, config.batch_size,
                                                       config.distribution_comparison_alpha, **kwargs)
        super().__init__(original_data_members, original_data_non_members, synthetic_data, config, dataset_name,
                         categorical_features, attack_strategy_utils)
        if config.compute_distance:
            # Custom distance metric supplied by the caller.
            self.knn_learner = NearestNeighbors(n_neighbors=config.k, algorithm='auto',
                                                metric=config.compute_distance,
                                                metric_params=config.distance_params)
        else:
            # Default metric (Euclidean / L2).
            self.knn_learner = NearestNeighbors(n_neighbors=config.k, algorithm='auto')

    def short_name(self):
        """Return the short identifier of this assessment type (used in reports)."""
        return self.SHORT_NAME
def assess_privacy(self) -> DatasetAttackScoreMembershipKnnProbabilities:
"""
Membership Inference Attack which calculates probabilities of member and non-member samples to be generated by
@ -101,26 +114,35 @@ class DatasetAttackMembershipKnnProbabilities(DatasetAttackMembership):
it is more likely that the query sample was used to train the generative model. This probability is approximated
by the Parzen window density estimation in ``probability_per_sample()``, computed from the NN distances from the
query samples to the synthetic data samples.
Before running the assessment, there is a validation that the distribution of the synthetic data is similar to
that of the original data members and to that of the original data non-members.
:return: Privacy score of the attack together with the attack result with the probabilities of member and
non-member samples to be generated by the synthetic data generator based on the NN distances from the
query samples to the synthetic data samples
:return:
Privacy score of the attack together with the attack result with the probabilities of member and
non-member samples to be generated by the synthetic data generator based on the NN distances from the
query samples to the synthetic data samples
The result also contains the distribution validation result and a warning if the distributions are not
similar.
"""
distributions_validation_result = self.attack_strategy_utils.validate_distributions(
self.original_data_members, self.original_data_non_members, self.synthetic_data, self.categorical_features)
# nearest neighbor search
self.attack_strategy_utils.fit(self.knn_learner, self.synthetic_data)
# members query
member_proba = self.attack_strategy_utils.find_knn(self.knn_learner, self.original_data_members,
self.probability_per_sample)
member_distances = self.attack_strategy_utils.find_knn(self.knn_learner, self.original_data_members)
# non-members query
non_member_proba = self.attack_strategy_utils.find_knn(self.knn_learner, self.original_data_non_members,
self.probability_per_sample)
non_member_distances = self.attack_strategy_utils.find_knn(self.knn_learner, self.original_data_non_members)
member_proba = self.probability_per_sample(member_distances)
non_member_proba = self.probability_per_sample(non_member_distances)
result = DatasetAttackResultMembership(member_probabilities=member_proba,
non_member_probabilities=non_member_proba)
score = self.calculate_privacy_score(result, self.config.generate_plot)
score.distributions_validation_result = distributions_validation_result
return score
def calculate_privacy_score(self, dataset_attack_result: DatasetAttackResultMembership,
@ -128,11 +150,11 @@ class DatasetAttackMembershipKnnProbabilities(DatasetAttackMembership):
"""
Evaluate privacy score from the probabilities of member and non-member samples to be generated by the synthetic
data generator. The probabilities are computed by the ``assess_privacy()`` method.
:param dataset_attack_result: attack result containing probabilities of member and non-member samples to be
generated by the synthetic data generator.
:param generate_plot: generate AUC ROC curve plot and persist it.
:return: score of the attack, based on distance-based probabilities - mainly the ROC AUC score.
:param dataset_attack_result attack result containing probabilities of member and non-member samples to be
generated by the synthetic data generator
:param generate_plot generate AUC ROC curve plot and persist it
:return:
score of the attack, based on distance-based probabilities - mainly the ROC AUC score
"""
member_proba, non_member_proba = \
dataset_attack_result.member_probabilities, dataset_attack_result.non_member_probabilities
@ -149,10 +171,10 @@ class DatasetAttackMembershipKnnProbabilities(DatasetAttackMembership):
"""
For every sample represented by its distance from the query sample to its KNN in synthetic data,
computes the probability of the synthetic data to be part of the query dataset.
:param distances: distance between every query sample in batch to its KNNs among synthetic samples, a numpy
array of size (n, k) with n being the number of samples, k - the number of KNNs.
:return: probability estimates of the query samples being generated and so - of being part of the synthetic set,
a numpy array of size (n,)
array of size (n, k) with n being the number of samples, k - the number of KNNs
:return:
probability estimates of the query samples being generated and so - of being part of the synthetic set, a
numpy array of size (n,)
"""
return np.average(np.exp(-distances), axis=1)

View file

@ -10,7 +10,7 @@ from dataclasses import dataclass
import numpy as np
from sklearn.neighbors import NearestNeighbors
from apt.risk.data_assessment.attack_strategy_utils import KNNAttackStrategyUtils
from apt.risk.data_assessment.attack_strategy_utils import KNNAttackStrategyUtils, DistributionValidationResult
from apt.risk.data_assessment.dataset_attack import Config, DatasetAttack
from apt.risk.data_assessment.dataset_attack_result import DatasetAttackScore, DEFAULT_DATASET_NAME
from apt.utils.datasets import ArrayDataset
@ -20,62 +20,73 @@ K = 1 # Number of nearest neighbors to search. For DCR we need only the nearest
@dataclass
class DatasetAttackConfigWholeDatasetKnnDistance(Config):
    """Configuration for DatasetAttackWholeDatasetKnnDistance.

    Attributes:
        use_batches: Divide query samples into batches or not.
        batch_size: Query sample batch size.
        compute_distance: A callable function, which takes two arrays representing 1D vectors as inputs and must return
            one value indicating the distance between those vectors.
            See 'metric' parameter in sklearn.neighbors.NearestNeighbors documentation.
        distance_params: Additional keyword arguments for the distance computation function, see 'metric_params' in
            sklearn.neighbors.NearestNeighbors documentation.
        distribution_comparison_alpha: the significance level of the statistical distribution test p-value.
            If p-value is less than alpha, then we reject the null hypothesis that the
            observed samples are drawn from the same distribution, and we claim that the
            distributions are different.
        distribution_comparison_numeric_test: name of the statistical test applied to numeric columns (e.g. 'KS').
        distribution_comparison_categorical_test: name of the statistical test applied to categorical columns
            (e.g. 'CHI').
    """
    use_batches: bool = False
    batch_size: int = 10
    compute_distance: callable = None
    distance_params: dict = None
    distribution_comparison_alpha: float = 0.05
    # BUG FIX: the original line ended with a trailing comma
    # (distribution_comparison_numeric_test: str = 'KS',), which makes the default
    # value the tuple ('KS',) instead of the string 'KS'.
    distribution_comparison_numeric_test: str = 'KS'
    distribution_comparison_categorical_test: str = 'CHI'
@dataclass
class DatasetAttackScoreWholeDatasetKnnDistance(DatasetAttackScore):
    """DatasetAttackWholeDatasetKnnDistance privacy risk score."""
    share: float
    distributions_validation_result: DistributionValidationResult
    assessment_type: str = 'WholeDatasetKnnDistance'  # to be used in reports

    def __init__(self, dataset_name: str, share: float) -> None:
        """
        dataset_name: dataset name to be used in reports
        share: the share of synthetic records closer to the training than the holdout dataset.
               A value of 0.5 or close to it means good privacy.
        """
        # The share doubles as the risk score; there is no per-sample result for this attack.
        super().__init__(dataset_name=dataset_name, risk_score=share, result=None)
        self.share = share
        # NOTE(review): distributions_validation_result is assigned by assess_privacy()
        # after construction, not here.
class DatasetAttackWholeDatasetKnnDistance(DatasetAttack):
"""
Privacy risk assessment for synthetic datasets based on distances of synthetic data records from
members (training set) and non-members (holdout set). The privacy risk measure is the share of synthetic
records closer to the training than the holdout dataset.
By default, the Euclidean distance is used (L2 norm), but another compute_distance() method can be provided in
configuration instead.
:param original_data_members: A container for the training original samples and labels.
:param original_data_non_members: A container for the holdout original samples and labels.
:param synthetic_data: A container for the synthetic samples and labels.
:param config: Configuration parameters to guide the assessment process, optional.
:param dataset_name: A name to identify this dataset, optional.
Privacy risk assessment for synthetic datasets based on distances of synthetic data records from
members (training set) and non-members (holdout set). The privacy risk measure is the share of synthetic
records closer to the training than the holdout dataset.
By default, the Euclidean distance is used (L2 norm), but another compute_distance() method can be provided in
configuration instead.
"""
SHORT_NAME = 'WholeDatasetKnnDistance'
def __init__(self, original_data_members: ArrayDataset, original_data_non_members: ArrayDataset,
synthetic_data: ArrayDataset,
config: DatasetAttackConfigWholeDatasetKnnDistance = DatasetAttackConfigWholeDatasetKnnDistance(),
dataset_name: str = DEFAULT_DATASET_NAME):
attack_strategy_utils = KNNAttackStrategyUtils(config.use_batches, config.batch_size)
dataset_name: str = DEFAULT_DATASET_NAME, categorical_features: list = None, **kwargs):
"""
:param original_data_members: A container for the training original samples and labels
:param original_data_non_members: A container for the holdout original samples and labels
:param synthetic_data: A container for the synthetic samples and labels
:param config: Configuration parameters to guide the assessment process, optional
:param dataset_name: A name to identify this dataset, optional
"""
attack_strategy_utils = KNNAttackStrategyUtils(config.use_batches, config.batch_size,
config.distribution_comparison_alpha, **kwargs)
super().__init__(original_data_members, original_data_non_members, synthetic_data, config, dataset_name,
attack_strategy_utils)
categorical_features, attack_strategy_utils)
if config.compute_distance:
self.knn_learner_members = NearestNeighbors(n_neighbors=K, metric=config.compute_distance,
metric_params=config.distance_params)
@ -85,14 +96,23 @@ class DatasetAttackWholeDatasetKnnDistance(DatasetAttack):
self.knn_learner_members = NearestNeighbors(n_neighbors=K)
self.knn_learner_non_members = NearestNeighbors(n_neighbors=K)
    def short_name(self):
        """Return the short identifier of this assessment type (used in reports)."""
        return self.SHORT_NAME
def assess_privacy(self) -> DatasetAttackScoreWholeDatasetKnnDistance:
"""
Calculate the share of synthetic records closer to the training than the holdout dataset, based on the
DCR computed by 'calculate_distances()'.
Before running the assessment, there is a validation that the distribution of the synthetic data is similar to
that of the original data members and to that of the original data non-members.
:return:
score of the attack, based on the NN distances from the query samples to the synthetic data samples
score of the attack, based on the NN distances from the query samples to the synthetic data samples.
The result also contains the distribution validation result and a warning if the distributions are not
similar.
"""
distributions_validation_result = self.attack_strategy_utils.validate_distributions(
self.original_data_members, self.original_data_non_members, self.synthetic_data, self.categorical_features)
member_distances, non_member_distances = self.calculate_distances()
# distance of the synth. records to members and to non-members
assert (len(member_distances) == len(non_member_distances))
@ -104,6 +124,7 @@ class DatasetAttackWholeDatasetKnnDistance(DatasetAttack):
share = np.mean(member_distances < non_member_distances) + (n_members / (n_members + n_non_members)) * np.mean(
member_distances == non_member_distances)
score = DatasetAttackScoreWholeDatasetKnnDistance(self.dataset_name, share=share)
score.distributions_validation_result = distributions_validation_result
return score
def calculate_distances(self):

View file

@ -6,19 +6,22 @@ from sklearn.impute import SimpleImputer
from sklearn.model_selection import GridSearchCV
from sklearn.neighbors import KernelDensity
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, FunctionTransformer
from apt.anonymization import Anonymize
from apt.risk.data_assessment.dataset_assessment_manager import DatasetAssessmentManager, DatasetAssessmentManagerConfig
from apt.utils.dataset_utils import get_iris_dataset_np, get_diabetes_dataset_np, get_adult_dataset_pd, \
get_nursery_dataset_pd
from apt.utils.datasets import ArrayDataset
from apt.risk.data_assessment.dataset_attack_membership_classification import DatasetAttackScoreMembershipClassification
from apt.risk.data_assessment.dataset_attack_membership_knn_probabilities import DatasetAttackScoreMembershipKnnProbabilities
from apt.risk.data_assessment.dataset_attack_whole_dataset_knn_distance import DatasetAttackScoreWholeDatasetKnnDistance
MIN_SHARE = 0.5
MIN_ROC_AUC = 0.0
MIN_PRECISION = 0.0
NUM_SYNTH_SAMPLES = 400
NUM_SYNTH_SAMPLES = 100
NUM_SYNTH_COMPONENTS = 4
iris_dataset_np = get_iris_dataset_np()
@ -30,13 +33,14 @@ mgr = DatasetAssessmentManager(DatasetAssessmentManagerConfig(persist_reports=Fa
def teardown_function():
print("dump_all_scores_to_files")
mgr.dump_all_scores_to_files()
# Parametrize over all four datasets and anonymization levels k in {2, 6}.
anon_testdata = ([('iris_np', iris_dataset_np, 'np', k, mgr) for k in range(2, 10, 4)]
                 + [('diabetes_np', diabetes_dataset_np, 'np', k, mgr) for k in range(2, 10, 4)]
                 + [('nursery_pd', nursery_dataset_pd, 'pd', k, mgr) for k in range(2, 10, 4)]
                 + [('adult_pd', adult_dataset_pd, 'pd', k, mgr) for k in range(2, 10, 4)])
@pytest.mark.parametrize("name, data, dataset_type, k, mgr", anon_testdata)
@ -49,14 +53,15 @@ def test_risk_anonymization(name, data, dataset_type, k, mgr):
preprocessed_x_test = x_test
QI = [0, 2]
anonymizer = Anonymize(k, QI, train_only_QI=True)
categorical_features = []
elif "adult" in name:
preprocessed_x_train, preprocessed_x_test = preprocess_adult_x_data(x_train, x_test)
preprocessed_x_train, preprocessed_x_test, categorical_features = preprocess_adult_x_data(x_train, x_test)
QI = list(range(15, 27))
anonymizer = Anonymize(k, QI)
elif "nursery" in name:
preprocessed_x_train, preprocessed_x_test = preprocess_nursery_x_data(x_train, x_test)
QI = list(range(15, 27))
anonymizer = Anonymize(k, QI, train_only_QI=True)
preprocessed_x_train, preprocessed_x_test, categorical_features = preprocess_nursery_x_data(x_train, x_test)
QI = list(range(15, 23))
anonymizer = Anonymize(k, QI, categorical_features=categorical_features, train_only_QI=True)
else:
raise ValueError('Pandas dataset missing a preprocessing step')
@ -66,7 +71,7 @@ def test_risk_anonymization(name, data, dataset_type, k, mgr):
dataset_name = f'anon_k{k}_{name}'
assess_privacy_and_validate_result(mgr, original_data_members, original_data_non_members, anonymized_data,
dataset_name)
dataset_name, categorical_features)
testdata = [('iris_np', iris_dataset_np, 'np', mgr),
@ -83,11 +88,12 @@ def test_risk_kde(name, data, dataset_type, mgr):
encoded = x_train
encoded_test = x_test
num_synth_components = NUM_SYNTH_COMPONENTS
categorical_features = []
elif "adult" in name:
encoded, encoded_test = preprocess_adult_x_data(x_train, x_test)
encoded, encoded_test, categorical_features = preprocess_adult_x_data(x_train, x_test)
num_synth_components = 10
elif "nursery" in name:
encoded, encoded_test = preprocess_nursery_x_data(x_train, x_test)
encoded, encoded_test, categorical_features = preprocess_nursery_x_data(x_train, x_test)
num_synth_components = 10
else:
raise ValueError('Pandas dataset missing a preprocessing step')
@ -98,7 +104,8 @@ def test_risk_kde(name, data, dataset_type, mgr):
original_data_non_members = ArrayDataset(encoded_test, y_test)
dataset_name = 'kde' + str(NUM_SYNTH_SAMPLES) + name
assess_privacy_and_validate_result(mgr, original_data_members, original_data_non_members, synth_data, dataset_name)
assess_privacy_and_validate_result(mgr, original_data_members, original_data_non_members, synth_data, dataset_name,
categorical_features)
def kde(n_samples, n_components, original_data):
@ -109,8 +116,8 @@ def kde(n_samples, n_components, original_data):
digit_data = original_data
pca = PCA(n_components=n_components, whiten=False)
data = pca.fit_transform(digit_data)
params = {'bandwidth': np.logspace(-1, 1, 10)}
grid = GridSearchCV(KernelDensity(), params, cv=2)
params = {'bandwidth': np.logspace(-1, 1, 20)}
grid = GridSearchCV(KernelDensity(), params, cv=5)
grid.fit(data)
kde_estimator = grid.best_estimator_
@ -125,10 +132,15 @@ def preprocess_adult_x_data(x_train, x_test):
'capital-gain', 'capital-loss', 'hours-per-week', 'native-country']
categorical_features = ['workclass', 'marital-status', 'occupation', 'relationship', 'race', 'sex',
'native-country']
# prepare data for DT
def to_float(x):
return x.astype(float)
numeric_features = [f for f in features if f not in categorical_features]
numeric_transformer = Pipeline(
steps=[('imputer', SimpleImputer(strategy='constant', fill_value=0))]
steps=[('imputer', SimpleImputer(strategy='constant', fill_value=0)),
('to_float', FunctionTransformer(to_float, feature_names_out='one-to-one'))]
)
categorical_transformer = OneHotEncoder(handle_unknown="ignore", sparse=False)
preprocessor = ColumnTransformer(
@ -138,20 +150,18 @@ def preprocess_adult_x_data(x_train, x_test):
]
)
encoded = preprocessor.fit_transform(x_train)
encoded_test = preprocessor.fit_transform(x_test)
return encoded, encoded_test
preprocessor.fit(x_train)
encoded_test = preprocessor.transform(x_test)
return encoded, encoded_test, filter_categorical(preprocessor.get_feature_names_out(), return_feature_names=False)
def preprocess_nursery_x_data(x_train, x_test):
x_train = x_train.astype(str)
features = ["parents", "has_nurs", "form", "children", "housing", "finance", "social", "health"]
# QI = ["finance", "social", "health"]
categorical_features = ["parents", "has_nurs", "form", "housing", "finance", "social", "health", 'children']
categorical_features = ["parents", "has_nurs", "form", "housing", "finance", "social", "health"]
# prepare data for DT
numeric_features = [f for f in features if f not in categorical_features]
numeric_transformer = Pipeline(
steps=[('imputer', SimpleImputer(strategy='constant', fill_value=0))]
)
numeric_transformer = OrdinalEncoder(encoded_missing_value=-1)
categorical_transformer = OneHotEncoder(handle_unknown="ignore", sparse=False)
preprocessor = ColumnTransformer(
transformers=[
@ -160,14 +170,33 @@ def preprocess_nursery_x_data(x_train, x_test):
]
)
encoded = preprocessor.fit_transform(x_train)
encoded_test = preprocessor.fit_transform(x_test)
return encoded, encoded_test
preprocessor.fit(x_train)
encoded_test = preprocessor.transform(x_test)
return encoded, encoded_test, filter_categorical(preprocessor.get_feature_names_out(), return_feature_names=False)
def filter_categorical(feature_names, return_feature_names: bool = True):
    """Select the categorical features from a ColumnTransformer's output feature names.

    Categorical features are recognized by the 'cat__' prefix that ColumnTransformer
    prepends to features produced by the transformer registered under the name 'cat'.

    :param feature_names: numpy array of feature names (as produced by get_feature_names_out())
    :param return_feature_names: if True, return the matching feature names;
                                 if False, return their positional indexes instead
    :return: list of categorical feature names, or list of their column indexes
    """
    feature_name_strs = feature_names.astype('U')
    # Compute the prefix mask once instead of once per branch.
    is_categorical = np.char.startswith(feature_name_strs, 'cat__')
    if return_feature_names:
        return list(feature_names[is_categorical])
    return list(np.flatnonzero(is_categorical))
def assess_privacy_and_validate_result(dataset_assessment_manager, original_data_members, original_data_non_members,
                                       synth_data, dataset_name, categorical_features):
    """Run all dataset assessments and validate each attack's score.

    Checks the score produced by each of the three assessment types returned by the manager.
    """
    attack_scores = dataset_assessment_manager.assess(original_data_members, original_data_non_members, synth_data,
                                                      dataset_name, categorical_features)
    # Iterate directly over the mapping; the enumerate index in the original was unused.
    for assessment_type, scores in attack_scores.items():
        if assessment_type == 'MembershipKnnProbabilities':
            score_g: DatasetAttackScoreMembershipKnnProbabilities = scores[0]
            assert score_g.roc_auc_score > MIN_ROC_AUC
            assert score_g.average_precision_score > MIN_PRECISION
        elif assessment_type == 'WholeDatasetKnnDistance':
            score_h: DatasetAttackScoreWholeDatasetKnnDistance = scores[0]
            assert score_h.share > MIN_SHARE
        elif assessment_type == 'MembershipClassification':
            score_mc: DatasetAttackScoreMembershipClassification = scores[0]
            assert score_mc.synthetic_data_quality_warning is False
            assert 0 <= score_mc.normalized_ratio <= 1

View file

@ -4,6 +4,11 @@ from apt.anonymization import Anonymize
from apt.risk.data_assessment.dataset_assessment_manager import DatasetAssessmentManager, DatasetAssessmentManagerConfig
from apt.utils.dataset_utils import get_iris_dataset_np, get_nursery_dataset_pd
from apt.utils.datasets import ArrayDataset
from apt.risk.data_assessment.dataset_attack_membership_classification import DatasetAttackScoreMembershipClassification
from apt.risk.data_assessment.dataset_attack_membership_knn_probabilities import \
DatasetAttackScoreMembershipKnnProbabilities, DatasetAttackConfigMembershipKnnProbabilities, \
DatasetAttackMembershipKnnProbabilities
from apt.risk.data_assessment.dataset_attack_whole_dataset_knn_distance import DatasetAttackScoreWholeDatasetKnnDistance
from tests.test_data_assessment import kde, preprocess_nursery_x_data
NUM_SYNTH_SAMPLES = 10
@ -28,10 +33,10 @@ def teardown_function():
mgr.dump_all_scores_to_files()
# One (dataset, manager) pair per case; each case gets its own manager instance.
anon_testdata = ([('iris_np', iris_dataset_np, 'np', mgr1)]
                 + [('nursery_pd', nursery_dataset_pd, 'pd', mgr2)]
                 + [('iris_np', iris_dataset_np, 'np', mgr3)]
                 + [('nursery_pd', nursery_dataset_pd, 'pd', mgr4)])
@pytest.mark.parametrize("name, data, dataset_type, mgr", anon_testdata)
@ -44,9 +49,10 @@ def test_risk_anonymization(name, data, dataset_type, mgr):
preprocessed_x_test = x_test
QI = [0, 2]
anonymizer = Anonymize(ANON_K, QI, train_only_QI=True)
categorical_features = []
elif "nursery" in name:
preprocessed_x_train, preprocessed_x_test = preprocess_nursery_x_data(x_train, x_test)
QI = list(range(15, 27))
preprocessed_x_train, preprocessed_x_test, categorical_features = preprocess_nursery_x_data(x_train, x_test)
QI = list(range(15, 20))
anonymizer = Anonymize(ANON_K, QI, train_only_QI=True)
else:
raise ValueError('Pandas dataset missing a preprocessing step')
@ -57,11 +63,12 @@ def test_risk_anonymization(name, data, dataset_type, mgr):
dataset_name = f'anon_k{ANON_K}_{name}'
assess_privacy_and_validate_result(mgr, original_data_members, original_data_non_members, anonymized_data,
dataset_name)
dataset_name, categorical_features)
assess_privacy_and_validate_result(mgr, original_data_members=original_data_members,
original_data_non_members=original_data_non_members,
synth_data=anonymized_data, dataset_name=None)
synth_data=anonymized_data, dataset_name=None,
categorical_features=categorical_features)
testdata = [('iris_np', iris_dataset_np, 'np', mgr4),
@ -72,38 +79,85 @@ testdata = [('iris_np', iris_dataset_np, 'np', mgr4),
@pytest.mark.parametrize("name, data, dataset_type, mgr", testdata)
def test_risk_kde(name, data, dataset_type, mgr):
    # Build member/non-member datasets and KDE-generated synthetic data.
    original_data_members, original_data_non_members, synthetic_data, categorical_features \
        = encode_and_generate_synthetic_data(dataset_type, name, data)
    dataset_name = 'kde' + str(NUM_SYNTH_SAMPLES) + name
    # Run the assessment twice: once with an explicit dataset name (positional args),
    # and once with keyword args and dataset_name=None to exercise the default-name path.
    assess_privacy_and_validate_result(mgr, original_data_members, original_data_non_members, synthetic_data,
                                       dataset_name, categorical_features)
    assess_privacy_and_validate_result(mgr, original_data_members=original_data_members,
                                       original_data_non_members=original_data_non_members,
                                       synth_data=synthetic_data, dataset_name=None,
                                       categorical_features=categorical_features)
testdata_knn_options = [('iris_np', iris_dataset_np, 'np'),
                        ('nursery_pd', nursery_dataset_pd, 'pd')]


@pytest.mark.parametrize("name, data, dataset_type", testdata_knn_options)
def test_risk_kde_knn_options(name, data, dataset_type):
    """Exercise every combination of numeric/categorical distribution-comparison tests."""
    original_data_members, original_data_non_members, synthetic_data, categorical_features \
        = encode_and_generate_synthetic_data(dataset_type, name, data)
    dataset_name = 'kde' + str(NUM_SYNTH_SAMPLES) + name
    # Non-default alpha to exercise the configurable significance level.
    config_g = DatasetAttackConfigMembershipKnnProbabilities(use_batches=True, generate_plot=False,
                                                             distribution_comparison_alpha=0.1)
    numeric_tests = ['KS', 'CVM', 'AD', 'ES']
    categorical_tests = ['CHI', 'AD', 'ES']
    for numeric_test in numeric_tests:
        for categorical_test in categorical_tests:
            # Test selection is forwarded via **kwargs to the attack strategy utils.
            attack_g = DatasetAttackMembershipKnnProbabilities(original_data_members,
                                                               original_data_non_members,
                                                               synthetic_data,
                                                               config_g,
                                                               dataset_name,
                                                               categorical_features,
                                                               distribution_comparison_numeric_test=numeric_test,
                                                               distribution_comparison_categorical_test=categorical_test
                                                               )
            score_g = attack_g.assess_privacy()
            assert score_g.roc_auc_score > MIN_ROC_AUC
            assert score_g.average_precision_score > MIN_PRECISION
def encode_and_generate_synthetic_data(dataset_type, name, data):
    """Encode the raw dataset and generate KDE-based synthetic data for it.

    :param dataset_type: 'np' for numeric numpy datasets (no encoding needed), otherwise pandas
    :param name: dataset name, used to pick the preprocessing step for pandas datasets
    :param data: ((x_train, y_train), (x_test, y_test)) raw dataset splits
    :return: (original_data_members, original_data_non_members, synthetic_data, categorical_features)
    :raises ValueError: for a pandas dataset with no preprocessing step defined
    """
    (x_train, y_train), (x_test, y_test) = data
    if dataset_type == 'np':
        # Numeric numpy datasets need no encoding and have no categorical features.
        encoded = x_train
        encoded_test = x_test
        num_synth_components = NUM_SYNTH_COMPONENTS
        categorical_features = []
    elif "nursery" in name:
        encoded, encoded_test, categorical_features = preprocess_nursery_x_data(x_train, x_test)
        num_synth_components = 10
    else:
        raise ValueError('Pandas dataset missing a preprocessing step')
    synthetic_data = ArrayDataset(
        kde(NUM_SYNTH_SAMPLES, n_components=num_synth_components, original_data=encoded))
    original_data_members = ArrayDataset(encoded, y_train)
    original_data_non_members = ArrayDataset(encoded_test, y_test)
    return original_data_members, original_data_non_members, synthetic_data, categorical_features
def assess_privacy_and_validate_result(mgr, original_data_members, original_data_non_members, synth_data, dataset_name,
                                       categorical_features):
    """Run all dataset assessments through the manager and validate each attack's score."""
    attack_scores = mgr.assess(original_data_members, original_data_non_members, synth_data, dataset_name,
                               categorical_features)
    # Iterate directly over the mapping; the enumerate index in the original was unused.
    for assessment_type, scores in attack_scores.items():
        if assessment_type == 'MembershipKnnProbabilities':
            score_g: DatasetAttackScoreMembershipKnnProbabilities = scores[0]
            assert score_g.roc_auc_score > MIN_ROC_AUC
            assert score_g.average_precision_score > MIN_PRECISION
        elif assessment_type == 'WholeDatasetKnnDistance':
            score_h: DatasetAttackScoreWholeDatasetKnnDistance = scores[0]
            assert score_h.share > MIN_SHARE
        elif assessment_type == 'MembershipClassification':
            score_mc: DatasetAttackScoreMembershipClassification = scores[0]
            assert score_mc.synthetic_data_quality_warning is False
            assert 0 <= score_mc.normalized_ratio <= 1