Merge pull request #71 from IBM/dataset_assessment

Add AI privacy Dataset assessment module with two attack implementations.

Signed-off-by: Maya Anderson <mayaa@il.ibm.com>
This commit is contained in:
Maya Anderson 2023-03-20 14:14:09 +02:00
parent c153635e4d
commit dbb958f791
13 changed files with 986 additions and 1 deletions

7
.gitignore vendored
View file

@ -51,6 +51,10 @@ coverage.xml
.pytest_cache/ .pytest_cache/
cover/ cover/
# Test results
*.csv
*.png
# Translations # Translations
*.mo *.mo
*.pot *.pot
@ -157,4 +161,5 @@ cython_debug/
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear # and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder. # option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/ .idea/

View file

@ -38,3 +38,8 @@ A Python library for Machine Learning Security. Includes an attack module called
(membership inference, attribute inference, model inversion and database reconstruction) as well as a *privacy* metrics module that contains (membership inference, attribute inference, model inversion and database reconstruction) as well as a *privacy* metrics module that contains
membership leakage metrics for ML models. membership leakage metrics for ML models.
Citation
--------
Abigail Goldsteen, Ola Saadi, Ron Shmelkin, Shlomit Shachor, Natalia Razinkov,
"AI privacy toolkit", SoftwareX, Volume 22, 2023, 101352, ISSN 2352-7110, https://doi.org/10.1016/j.softx.2023.101352.

View file

