ai-privacy-toolkit/apt/risk/data_assessment/attack_strategy_utils.py
2023-09-20 00:28:18 +03:00

172 lines
8.8 KiB
Python

import abc
from dataclasses import dataclass
import numpy as np
from scipy import stats
from sklearn.neighbors import NearestNeighbors
from tqdm import tqdm
from pandas.api.types import is_numeric_dtype, is_categorical_dtype
from apt.utils.datasets import ArrayDataset
class AttackStrategyUtils(abc.ABC):
"""
Abstract base class for common utilities of various privacy attack strategies.
"""
pass
@dataclass
class DistributionValidationResult:
"""Holds the result of the validation of distributions similarities.
Attributes:
distributions_valid (bool): False if there are columns whose distribution is different between the datasets
member_column_distribution_diff (list): Columns whose distribution is different between the member and the
synthetic datasets
non_member_column_distribution_diff (list): Columns whose distribution is different between the non-member and
the synthetic datasets
"""
distributions_valid: bool
member_column_distribution_diff: list
non_member_column_distribution_diff: list
class KNNAttackStrategyUtils(AttackStrategyUtils):
"""
Common utilities for attack strategy based on KNN distances.
"""
def __init__(self, use_batches: bool = False, batch_size: int = 10) -> None:
"""
:param use_batches: Use batches with a progress meter or not when finding KNNs for query set
:param batch_size: if use_batches=True, the size of batch_size should be > 0
"""
self.use_batches = use_batches
self.batch_size = batch_size
if use_batches:
if batch_size < 1:
raise ValueError(f"When using batching batch_size should be > 0, and not {batch_size}")
def fit(self, knn_learner: NearestNeighbors, dataset: ArrayDataset):
knn_learner.fit(dataset.get_samples())
def find_knn(self, knn_learner: NearestNeighbors, query_samples: ArrayDataset, distance_processor=None):
"""
Nearest neighbor search function.
:param query_samples: query samples, to which nearest neighbors are to be found
:param knn_learner: unsupervised learner for implementing neighbor searches, after it was fitted
:param distance_processor: function for processing the distance into another more relevant metric per sample.
Its input is an array representing distances (the distances returned by NearestNeighbors.kneighbors() ), and
the output should be another array with distance-based values that enable to compute the final risk score
:return:
distances of the query samples to their nearest neighbors, or a metric based on that distance and calculated
by the distance_processor function
"""
samples = query_samples.get_samples()
if not self.use_batches:
distances, _ = knn_learner.kneighbors(samples, return_distance=True)
if distance_processor:
return distance_processor(distances)
else:
return distances
distances = []
for i in tqdm(range(len(samples) // self.batch_size)):
x_batch = samples[i * self.batch_size:(i + 1) * self.batch_size]
x_batch = np.reshape(x_batch, [self.batch_size, -1])
# dist_batch: distance between every query sample in batch to its KNNs among training samples
dist_batch, _ = knn_learner.kneighbors(x_batch, return_distance=True)
# The probability of each sample to be generated
if distance_processor:
distance_based_metric_per_sample_batch = distance_processor(dist_batch)
distances.append(distance_based_metric_per_sample_batch)
else:
distances.append(dist_batch)
return np.concatenate(distances)
@staticmethod
def _column_statistical_test(df1_column_samples, df2_column_samples, column, is_categorical, is_numeric, test_type,
alpha, differing_columns):
if is_categorical(column):
try:
result = stats.chisquare(f_obs=df1_column_samples, f_exp=df1_column_samples)
except ValueError as e:
if str(e).startswith('For each axis slice, the sum of'):
print('Column', column, e)
else:
raise
elif is_numeric:
if test_type == 'KS':
result = stats.ks_2samp(df1_column_samples, df2_column_samples)
elif test_type == 'CVM':
result = stats.cramervonmises_2samp(df1_column_samples, df1_column_samples)
else:
raise ValueError('Unknown test type', test_type)
else:
print(f'Skipping non-numeric and non-categorical column {column}')
return
print(
f"{column}: {test_type} = {result.statistic:.4f} "
f"(p-value = {result.pvalue:.3e}, are equal = {result.pvalue > 0.05})")
if result.pvalue < alpha:
# Reject H0, different distributions
print(f"Distributions differ in column {column}, p-value: {result.pvalue}")
differing_columns.append(column)
else:
# Accept H0, similar distributions
print(f'Accept H0, similar distributions in column {column}')
@staticmethod
def _columns_different_distributions(df1: ArrayDataset, df2: ArrayDataset,
categorical_features: list = [],
alpha=0.05, test_type='KS') -> list:
differing_columns = []
df1_samples = df1.get_samples()
df2_samples = df2.get_samples()
if df1.is_pandas:
def is_categorical(col_name):
col_name in categorical_features or is_categorical_dtype(df1_samples.dtypes[col_name])
def is_numeric(col_name): is_numeric_dtype(df1_samples.dtypes[col_name])
for name, _ in df1_samples.items():
KNNAttackStrategyUtils._column_statistical_test(df1_samples[name], df2_samples[name], name,
is_categorical, is_numeric(df1_samples.dtypes[name]),
test_type, alpha, differing_columns)
else:
is_df1_numeric_dtype = np.issubdtype(df1_samples.dtype, int) or np.issubdtype(df1_samples.dtype, float)
def is_categorical(col_name): col_name in categorical_features
for i, column in enumerate(df1_samples.T):
KNNAttackStrategyUtils._column_statistical_test(df1_samples[:, i], df2_samples[:, i], i,
is_categorical, is_df1_numeric_dtype, test_type, alpha,
differing_columns)
return differing_columns
def validate_distributions(self, original_data_members: ArrayDataset, original_data_non_members: ArrayDataset,
synthetic_data: ArrayDataset, categorical_features: list = None):
"""
Validate column distributions are similar between the datasets.
:param original_data_members: A container for the training original samples and labels
:param original_data_non_members: A container for the holdout original samples and labels
:param synthetic_data: A container for the synthetic samples and labels
:param categorical_features: a list of categorical features of the datasets
:return:
DistributionValidationResult
"""
member_column_distribution_diff = self._columns_different_distributions(synthetic_data,
original_data_members,
categorical_features)
non_member_column_distribution_diff = self._columns_different_distributions(synthetic_data,
original_data_non_members,
categorical_features)
if not member_column_distribution_diff and not non_member_column_distribution_diff:
return DistributionValidationResult(distributions_valid=True,
member_column_distribution_diff=[],
non_member_column_distribution_diff=[])
return DistributionValidationResult(distributions_valid=False,
member_column_distribution_diff=member_column_distribution_diff,
non_member_column_distribution_diff=non_member_column_distribution_diff)