@ -0,0 +1,105 @@
# Privacy Assessment of Datasets for AI Models
This module implements a tool for privacy assessment of synthetic datasets that are to be used in AI model training.
The main interface, ``DatasetAttack``, with the ``assess_privacy()`` main method assumes the availability of the
training data, holdout data and synthetic data at the time of the privacy evaluation.
It is to be implemented by concrete assessment methods, which can run the assessment on a per-record level,
or on the whole dataset.
The method ``assess_privacy()`` returns a ``DatasetAttackScore``, which contains a ``risk_score`` and,
optionally, a ``DatasetAttackResult``. Each specific attack can implement its own ``DatasetAttackScore``, which would
contain additional fields.
The abstract class ``DatasetAttackMembership`` implements the ``DatasetAttack`` interface, but adds the result
of the membership inference attack, so that the final score contains both the membership inference attack result
for further analysis and the calculated score.
``DatasetAssessmentManager`` provides convenience methods to run multiple attacks and persist the result reports.
Attack Implementations
-----------------------
One implementation is based on the paper "GAN-Leaks: A Taxonomy of Membership Inference Attacks against Generative
Models"[^1] and its implementation[^2]. It is based on Black-Box MIA attack using
distances of members (training set) and non-members (holdout set) from their nearest neighbors in the synthetic dataset.
By default, the Euclidean distance is used (L2 norm), but another ``compute_distance()`` method can be provided in
configuration instead.
The area under the receiver operating characteristic curve (AUC ROC) gives the privacy risk score.
Another implementation is based on the papers "Data Synthesis based on Generative Adversarial Networks"[^3] and
"Holdout-Based Fidelity and Privacy Assessment of Mixed-Type Synthetic Data"[^4], and on a variation of its reference
implementation[^5].
It is based on distances of synthetic data records from members (training set) and non-members (holdout set).
The privacy risk score is the share of synthetic records closer to the training than the holdout dataset.
By default, the Euclidean distance is used (L2 norm), but another ``compute_distance()`` method can be provided in
configuration instead.
Usage
-----
An implementation of the ``DatasetAttack`` interface is used for performing a privacy attack for risk assessment of
synthetic datasets to be used in AI model training.
The original data members (training data), non-members (the holdout data) and the synthetic data created from the
original members should be available.
For reliability, all the datasets should be preprocessed and normalized.
The following example runs all the attacks and persists the results in files, using ``DatasetAssessmentManager``.
It assumes that you provide it with the pairs ``(x_train, y_train)``, ``(x_test, y_test)`` and ``(x_synth, y_synth)``
for members, non-members and the synthetic datasets, respectively.
```python
from apt.risk.data_assessment.dataset_assessment_manager import DatasetAssessmentManager, \
DatasetAssessmentManagerConfig
from apt.utils.datasets import ArrayDataset
dataset_assessment_manager = DatasetAssessmentManager(
DatasetAssessmentManagerConfig(persist_reports=True, generate_plots=False))
synthetic_data = ArrayDataset(x_synth, y_synth)
original_data_members = ArrayDataset(x_train, y_train)
original_data_non_members = ArrayDataset(x_test, y_test)
dataset_name = 'my_dataset'
[score_gl, score_h] = dataset_assessment_manager.assess(
original_data_members, original_data_non_members, synthetic_data, dataset_name)
dataset_assessment_manager.dump_all_scores_to_files()
```
Alternatively, each attack can be run separately, for instance:
```python
from apt.risk.data_assessment.dataset_attack_membership_knn_probabilities import \
DatasetAttackConfigMembershipKnnProbabilities, DatasetAttackMembershipKnnProbabilities
from apt.utils.datasets import ArrayDataset
synthetic_data = ArrayDataset(x_synth, y_synth)
original_data_members = ArrayDataset(x_train, y_train)
original_data_non_members = ArrayDataset(x_test, y_test)
config_gl = DatasetAttackConfigMembershipKnnProbabilities(use_batches=False,
generate_plot=False)
attack_gl = DatasetAttackMembershipKnnProbabilities(original_data_members,
original_data_non_members,
synthetic_data,
config_gl)
score_gl = attack_gl.assess_privacy()
```
Citations
---------
[^1]: "GAN-Leaks: A Taxonomy of Membership Inference Attacks against Generative Models" by D. Chen, N. Yu, Y. Zhang,
M. Fritz in Proceedings of the 2020 ACM SIGSAC Conference on Computer and Communications Security, 34362, 2020.
[https://doi.org/10.1145/3372297.3417238](https://doi.org/10.1145/3372297.3417238)
[^2]: Code for the paper "GAN-Leaks: A Taxonomy of Membership Inference Attacks against Generative Models"
[https://github.com/DingfanChen/GAN-Leaks](https://github.com/DingfanChen/GAN-Leaks)
[^3]: "Data Synthesis based on Generative Adversarial Networks." by N. Park, M. Mohammadi, K. Gorde, S. Jajodia,
H. Park, and Y. Kim in International Conference on Very Large Data Bases (VLDB), 2018.
[^4]: "Holdout-Based Fidelity and Privacy Assessment of Mixed-Type Synthetic Data" by M. Platzer and T. Reutterer.
[^5]: Code for the paper "Holdout-Based Fidelity and Privacy Assessment of Mixed-Type Synthetic Data"
[https://github.com/mostly-ai/paper-fidelity-accuracy](https://github.com/mostly-ai/paper-fidelity-accuracy)

View file

@ -0,0 +1,12 @@
"""
Module providing privacy risk assessment for synthetic data.
The main interface, ``DatasetAttack``, with the ``assess_privacy()`` main method assumes the availability of the
training data, holdout data and synthetic data at the time of the privacy evaluation.
It is to be implemented by concrete assessment methods, which can run the assessment on a per-record level,
or on the whole dataset.
The abstract class ``DatasetAttackMembership`` implements the ``DatasetAttack`` interface, but adds the result
of the membership inference attack, so that the final score contains both the membership inference attack result
for further analysis and the calculated score.
"""
from apt.risk.data_assessment import dataset_attack

View file

@ -0,0 +1,70 @@
import abc
import numpy as np
from sklearn.neighbors import NearestNeighbors
from tqdm import tqdm
from apt.utils.datasets import ArrayDataset
class AttackStrategyUtils(abc.ABC):
"""
Abstract base class for common utilities of various privacy attack strategies.
"""
pass
class KNNAttackStrategyUtils(AttackStrategyUtils):
"""
Common utilities for attack strategy based on KNN distances.
"""
def __init__(self, use_batches: bool = False, batch_size: int = 10) -> None:
"""
:param use_batches: Use batches with a progress meter or not when finding KNNs for query set
:param batch_size: if use_batches=True, the size of batch_size should be > 0
"""
self.use_batches = use_batches
self.batch_size = batch_size
if use_batches:
if batch_size < 1:
raise ValueError(f"When using batching batch_size should be > 0, and not {batch_size}")
def fit(self, knn_learner: NearestNeighbors, dataset: ArrayDataset):
knn_learner.fit(dataset.get_samples())
def find_knn(self, knn_learner: NearestNeighbors, query_samples: ArrayDataset, distance_processor=None):
"""
Nearest neighbor search function.
:param query_samples: query samples, to which nearest neighbors are to be found
:param knn_learner: unsupervised learner for implementing neighbor searches, after it was fitted
:param distance_processor: function for processing the distance into another more relevant metric per sample.
Its input is an array representing distances (the distances returned by NearestNeighbors.kneighbors() ), and
the output should be another array with distance-based values that enable to compute the final risk score
:return:
distances of the query samples to their nearest neighbors, or a metric based on that distance and calculated
by the distance_processor function
"""
samples = query_samples.get_samples()
if not self.use_batches:
distances, _ = knn_learner.kneighbors(samples, return_distance=True)
if distance_processor:
return distance_processor(distances)
else:
return distances
distances = []
for i in tqdm(range(len(samples) // self.batch_size)):
x_batch = samples[i * self.batch_size:(i + 1) * self.batch_size]
x_batch = np.reshape(x_batch, [self.batch_size, -1])
# dist_batch: distance between every query sample in batch to its KNNs among training samples
dist_batch, _ = knn_learner.kneighbors(x_batch, return_distance=True)
# The probability of each sample to be generated
if distance_processor:
distance_based_metric_per_sample_batch = distance_processor(dist_batch)
distances.append(distance_based_metric_per_sample_batch)
else:
distances.append(dist_batch)
return np.concatenate(distances)

View file

@ -0,0 +1,80 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
import pandas as pd
from apt.risk.data_assessment.dataset_attack_membership_knn_probabilities import \
DatasetAttackConfigMembershipKnnProbabilities, DatasetAttackMembershipKnnProbabilities
from apt.risk.data_assessment.dataset_attack_result import DatasetAttackScore, DEFAULT_DATASET_NAME
from apt.risk.data_assessment.dataset_attack_whole_dataset_knn_distance import \
DatasetAttackConfigWholeDatasetKnnDistance, DatasetAttackWholeDatasetKnnDistance
from apt.utils.datasets import ArrayDataset
@dataclass
class DatasetAssessmentManagerConfig:
persist_reports: bool = False
generate_plots: bool = False
class DatasetAssessmentManager:
"""
The main class for running dataset assessment attacks.
"""
attack_scores_per_record_knn_probabilities: list[DatasetAttackScore] = []
attack_scores_whole_dataset_knn_distance: list[DatasetAttackScore] = []
def __init__(self, config: Optional[DatasetAssessmentManagerConfig] = DatasetAssessmentManagerConfig) -> None:
"""
:param config: Configuration parameters to guide the dataset assessment process
"""
self.config = config
def assess(self, original_data_members: ArrayDataset, original_data_non_members: ArrayDataset,
synthetic_data: ArrayDataset, dataset_name: str = DEFAULT_DATASET_NAME) -> list[DatasetAttackScore]:
"""
Do dataset privacy risk assessment by running dataset attacks, and return their scores.
:param original_data_members: A container for the training original samples and labels,
only samples are used in the assessment
:param original_data_non_members: A container for the holdout original samples and labels,
only samples are used in the assessment
:param synthetic_data: A container for the synthetic samples and labels, only samples are used in the assessment
:param dataset_name: A name to identify this dataset, optional
:return:
a list of dataset attack risk scores
"""
config_gl = DatasetAttackConfigMembershipKnnProbabilities(use_batches=False,
generate_plot=self.config.generate_plots)
attack_gl = DatasetAttackMembershipKnnProbabilities(original_data_members,
original_data_non_members,
synthetic_data,
config_gl,
dataset_name)
score_gl = attack_gl.assess_privacy()
self.attack_scores_per_record_knn_probabilities.append(score_gl)
config_h = DatasetAttackConfigWholeDatasetKnnDistance(use_batches=False)
attack_h = DatasetAttackWholeDatasetKnnDistance(original_data_members, original_data_non_members,
synthetic_data, config_h, dataset_name)
score_h = attack_h.assess_privacy()
self.attack_scores_whole_dataset_knn_distance.append(score_h)
return [score_gl, score_h]
def dump_all_scores_to_files(self):
if self.config.persist_reports:
results_log_file = "_results.log.csv"
self.dump_scores_to_file(self.attack_scores_per_record_knn_probabilities,
"per_record_knn_probabilities" + results_log_file, True)
self.dump_scores_to_file(self.attack_scores_whole_dataset_knn_distance,
"whole_dataset_knn_distance" + results_log_file, True)
@staticmethod
def dump_scores_to_file(attack_scores: list[DatasetAttackScore], filename: str, header: bool):
run_results_df = pd.DataFrame(attack_scores).drop('result', axis=1, errors='ignore') # don't serialize result
run_results_df.to_csv(filename, header=header, encoding='utf-8', index=False, mode='w') # Overwrite

View file

@ -0,0 +1,113 @@
"""
This module defines the interface for privacy risk assessment of synthetic datasets.
"""
import abc
from typing import Optional
import matplotlib.pyplot as plt
import numpy as np
from sklearn import metrics
from sklearn.metrics import RocCurveDisplay
from apt.risk.data_assessment.attack_strategy_utils import AttackStrategyUtils
from apt.risk.data_assessment.dataset_attack_result import DatasetAttackScore, DatasetAttackResultMembership
from apt.utils.datasets import ArrayDataset
class Config(abc.ABC):
"""
The base class for dataset attack configurations
"""
pass
class DatasetAttack(abc.ABC):
"""
The interface for performing privacy attack for risk assessment of synthetic datasets to be used in AI model
training. The original data members (training data) and non-members (the holdout data) should be available.
For reliability, all the datasets should be preprocessed and normalized.
"""
def __init__(self, original_data_members: ArrayDataset, original_data_non_members: ArrayDataset,
synthetic_data: ArrayDataset, config: Config, dataset_name: str,
attack_strategy_utils: Optional[AttackStrategyUtils] = None) -> None:
"""
:param original_data_members: A container for the training original samples and labels,
only samples are used in the assessment
:param original_data_non_members: A container for the holdout original samples and labels,
only samples are used in the assessment
:param synthetic_data: A container for the synthetic samples and labels, only samples are used in the assessment
:param config: Configuration parameters to guide the assessment process
:param dataset_name: A name to identify the dataset under attack, optional
:param attack_strategy_utils: Utils for use with the attack strategy, optional
"""
self.original_data_members = original_data_members
self.original_data_non_members = original_data_non_members
self.synthetic_data = synthetic_data
self.config = config
self.attack_strategy_utils = attack_strategy_utils
self.dataset_name = dataset_name
@abc.abstractmethod
def assess_privacy(self) -> DatasetAttackScore:
"""
Assess the privacy of the dataset
:return:
score: DatasetAttackScore the privacy attack risk score
"""
pass
class DatasetAttackMembership(DatasetAttack):
"""
An abstract base class for performing privacy risk assessment for synthetic datasets on a per-record level.
"""
@abc.abstractmethod
def calculate_privacy_score(self, dataset_attack_result: DatasetAttackResultMembership,
generate_plot: bool = False) -> DatasetAttackScore:
"""
Calculate dataset privacy score based on the result of the privacy attack
:return:
score: DatasetAttackScore
"""
pass
@staticmethod
def plot_roc_curve(dataset_name: str, member_probabilities: np.ndarray, non_member_probabilities: np.ndarray,
filename_prefix: str = ""):
"""
Plot ROC curve
:param dataset_name: dataset name, will become part of the plot filename
:param member_probabilities: probability estimates of the member samples, the training data
:param non_member_probabilities: probability estimates of the non-member samples, the hold-out data
:param filename_prefix: name prefix for the ROC curve plot
"""
labels = np.concatenate((np.zeros((len(non_member_probabilities),)), np.ones((len(member_probabilities),))))
results = np.concatenate((non_member_probabilities, member_probabilities))
svc_disp = RocCurveDisplay.from_predictions(labels, results)
svc_disp.plot()
plt.plot([0, 1], [0, 1], color="navy", linewidth=2, linestyle="--", label='No skills')
plt.title('ROC curve')
plt.savefig(f'{filename_prefix}{dataset_name}_roc_curve.png')
@staticmethod
def calculate_metrics(member_probabilities: np.ndarray, non_member_probabilities: np.ndarray):
"""
Calculate attack performance metrics
:param member_probabilities: probability estimates of the member samples, the training data
:param non_member_probabilities: probability estimates of the non-member samples, the hold-out data
:return:
fpr: False Positive rate
tpr: True Positive rate
threshold: threshold
auc: area under the Receiver Operating Characteristic Curve
ap: average precision score
"""
labels = np.concatenate((np.zeros((len(non_member_probabilities),)), np.ones((len(member_probabilities)))))
results = np.concatenate((non_member_probabilities, member_probabilities))
fpr, tpr, threshold = metrics.roc_curve(labels, results, pos_label=1)
auc = metrics.roc_auc_score(labels, results)
ap = metrics.average_precision_score(labels, results)
return fpr, tpr, threshold, auc, ap

View file

@ -0,0 +1,160 @@
"""
This module implements privacy risk assessment of synthetic datasets based on the paper:
"GAN-Leaks: A Taxonomy of Membership Inference Attacks against Generative Models" by D. Chen, N. Yu, Y. Zhang, M. Fritz
published in Proceedings of the 2020 ACM SIGSAC Conference on Computer and Communications Security, 34362, 2020.
https://doi.org/10.1145/3372297.3417238 and its implementation in https://github.com/DingfanChen/GAN-Leaks.
"""
from dataclasses import dataclass
from typing import Callable
import numpy as np
from sklearn.neighbors import NearestNeighbors
from apt.risk.data_assessment.attack_strategy_utils import KNNAttackStrategyUtils
from apt.risk.data_assessment.dataset_attack import DatasetAttackMembership, Config
from apt.risk.data_assessment.dataset_attack_result import DatasetAttackScore, DatasetAttackResultMembership, \
DEFAULT_DATASET_NAME
from apt.utils.datasets import ArrayDataset
@dataclass
class DatasetAttackConfigMembershipKnnProbabilities(Config):
"""Configuration for DatasetAttackMembershipKnnProbabilities.
Attributes:
k: Number of nearest neighbors to search
use_batches: Divide query samples into batches or not.
batch_size: Query sample batch size.
compute_distance: A callable function, which takes two arrays representing 1D vectors as inputs and must return
one value indicating the distance between those vectors.
See 'metric' parameter in sklearn.neighbors.NearestNeighbors documentation.
distance_params: Additional keyword arguments for the distance computation function, see 'metric_params' in
sklearn.neighbors.NearestNeighbors documentation.
generate_plot: Generate or not an AUR ROC curve and persist it in a file
"""
k: int = 5
use_batches: bool = False
batch_size: int = 10
compute_distance: Callable = None
distance_params: dict = None
generate_plot: bool = False
@dataclass
class DatasetAttackScoreMembershipKnnProbabilities(DatasetAttackScore):
"""DatasetAttackMembershipKnnProbabilities privacy risk score.
"""
roc_auc_score: float
average_precision_score: float
assessment_type: str = 'MembershipKnnProbabilities' # to be used in reports
def __init__(self, dataset_name: str, roc_auc_score: float, average_precision_score: float,
result: DatasetAttackResultMembership) -> None:
"""
dataset_name: dataset name to be used in reports
roc_auc_score: the area under the receiver operating characteristic curve (AUC ROC) to evaluate the attack
performance.
average_precision_score: the proportion of predicted members that are correctly members
result: the result of the membership inference attack
"""
super().__init__(dataset_name=dataset_name, risk_score=roc_auc_score, result=result)
self.roc_auc_score = roc_auc_score
self.average_precision_score = average_precision_score
class DatasetAttackMembershipKnnProbabilities(DatasetAttackMembership):
"""
Privacy risk assessment for synthetic datasets based on Black-Box MIA attack using distances of
members (training set) and non-members (holdout set) from their nearest neighbors in the synthetic dataset.
By default, the Euclidean distance is used (L2 norm), but another ``compute_distance()`` method can be provided
in configuration instead.
The area under the receiver operating characteristic curve (AUC ROC) gives the privacy risk measure.
"""
def __init__(self, original_data_members: ArrayDataset, original_data_non_members: ArrayDataset,
synthetic_data: ArrayDataset,
config: DatasetAttackConfigMembershipKnnProbabilities = DatasetAttackConfigMembershipKnnProbabilities(),
dataset_name: str = DEFAULT_DATASET_NAME):
"""
:param original_data_members: A container for the training original samples and labels
:param original_data_non_members: A container for the holdout original samples and labels
:param synthetic_data: A container for the synthetic samples and labels
:param config: Configuration parameters to guide the attack, optional
:param dataset_name: A name to identify this dataset, optional
"""
attack_strategy_utils = KNNAttackStrategyUtils(config.use_batches, config.batch_size)
super().__init__(original_data_members, original_data_non_members, synthetic_data, config, dataset_name,
attack_strategy_utils)
if config.compute_distance:
self.knn_learner = NearestNeighbors(n_neighbors=config.k, algorithm='auto', metric=config.compute_distance,
metric_params=config.distance_params)
else:
self.knn_learner = NearestNeighbors(n_neighbors=config.k, algorithm='auto')
def assess_privacy(self) -> DatasetAttackScoreMembershipKnnProbabilities:
"""
Membership Inference Attack which calculates probabilities of member and non-member samples to be generated by
the synthetic data generator.
The assumption is that since the generative model is trained to approximate the training data distribution
then the probability of a sample to be a member of the training data should be proportional to the probability
that the query sample can be generated by the generative model.
So, if the probability that the query sample is generated by the generative model is large,
it is more likely that the query sample was used to train the generative model. This probability is approximated
by the Parzen window density estimation in ``probability_per_sample()``, computed from the NN distances from the
query samples to the synthetic data samples.
:return:
Privacy score of the attack together with the attack result with the probabilities of member and
non-member samples to be generated by the synthetic data generator based on the NN distances from the
query samples to the synthetic data samples
"""
# nearest neighbor search
self.attack_strategy_utils.fit(self.knn_learner, self.synthetic_data)
# members query
member_proba = self.attack_strategy_utils.find_knn(self.knn_learner, self.original_data_members,
self.probability_per_sample)
# non-members query
non_member_proba = self.attack_strategy_utils.find_knn(self.knn_learner, self.original_data_non_members,
self.probability_per_sample)
result = DatasetAttackResultMembership(member_probabilities=member_proba,
non_member_probabilities=non_member_proba)
score = self.calculate_privacy_score(result, self.config.generate_plot)
return score
def calculate_privacy_score(self, dataset_attack_result: DatasetAttackResultMembership,
generate_plot: bool = False) -> DatasetAttackScoreMembershipKnnProbabilities:
"""
Evaluate privacy score from the probabilities of member and non-member samples to be generated by the synthetic
data generator. The probabilities are computed by the ``assess_privacy()`` method.
:param dataset_attack_result attack result containing probabilities of member and non-member samples to be
generated by the synthetic data generator
:param generate_plot generate AUC ROC curve plot and persist it
:return:
score of the attack, based on distance-based probabilities - mainly the ROC AUC score
"""
member_proba, non_member_proba = \
dataset_attack_result.member_probabilities, dataset_attack_result.non_member_probabilities
fpr, tpr, threshold, auc, ap = self.calculate_metrics(member_proba, non_member_proba)
score = DatasetAttackScoreMembershipKnnProbabilities(self.dataset_name,
result=dataset_attack_result,
roc_auc_score=auc, average_precision_score=ap)
if generate_plot:
self.plot_roc_curve(self.dataset_name, member_proba, non_member_proba)
return score
@staticmethod
def probability_per_sample(distances: np.ndarray):
"""
For every sample represented by its distance from the query sample to its KNN in synthetic data,
computes the probability of the synthetic data to be part of the query dataset.
:param distances: distance between every query sample in batch to its KNNs among synthetic samples, a numpy
array of size (n, k) with n being the number of samples, k - the number of KNNs
:return:
probability estimates of the query samples being generated and so - of being part of the synthetic set, a
numpy array of size (n,)
"""
return np.average(np.exp(-distances), axis=1)

View file

@ -0,0 +1,24 @@
from dataclasses import dataclass
from typing import Optional
import numpy as np
DEFAULT_DATASET_NAME = "dataset"
@dataclass
class DatasetAttackResult:
pass
@dataclass
class DatasetAttackScore:
dataset_name: str
risk_score: float
result: Optional[DatasetAttackResult]
@dataclass
class DatasetAttackResultMembership(DatasetAttackResult):
member_probabilities: np.ndarray
non_member_probabilities: np.ndarray

View file

@ -0,0 +1,127 @@
"""
This module implements privacy risk assessment of synthetic datasets based on the papers
"Data Synthesis based on Generative Adversarial Networks." by N. Park, M. Mohammadi, K. Gorde, S. Jajodia, H. Park,
and Y. Kim in International Conference on Very Large Data Bases (VLDB), 2018.
and "Holdout-Based Fidelity and Privacy Assessment of Mixed-Type Synthetic Data" by M. Platzer and T. Reutterer.
and on a variation of its reference implementation in https://github.com/mostly-ai/paper-fidelity-accuracy.
"""
from dataclasses import dataclass
import numpy as np
from sklearn.neighbors import NearestNeighbors
from apt.risk.data_assessment.attack_strategy_utils import KNNAttackStrategyUtils
from apt.risk.data_assessment.dataset_attack import Config, DatasetAttack
from apt.risk.data_assessment.dataset_attack_result import DatasetAttackScore, DEFAULT_DATASET_NAME
from apt.utils.datasets import ArrayDataset
K = 1 # Number of nearest neighbors to search. For DCR we need only the nearest neighbor.
@dataclass
class DatasetAttackConfigWholeDatasetKnnDistance(Config):
"""Configuration for DatasetAttackWholeDatasetKnnDistance.
Attributes:
use_batches: Divide query samples into batches or not.
batch_size: Query sample batch size.
compute_distance: A callable function, which takes two arrays representing 1D vectors as inputs and must return
one value indicating the distance between those vectors.
See 'metric' parameter in sklearn.neighbors.NearestNeighbors documentation.
distance_params: Additional keyword arguments for the distance computation function, see 'metric_params' in
sklearn.neighbors.NearestNeighbors documentation.
"""
use_batches: bool = False
batch_size: int = 10
compute_distance: callable = None
distance_params: dict = None
@dataclass
class DatasetAttackScoreWholeDatasetKnnDistance(DatasetAttackScore):
"""DatasetAttackWholeDatasetKnnDistance privacy risk score.
"""
share: float
assessment_type: str = 'WholeDatasetKnnDistance' # to be used in reports
def __init__(self, dataset_name: str, share: float) -> None:
"""
dataset_name: dataset name to be used in reports
share : the share of synthetic records closer to the training than the holdout dataset.
A value of 0.5 or close to it means good privacy.
"""
super().__init__(dataset_name=dataset_name, risk_score=share, result=None)
self.share = share
class DatasetAttackWholeDatasetKnnDistance(DatasetAttack):
"""
Privacy risk assessment for synthetic datasets based on distances of synthetic data records from
members (training set) and non-members (holdout set). The privacy risk measure is the share of synthetic
records closer to the training than the holdout dataset.
By default, the Euclidean distance is used (L2 norm), but another compute_distance() method can be provided in
configuration instead.
"""
def __init__(self, original_data_members: ArrayDataset, original_data_non_members: ArrayDataset,
synthetic_data: ArrayDataset,
config: DatasetAttackConfigWholeDatasetKnnDistance = DatasetAttackConfigWholeDatasetKnnDistance(),
dataset_name: str = DEFAULT_DATASET_NAME):
"""
:param original_data_members: A container for the training original samples and labels
:param original_data_non_members: A container for the holdout original samples and labels
:param synthetic_data: A container for the synthetic samples and labels
:param config: Configuration parameters to guide the assessment process, optional
:param dataset_name: A name to identify this dataset, optional
"""
attack_strategy_utils = KNNAttackStrategyUtils(config.use_batches, config.batch_size)
super().__init__(original_data_members, original_data_non_members, synthetic_data, config, dataset_name,
attack_strategy_utils)
if config.compute_distance:
self.knn_learner_members = NearestNeighbors(n_neighbors=K, metric=config.compute_distance,
metric_params=config.distance_params)
self.knn_learner_non_members = NearestNeighbors(n_neighbors=K, metric=config.compute_distance,
metric_params=config.distance_params)
else:
self.knn_learner_members = NearestNeighbors(n_neighbors=K)
self.knn_learner_non_members = NearestNeighbors(n_neighbors=K)
def assess_privacy(self) -> DatasetAttackScoreWholeDatasetKnnDistance:
"""
Calculate the share of synthetic records closer to the training than the holdout dataset, based on the
DCR computed by 'calculate_distances()'.
:return:
score of the attack, based on the NN distances from the query samples to the synthetic data samples
"""
member_distances, non_member_distances = self.calculate_distances()
# distance of the synth. records to members and to non-members
assert (len(member_distances) == len(non_member_distances))
n_members = len(self.original_data_members.get_samples())
n_non_members = len(self.original_data_non_members.get_samples())
# percent of synth. records closer to members,
# and distance ties are divided equally between members and non-members
share = np.mean(member_distances < non_member_distances) + (n_members / (n_members + n_non_members)) * np.mean(
member_distances == non_member_distances)
score = DatasetAttackScoreWholeDatasetKnnDistance(self.dataset_name, share=share)
return score
def calculate_distances(self):
"""
Calculate member and non-member query probabilities, based on their distance to their KNN among
synthetic samples. This distance is called distance to the closest record (DCR), as defined by
N. Park et. al. in "Data Synthesis based on Generative Adversarial Networks."
:return:
member_distances - distances of each synthetic data member from its nearest training sample
non_member_distances - distances of each synthetic data member from its nearest validation sample
"""
# nearest neighbor search
self.attack_strategy_utils.fit(self.knn_learner_members, self.original_data_members)
self.attack_strategy_utils.fit(self.knn_learner_non_members, self.original_data_non_members)
# distances of the synthetic data from the member and non-member samples
member_distances = self.attack_strategy_utils.find_knn(self.knn_learner_members, self.synthetic_data)
non_member_distances = self.attack_strategy_utils.find_knn(self.knn_learner_non_members, self.synthetic_data)
return member_distances, non_member_distances

View file

@ -3,6 +3,8 @@ pandas~=1.1.0
scipy==1.4.1 scipy==1.4.1
scikit-learn>=0.22.2 scikit-learn>=0.22.2
torch>=1.8.0 torch>=1.8.0
tqdm>=4.64.1
matplotlib>=3.7.0
adversarial-robustness-toolbox>=1.11.0 adversarial-robustness-toolbox>=1.11.0
# testing # testing

View file

@ -0,0 +1,173 @@
import numpy as np
import pytest
from sklearn.compose import ColumnTransformer
from sklearn.decomposition import PCA
from sklearn.impute import SimpleImputer
from sklearn.model_selection import GridSearchCV
from sklearn.neighbors import KernelDensity
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
from apt.anonymization import Anonymize
from apt.risk.data_assessment.dataset_assessment_manager import DatasetAssessmentManager, DatasetAssessmentManagerConfig
from apt.utils.dataset_utils import get_iris_dataset_np, get_diabetes_dataset_np, get_adult_dataset_pd, \
get_nursery_dataset_pd
from apt.utils.datasets import ArrayDataset
MIN_SHARE = 0.5
MIN_ROC_AUC = 0.0
MIN_PRECISION = 0.0
NUM_SYNTH_SAMPLES = 40000
NUM_SYNTH_COMPONENTS = 4
iris_dataset_np = get_iris_dataset_np()
diabetes_dataset_np = get_diabetes_dataset_np()
nursery_dataset_pd = get_nursery_dataset_pd()
adult_dataset_pd = get_adult_dataset_pd()
mgr = DatasetAssessmentManager(DatasetAssessmentManagerConfig(persist_reports=False, generate_plots=False))
def teardown_function():
mgr.dump_all_scores_to_files()
anon_testdata = [('iris_np', iris_dataset_np, 'np', k, mgr) for k in range(2, 10, 4)] \
+ [('diabetes_np', diabetes_dataset_np, 'np', k, mgr) for k in range(2, 10, 4)] \
+ [('nursery_pd', nursery_dataset_pd, 'pd', k, mgr) for k in range(2, 10, 4)] \
+ [('adult_pd', adult_dataset_pd, 'pd', k, mgr) for k in range(2, 10, 4)]
@pytest.mark.parametrize("name, data, dataset_type, k, mgr", anon_testdata)
def test_risk_anonymization(name, data, dataset_type, k, mgr):
(x_train, y_train), (x_test, y_test) = data
if dataset_type == 'np':
# no need to preprocess
preprocessed_x_train = x_train
preprocessed_x_test = x_test
QI = [0, 2]
anonymizer = Anonymize(k, QI, train_only_QI=True)
elif "adult" in name:
preprocessed_x_train, preprocessed_x_test = preprocess_adult_x_data(x_train, x_test)
QI = list(range(15, 27))
anonymizer = Anonymize(k, QI)
elif "nursery" in name:
preprocessed_x_train, preprocessed_x_test = preprocess_nursery_x_data(x_train, x_test)
QI = list(range(15, 27))
anonymizer = Anonymize(k, QI, train_only_QI=True)
else:
raise ValueError('Pandas dataset missing a preprocessing step')
anonymized_data = ArrayDataset(anonymizer.anonymize(ArrayDataset(preprocessed_x_train, y_train)))
original_data_members = ArrayDataset(preprocessed_x_train, y_train)
original_data_non_members = ArrayDataset(preprocessed_x_test, y_test)
dataset_name = f'anon_k{k}_{name}'
assess_privacy_and_validate_result(mgr, original_data_members, original_data_non_members, anonymized_data,
dataset_name)
testdata = [('iris_np', iris_dataset_np, 'np', mgr),
('diabetes_np', diabetes_dataset_np, 'np', mgr),
('nursery_pd', nursery_dataset_pd, 'pd', mgr),
('adult_pd', adult_dataset_pd, 'pd', mgr)]
@pytest.mark.parametrize("name, data, dataset_type, mgr", testdata)
def test_risk_kde(name, data, dataset_type, mgr):
(x_train, y_train), (x_test, y_test) = data
if dataset_type == 'np':
encoded = x_train
encoded_test = x_test
num_synth_components = NUM_SYNTH_COMPONENTS
elif "adult" in name:
encoded, encoded_test = preprocess_adult_x_data(x_train, x_test)
num_synth_components = 10
elif "nursery" in name:
encoded, encoded_test = preprocess_nursery_x_data(x_train, x_test)
num_synth_components = 10
else:
raise ValueError('Pandas dataset missing a preprocessing step')
synth_data = ArrayDataset(
kde(NUM_SYNTH_SAMPLES, n_components=num_synth_components, original_data=encoded))
original_data_members = ArrayDataset(encoded, y_train)
original_data_non_members = ArrayDataset(encoded_test, y_test)
dataset_name = 'kde' + str(NUM_SYNTH_SAMPLES) + name
assess_privacy_and_validate_result(mgr, original_data_members, original_data_non_members, synth_data, dataset_name)
def kde(n_samples, n_components, original_data):
"""
Simple synthetic data genrator: estimates the kernel density of data using a Gaussian kernel and then generates
samples from this distribution
"""
digit_data = original_data
pca = PCA(n_components=n_components, whiten=False)
data = pca.fit_transform(digit_data)
params = {'bandwidth': np.logspace(-1, 1, 20)}
grid = GridSearchCV(KernelDensity(), params, cv=5)
grid.fit(data)
kde_estimator = grid.best_estimator_
new_data = kde_estimator.sample(n_samples, random_state=0)
new_data = pca.inverse_transform(new_data)
return new_data
def preprocess_adult_x_data(x_train, x_test):
features = ['age', 'workclass', 'education-num', 'marital-status', 'occupation', 'relationship', 'race', 'sex',
'capital-gain', 'capital-loss', 'hours-per-week', 'native-country']
categorical_features = ['workclass', 'marital-status', 'occupation', 'relationship', 'race', 'sex',
'native-country']
# prepare data for DT
numeric_features = [f for f in features if f not in categorical_features]
numeric_transformer = Pipeline(
steps=[('imputer', SimpleImputer(strategy='constant', fill_value=0))]
)
categorical_transformer = OneHotEncoder(handle_unknown="ignore", sparse=False)
preprocessor = ColumnTransformer(
transformers=[
("num", numeric_transformer, numeric_features),
("cat", categorical_transformer, categorical_features),
]
)
encoded = preprocessor.fit_transform(x_train)
encoded_test = preprocessor.fit_transform(x_test)
return encoded, encoded_test
def preprocess_nursery_x_data(x_train, x_test):
x_train = x_train.astype(str)
features = ["parents", "has_nurs", "form", "children", "housing", "finance", "social", "health"]
# QI = ["finance", "social", "health"]
categorical_features = ["parents", "has_nurs", "form", "housing", "finance", "social", "health", 'children']
# prepare data for DT
numeric_features = [f for f in features if f not in categorical_features]
numeric_transformer = Pipeline(
steps=[('imputer', SimpleImputer(strategy='constant', fill_value=0))]
)
categorical_transformer = OneHotEncoder(handle_unknown="ignore", sparse=False)
preprocessor = ColumnTransformer(
transformers=[
("num", numeric_transformer, numeric_features),
("cat", categorical_transformer, categorical_features),
]
)
encoded = preprocessor.fit_transform(x_train)
encoded_test = preprocessor.fit_transform(x_test)
return encoded, encoded_test
def assess_privacy_and_validate_result(dataset_assessment_manager, original_data_members, original_data_non_members,
synth_data, dataset_name):
[score_g, score_h] = dataset_assessment_manager.assess(original_data_members, original_data_non_members, synth_data,
dataset_name)
assert (score_g.roc_auc_score > MIN_ROC_AUC)
assert (score_g.average_precision_score > MIN_PRECISION)
assert (score_h.share > MIN_SHARE)

View file

@ -0,0 +1,109 @@
import pytest
from apt.anonymization import Anonymize
from apt.risk.data_assessment.dataset_assessment_manager import DatasetAssessmentManager, DatasetAssessmentManagerConfig
from apt.utils.dataset_utils import get_iris_dataset_np, get_nursery_dataset_pd
from apt.utils.datasets import ArrayDataset
from tests.test_data_assessment import kde, preprocess_nursery_x_data
NUM_SYNTH_SAMPLES = 10
NUM_SYNTH_COMPONENTS = 2
ANON_K = 2
MIN_SHARE = 0.5
MIN_ROC_AUC = 0.0
MIN_PRECISION = 0.0
iris_dataset_np = get_iris_dataset_np()
nursery_dataset_pd = get_nursery_dataset_pd()
mgr1 = DatasetAssessmentManager(DatasetAssessmentManagerConfig(persist_reports=False, generate_plots=False))
mgr2 = DatasetAssessmentManager(DatasetAssessmentManagerConfig(persist_reports=False, generate_plots=True))
mgr3 = DatasetAssessmentManager(DatasetAssessmentManagerConfig(persist_reports=True, generate_plots=False))
mgr4 = DatasetAssessmentManager(DatasetAssessmentManagerConfig(persist_reports=True, generate_plots=True))
mgrs = [mgr1, mgr2, mgr3, mgr4]
def teardown_function():
for mgr in mgrs:
mgr.dump_all_scores_to_files()
anon_testdata = [('iris_np', iris_dataset_np, 'np', mgr1)] \
+ [('nursery_pd', nursery_dataset_pd, 'pd', mgr2)] \
+ [('iris_np', iris_dataset_np, 'np', mgr3)] \
+ [('nursery_pd', nursery_dataset_pd, 'pd', mgr4)]
@pytest.mark.parametrize("name, data, dataset_type, mgr", anon_testdata)
def test_risk_anonymization(name, data, dataset_type, mgr):
(x_train, y_train), (x_test, y_test) = data
if dataset_type == 'np':
# no need to preprocess
preprocessed_x_train = x_train
preprocessed_x_test = x_test
QI = [0, 2]
anonymizer = Anonymize(ANON_K, QI, train_only_QI=True)
elif "nursery" in name:
preprocessed_x_train, preprocessed_x_test = preprocess_nursery_x_data(x_train, x_test)
QI = list(range(15, 27))
anonymizer = Anonymize(ANON_K, QI, train_only_QI=True)
else:
raise ValueError('Pandas dataset missing a preprocessing step')
anonymized_data = ArrayDataset(anonymizer.anonymize(ArrayDataset(preprocessed_x_train, y_train)))
original_data_members = ArrayDataset(preprocessed_x_train, y_train)
original_data_non_members = ArrayDataset(preprocessed_x_test, y_test)
dataset_name = f'anon_k{ANON_K}_{name}'
assess_privacy_and_validate_result(mgr, original_data_members, original_data_non_members, anonymized_data,
dataset_name)
assess_privacy_and_validate_result(mgr, original_data_members=original_data_members,
original_data_non_members=original_data_non_members,
synth_data=anonymized_data, dataset_name=None)
testdata = [('iris_np', iris_dataset_np, 'np', mgr4),
('nursery_pd', nursery_dataset_pd, 'pd', mgr3),
('iris_np', iris_dataset_np, 'np', mgr2),
('nursery_pd', nursery_dataset_pd, 'pd', mgr1)]
@pytest.mark.parametrize("name, data, dataset_type, mgr", testdata)
def test_risk_kde(name, data, dataset_type, mgr):
(x_train, y_train), (x_test, y_test) = data
if dataset_type == 'np':
encoded = x_train
encoded_test = x_test
num_synth_components = NUM_SYNTH_COMPONENTS
elif "nursery" in name:
encoded, encoded_test = preprocess_nursery_x_data(x_train, x_test)
num_synth_components = 10
else:
raise ValueError('Pandas dataset missing a preprocessing step')
synth_data = ArrayDataset(
kde(NUM_SYNTH_SAMPLES, n_components=num_synth_components, original_data=encoded))
original_data_members = ArrayDataset(encoded, y_train)
original_data_non_members = ArrayDataset(encoded_test, y_test)
dataset_name = 'kde' + str(NUM_SYNTH_SAMPLES) + name
assess_privacy_and_validate_result(mgr, original_data_members, original_data_non_members, synth_data, dataset_name)
assess_privacy_and_validate_result(mgr, original_data_members=original_data_members,
original_data_non_members=original_data_non_members,
synth_data=synth_data, dataset_name=None)
def assess_privacy_and_validate_result(mgr, original_data_members, original_data_non_members, synth_data,
dataset_name):
if dataset_name:
[score_g, score_h] = mgr.assess(original_data_members, original_data_non_members, synth_data,
dataset_name)
else:
[score_g, score_h] = mgr.assess(original_data_members, original_data_non_members, synth_data)
assert (score_g.roc_auc_score > MIN_ROC_AUC)
assert (score_g.average_precision_score > MIN_PRECISION)
assert (score_h.share > MIN_SHARE)