generalize_using_transform is now passed separately to the fit and calculate_ncp methods.

ncp is now a class that contains 3 values: fit_score, transform_score and generalizations_score so that it doesn't matter in what order the different methods are called, all calculated ncp scores are stored.
Generalizations can now be applied either from tree cells or from global generalizations struct depending on the value of generalize_using_transform. Representative values can also be computed from global generalizations.
Removing a feature from the generalization can also be applied in either mode.

Signed-off-by: abigailt <abigailt@il.ibm.com>
This commit is contained in:
abigailt 2023-08-07 15:31:49 +03:00
parent 7d22423ac3
commit b48b829a01
2 changed files with 278 additions and 139 deletions

View file

@ -2,6 +2,8 @@
This module implements all classes needed to perform data minimization
"""
from typing import Union, Optional
from dataclasses import dataclass
from collections import Counter
import pandas as pd
import numpy as np
import copy
@ -20,6 +22,13 @@ from apt.utils.datasets import ArrayDataset, DATA_PANDAS_NUMPY_TYPE
from apt.utils.models import Model, SklearnRegressor, ModelOutputType, SklearnClassifier
@dataclass
class NCPScores:
fit_score: float = None
transform_score: float = None
generalizations_score: float = None
class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerMixin):
"""
A transformer that generalizes data to representative points.
@ -41,11 +50,6 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM
:param target_accuracy: The required relative accuracy when applying the base model to the generalized data.
Accuracy is measured relative to the original accuracy of the model.
:type target_accuracy: float, optional
:param generalize_using_transform: Indicates how to calculate NCP and accuracy during the generalization process.
True means that the `transform` method is used to transform original data into
generalized data that is used for accuracy and NCP calculation. False indicates
that the `generalizations` structure should be used. Default is True.
:type generalize_using_transform: boolean, optional
:param cells: The cells used to generalize records. Each cell must define a range or subset of categories for
each feature, as well as a representative value for each feature. This parameter should be used
when instantiating a transformer object without first fitting it.
@ -68,7 +72,6 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM
def __init__(self, estimator: Union[BaseEstimator, Model] = None,
target_accuracy: Optional[float] = 0.998,
generalize_using_transform: Optional[bool] = True,
cells: Optional[list] = None,
categorical_features: Optional[Union[np.ndarray, list]] = None,
encoder: Optional[Union[OrdinalEncoder, OneHotEncoder]] = None,
@ -93,8 +96,8 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM
self.train_only_features_to_minimize = train_only_features_to_minimize
self.is_regression = is_regression
self.encoder = encoder
self.generalize_using_transform = generalize_using_transform
self._ncp = 0.0
# self._ncp = 0.0
self._ncp_scores = NCPScores()
self._feature_data = None
self._categorical_values = {}
self._dt = None
@ -168,12 +171,13 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM
@property
def ncp(self):
"""
Return the last calculated NCP score. NCP score is calculated upon calling `fit` (on the training data),
Return the last calculated NCP scores. NCP score is calculated upon calling `fit` (on the training data),
`transform' (on the test data) or when explicitly calling `calculate_ncp` and providing it a dataset.
:return: NCP score as float.
:return: NCPScores object, that contains a score corresponding to the last fit call, one for the last
transform call, and a score based on global generalizations.
"""
return self._ncp
return self._ncp_scores
def fit_transform(self, X: Optional[DATA_PANDAS_NUMPY_TYPE] = None, y: Optional[DATA_PANDAS_NUMPY_TYPE] = None,
features_names: Optional[list] = None, dataset: Optional[ArrayDataset] = None):
@ -198,7 +202,7 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM
return self.transform(X, features_names, dataset=dataset)
def fit(self, X: Optional[DATA_PANDAS_NUMPY_TYPE] = None, y: Optional[DATA_PANDAS_NUMPY_TYPE] = None,
features_names: Optional = None, dataset: ArrayDataset = None):
features_names: Optional = None, dataset: ArrayDataset = None, generalize_using_transform: bool = True):
"""Learns the generalizations based on training data.
:param X: The training input samples.
@ -211,6 +215,11 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM
:param dataset: Data wrapper containing the training input samples and the predictions of the original model
on the training data. Either ``X``, ``y`` OR ``dataset`` need to be provided, not both.
:type dataset: `ArrayDataset`, optional
:param generalize_using_transform: Indicates how to calculate NCP and accuracy during the generalization process.
True means that the `transform` method is used to transform original data into
generalized data that is used for accuracy and NCP calculation. False indicates
that the `generalizations` structure should be used. Default is True.
:type generalize_using_transform: boolean, optional
:return: self
"""
@ -317,7 +326,10 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM
# self._cells currently holds the generalization created from the tree leaves
self._calculate_generalizations()
generalized = self._generalize(X_test, x_prepared_test, nodes, self.cells, self._cells_by_id)
if generalize_using_transform:
generalized = self._generalize_from_tree(X_test, x_prepared_test, nodes, self.cells, self._cells_by_id)
else:
generalized = self._generalize_from_generalizations(X_test, self.generalizations)
# check accuracy
accuracy = self.estimator.score(ArrayDataset(self.encoder.transform(generalized), y_test))
@ -344,8 +356,12 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM
self._attach_cells_representatives(x_prepared, used_X_train, y_train, nodes)
self._calculate_generalizations()
generalized = self._generalize(X_test, x_prepared_test, nodes, self.cells,
self._cells_by_id)
if generalize_using_transform:
generalized = self._generalize_from_tree(X_test, x_prepared_test, nodes, self.cells,
self._cells_by_id)
else:
generalized = self._generalize_from_generalizations(X_test, self.generalizations)
accuracy = self.estimator.score(ArrayDataset(self.encoder.transform(generalized), y_test))
# if accuracy passed threshold roll back to previous iteration generalizations
if accuracy < self.target_accuracy:
@ -364,12 +380,17 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM
while accuracy < self.target_accuracy:
removed_feature = self._remove_feature_from_generalization(X_test, x_prepared_test,
nodes, y_test,
self._feature_data, accuracy)
self._feature_data, accuracy,
generalize_using_transform)
if removed_feature is None:
break
self._calculate_generalizations()
generalized = self._generalize(X_test, x_prepared_test, nodes, self.cells, self._cells_by_id)
if generalize_using_transform:
generalized = self._generalize_from_tree(X_test, x_prepared_test, nodes, self.cells,
self._cells_by_id)
else:
generalized = self._generalize_from_generalizations(X_test, self.generalizations)
accuracy = self.estimator.score(ArrayDataset(self.encoder.transform(generalized), y_test))
print('Removed feature: %s, new relative accuracy: %f' % (removed_feature, accuracy))
@ -377,7 +398,8 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM
# calculate iLoss
X_test_dataset = ArrayDataset(X_test, features_names=self._features)
self.calculate_ncp(X_test_dataset)
self._ncp_scores.fit_score = self.calculate_ncp(X_test_dataset, generalize_using_transform)
self._ncp_scores.generalizations_score = self.calculate_ncp(X_test_dataset, False)
# Return the transformer
return self
@ -399,14 +421,11 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM
pandas DataFrame (depending on the type of ``X``), shape (n_samples, n_features)
"""
transformed = self._inner_transform(X, features_names, dataset)
if not self.generalize_using_transform:
raise ValueError('transform method called even though generalize_using_transform parameter was False. This '
'can lead to inconsistent results.')
transformed_dataset = ArrayDataset(transformed, features_names=self._features)
self.calculate_ncp(transformed_dataset)
self._ncp_scores.transform_score = self.calculate_ncp(transformed_dataset, True)
return transformed
def calculate_ncp(self, samples: Optional[ArrayDataset] = None):
def calculate_ncp(self, samples: ArrayDataset, generalize_using_transform: bool = True):
"""
Compute the NCP score of the generalization. Calculation is based on the value of the
generalize_using_transform param. If samples are provided, updates stored ncp value to the one computed on the
@ -415,15 +434,15 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM
Based on the NCP score presented in: Ghinita, G., Karras, P., Kalnis, P., Mamoulis, N.: Fast data anonymization
with low information loss (https://www.vldb.org/conf/2007/papers/research/p758-ghinita.pdf)
:param samples: The input samples to compute the NCP score on. Ideally should be the data that will be
transformed (e.g., test/runtime data). If not samples supplied, will return the last NCP score
computed by the `fit` or `transform` method.
:param samples: The input samples to compute the NCP score on.
:type samples: ArrayDataset, optional. feature_names should be set.
:param generalize_using_transform: Indicates how to calculate NCP and accuracy during the generalization process.
True means that the `transform` method is used to transform original data into
generalized data that is used for accuracy and NCP calculation. False indicates
that the `generalizations` structure should be used. Default is True.
:type generalize_using_transform: boolean, optional
:return: NCP score as float.
"""
if samples is None:
return self._ncp
if not samples.features_names:
raise ValueError('features_names should be set in input ArrayDataset.')
samples_pd = pd.DataFrame(samples.get_samples(), columns=samples.features_names)
@ -433,7 +452,7 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM
self._feature_data = self._get_feature_data(samples_pd)
total_samples = samples_pd.shape[0]
if self.generalize_using_transform:
if generalize_using_transform:
generalizations = self._calculate_cell_generalizations()
# count how many records are mapped to each cell
counted = np.zeros(samples_pd.shape[0]) # to mark records we already counted
@ -449,14 +468,13 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM
category_counts[feature] = [count]
ncp += self._calc_ncp_for_generalization(generalizations[cell['id']], range_counts, category_counts,
total_samples)
self._ncp = ncp
else: # use generalizations
generalizations = self.generalizations
range_counts = self._find_range_counts(samples_pd, generalizations['ranges'])
category_counts = self._find_categories_counts(samples_pd, generalizations['categories'])
self._ncp = self._calc_ncp_for_generalization(generalizations, range_counts, category_counts, total_samples)
ncp = self._calc_ncp_for_generalization(generalizations, range_counts, category_counts, total_samples)
return self._ncp
return ncp
def _inner_transform(self, X: Optional[DATA_PANDAS_NUMPY_TYPE] = None, features_names: Optional[list] = None,
dataset: Optional[ArrayDataset] = None):
@ -493,7 +511,7 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM
if self.train_only_features_to_minimize:
used_x = QI
prepared = self._encode_categorical_features(used_x)
generalized = self._generalize(x, prepared, nodes, self.cells, self._cells_by_id)
generalized = self._generalize_from_tree(x, prepared, nodes, self.cells, self._cells_by_id)
else:
mapped = np.zeros(x.shape[0]) # to mark records we already mapped
all_indexes = []
@ -574,9 +592,11 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM
def _get_record_count_for_cell(self, X, cell, mapped):
count = 0
for index, row in X.iterrows():
index = 0
for _, row in X.iterrows():
if not mapped.item(index) and self._cell_contains(cell, row, index, mapped):
count += 1
index += 1
return count
def _cell_contains(self, cell, x, index, mapped):
@ -836,7 +856,30 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM
nodeSet = set(nodes)
return [(list(set([i for i, v in enumerate(p) if v == 1]) & nodeSet))[0] for p in paths]
def _generalize(self, original_data, prepared_data, level_nodes, cells, cells_by_id):
# method for applying generalizations (for global generalization-based acuuracy) without dt
def _generalize_from_generalizations(self, original_data, generalizations):
sample_indexes = self._map_to_ranges_categories(original_data,
generalizations['ranges'],
generalizations['categories'])
original_data_generalized = pd.DataFrame(original_data, columns=self._features, copy=True)
for feature in self._generalizations['categories']:
if ('untouched' not in generalizations or feature not in generalizations['untouched']):
for g_index, group in enumerate(generalizations['categories'][feature]):
indexes = [i for i, s in enumerate(sample_indexes) if s[feature] == g_index]
if indexes:
rows = original_data_generalized.iloc[indexes]
rows[feature] = generalizations['category_representatives'][feature][g_index]
for feature in self._generalizations['ranges']:
if ('untouched' not in generalizations or feature not in generalizations['untouched']):
for r_index, range in enumerate(generalizations['ranges'][feature]):
indexes = [i for i, s in enumerate(sample_indexes) if s[feature] == r_index]
if indexes:
rows = original_data_generalized.iloc[indexes]
rows[feature] = generalizations['range_representatives'][feature][r_index]
return original_data_generalized
def _generalize_from_tree(self, original_data, prepared_data, level_nodes, cells, cells_by_id):
mapping_to_cells = self._map_to_cells(prepared_data, level_nodes, cells_by_id)
all_indexes = []
for i in range(len(cells)):
@ -879,6 +922,39 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM
return original_data_generalized
@staticmethod
def _map_to_ranges_categories(samples, ranges, categories):
all_sample_indexes = []
for _, row in samples.iterrows():
sample_indexes = {}
for feature in ranges:
if not ranges[feature]:
# no values means whole range
sample_indexes[feature] = 0
else:
for index, value in enumerate(ranges[feature]):
if row[feature] <= value:
sample_indexes[feature] = index
break
sample_indexes[feature] = index + 1
for feature in categories:
for g_index, group in enumerate(categories[feature]):
if row[feature] in group:
sample_indexes[feature] = g_index
break
# found = False
# for g_index, group in enumerate(categories):
# sample_indexes[feature] = {}
# for c_index, category in enumerate(group):
# if row[feature] == group:
# sample_indexes[feature][g_index] = c_index
# found = True
# break
# if found:
# break
all_sample_indexes.append(sample_indexes)
return all_sample_indexes
def _map_to_cells(self, samples, nodes, cells_by_id):
mapping_to_cells = {}
for index, row in samples.iterrows():
@ -891,17 +967,18 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM
return [cells_by_id[nodeId] for nodeId in node_ids]
def _remove_feature_from_generalization(self, original_data, prepared_data, nodes, labels, feature_data,
current_accuracy):
current_accuracy, generalize_using_transform):
# prepared data include one hot encoded categorical data,
# if there is no categorical data prepared data is original data
feature = self._get_feature_to_remove(original_data, prepared_data, nodes, labels, feature_data,
current_accuracy)
current_accuracy, generalize_using_transform)
if feature is None:
return None
self._remove_feature_from_cells(self.cells, self._cells_by_id, feature)
return feature
def _get_feature_to_remove(self, original_data, prepared_data, nodes, labels, feature_data, current_accuracy):
def _get_feature_to_remove(self, original_data, prepared_data, nodes, labels, feature_data, current_accuracy,
generalize_using_transform):
# prepared data include one hot encoded categorical data,
# if there is no categorical data prepared data is original data
# We want to remove features with low iLoss (NCP) and high accuracy gain
@ -916,16 +993,20 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM
for feature in ranges.keys():
if feature not in self._generalizations['untouched']:
feature_ncp = self._calc_ncp_numeric(ranges[feature],
range_counts[feature],
feature_data[feature],
total)
if generalize_using_transform:
feature_ncp = self._calculate_ncp_for_feature_from_cells(feature, feature_data, original_data)
else:
feature_ncp = self._calc_ncp_numeric(ranges[feature],
range_counts[feature],
feature_data[feature],
total)
if feature_ncp > 0:
# divide by accuracy gain
new_cells = copy.deepcopy(self.cells)
cells_by_id = copy.deepcopy(self._cells_by_id)
GeneralizeToRepresentative._remove_feature_from_cells(new_cells, cells_by_id, feature)
generalized = self._generalize(original_data, prepared_data, nodes, new_cells, cells_by_id)
generalized = self._generalize_from_tree(original_data, prepared_data, nodes, new_cells,
cells_by_id)
accuracy_gain = self.estimator.score(ArrayDataset(self.encoder.transform(generalized),
labels)) - current_accuracy
if accuracy_gain < 0:
@ -939,16 +1020,20 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM
for feature in categories.keys():
if feature not in self.generalizations['untouched']:
feature_ncp = self._calc_ncp_categorical(categories[feature],
category_counts[feature],
feature_data[feature],
total)
if generalize_using_transform:
feature_ncp = self._calculate_ncp_for_feature_from_cells(feature, feature_data, original_data)
else:
feature_ncp = self._calc_ncp_categorical(categories[feature],
category_counts[feature],
feature_data[feature],
total)
if feature_ncp > 0:
# divide by accuracy loss
new_cells = copy.deepcopy(self.cells)
cells_by_id = copy.deepcopy(self._cells_by_id)
GeneralizeToRepresentative._remove_feature_from_cells(new_cells, cells_by_id, feature)
generalized = self._generalize(original_data, prepared_data, nodes, new_cells, cells_by_id)
generalized = self._generalize_from_tree(original_data, prepared_data, nodes, new_cells,
cells_by_id)
accuracy_gain = self.estimator.score(ArrayDataset(self.encoder.transform(generalized),
labels)) - current_accuracy
@ -963,16 +1048,89 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM
print('feature to remove: ' + (str(remove_feature) if remove_feature is not None else 'none'))
return remove_feature
def _calculate_generalizations(self):
self._generalizations = {'ranges': self._calculate_ranges(self.cells),
'categories': self._calculate_categories(self.cells),
def _calculate_ncp_for_feature_from_cells(self, feature, feature_data, samples_pd):
# count how many records are mapped to each cell
counted = np.zeros(samples_pd.shape[0]) # to mark records we already counted
total = samples_pd.shape[0]
feature_ncp = 0
for i in range(len(self.cells)):
cell = self.cells[i]
count = self._get_record_count_for_cell(samples_pd, cell, counted)
generalizations = self._calculate_generalizations_for_cell(cell)
cell_ncp = 0
if feature in cell['ranges']:
cell_ncp = self._calc_ncp_numeric(generalizations['ranges'][feature],
[count],
feature_data[feature],
total)
elif feature in cell['categories']:
cell_ncp = self._calc_ncp_categorical(generalizations['categories'][feature],
[count],
feature_data[feature],
total)
feature_ncp += cell_ncp
return feature_ncp
def _calculate_generalizations(self, samples: Optional[pd.DataFrame] = None):
ranges, range_representatives = self._calculate_ranges(self.cells)
categories, category_representatives = self._calculate_categories(self.cells)
self._generalizations = {'ranges': ranges,
'categories': categories,
'untouched': self._calculate_untouched(self.cells)}
self._remove_categorical_untouched(self._generalizations)
# compute representative value for each feature (based on data)
if samples is not None:
sample_indexes = self._map_to_ranges_categories(samples,
self._generalizations['ranges'],
self._generalizations['categories'])
# categorical - use most common value
category_representatives = {}
for feature in self._generalizations['categories']:
category_representatives[feature] = []
for g_index, group in enumerate(self._generalizations['categories'][feature]):
# max_count = 0
# for c_index in range(len(group)):
# indexes = [i for i, s in enumerate(sample_indexes) if s[feature][g_index] == c_index]
indexes = [i for i, s in enumerate(sample_indexes) if s[feature] == g_index]
rows = samples[indexes]
values = rows[:, feature]
category = Counter(values).most_common(1)[0][0]
category_representatives[feature].append(group[category])
# c_count = len([s for s in sample_indexes if s[feature][g_index] == c_index])
# if c_count > max_count:
# max_count = c_count
# category = c_index
# category_representatives[feature].append(group[category])
def _calculate_generalizations_per_cell(self, cell):
generalizations = {'ranges': self._calculate_ranges([cell]),
'categories': self._calculate_categories([cell]),
'untouched': self._calculate_untouched([cell])}
# numerical - use actual value closest to mean
range_representatives = {}
for feature in self._generalizations['ranges']:
# find the mean value (per feature)
for index in range(len(self._generalizations['ranges'][feature])):
indexes = [i for i, s in enumerate(sample_indexes) if s[feature] == index]
rows = samples[indexes]
values = rows[:, feature]
median = np.median(values)
min_value = max(values)
min_dist = float("inf")
for value in values:
# euclidean distance between two floating point values
dist = abs(value - median)
if dist < min_dist:
min_dist = dist
min_value = value
range_representatives[feature].append(min_value)
self._generalizations['category_representatives'] = category_representatives
self._generalizations['range_representatives'] = range_representatives
def _calculate_generalizations_for_cell(self, cell):
ranges, range_representatives = self._calculate_ranges([cell])
categories, category_representatives = self._calculate_categories([cell])
generalizations = {'ranges': ranges,
'categories': categories,
'untouched': self._calculate_untouched([cell]),
'range_representatives': range_representatives,
'category_representatives': category_representatives}
self._remove_categorical_untouched(generalizations)
return generalizations
@ -980,7 +1138,7 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM
# calculate generalizations separately per cell
cell_generalizations = {}
for cell in self.cells:
cell_generalizations[cell['id']] = self._calculate_generalizations_per_cell(cell)
cell_generalizations[cell['id']] = self._calculate_generalizations_for_cell(cell)
return cell_generalizations
@staticmethod
@ -1013,6 +1171,7 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM
@staticmethod
def _calculate_ranges(cells):
ranges = {}
range_representatives = {}
for cell in cells:
for feature in [key for key in cell['ranges'].keys() if
'untouched' not in cell or key not in cell['untouched']]:
@ -1022,17 +1181,36 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM
ranges[feature].append(cell['ranges'][feature]['start'])
if cell['ranges'][feature]['end'] is not None:
ranges[feature].append(cell['ranges'][feature]['end'])
# default representative values (computed with no data)
for feature in ranges.keys():
ranges[feature] = list(set(ranges[feature]))
ranges[feature].sort()
return ranges
range_representatives[feature] = []
if not ranges[feature]:
# no values means the complete range. Without data we cannot know what to put here.
# Using 0 as a placeholder.
range_representatives[feature].append(0)
else:
ranges[feature] = list(set(ranges[feature]))
ranges[feature].sort()
for index, value in enumerate(ranges[feature]):
if index == 0:
# for first range, use min value
range_representatives[feature].append(value)
else:
# use middle of range (this will be a float)
range_representatives[feature].append((value - prev_value) / 2)
prev_value = value
# for last range use max value + 1
range_representatives[feature].append(prev_value + 1)
return ranges, range_representatives
@staticmethod
def _calculate_categories(cells):
categories = {}
category_representatives = {}
categorical_features_values = GeneralizeToRepresentative._calculate_categorical_features_values(cells)
for feature in categorical_features_values.keys():
partitions = []
category_representatives[feature] = []
values = categorical_features_values[feature]
assigned = []
for i in range(len(values)):
@ -1049,8 +1227,10 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM
partition.append(value2)
assigned.append(value2)
partitions.append(partition)
# default representative values (computed with no data)
category_representatives[feature].append(partition[0]) # random
categories[feature] = partitions
return categories
return categories, category_representatives
@staticmethod
def _calculate_categorical_features_values(cells):

View file

@ -76,40 +76,10 @@ def test_minimizer_params_not_transform():
model = SklearnClassifier(base_est, ModelOutputType.CLASSIFIER_PROBABILITIES)
model.fit(ArrayDataset(X, y))
gen = GeneralizeToRepresentative(model, cells=cells, generalize_using_transform=False)
gen.calculate_ncp(samples)
ncp = gen.ncp
gen = GeneralizeToRepresentative(model, cells=cells)
ncp = gen.calculate_ncp(samples, generalize_using_transform=False)
assert (ncp > 0.0)
def test_minimizer_params_not_transform_no_data():
# Assume two features, age and height, and boolean label
cells = [{"id": 1, "ranges": {"age": {"start": None, "end": 38}, "height": {"start": None, "end": 170}}, "label": 0,
'categories': {}, "representative": {"age": 26, "height": 149}},
{"id": 2, "ranges": {"age": {"start": 39, "end": None}, "height": {"start": None, "end": 170}}, "label": 1,
'categories': {}, "representative": {"age": 58, "height": 163}},
{"id": 3, "ranges": {"age": {"start": None, "end": 38}, "height": {"start": 171, "end": None}}, "label": 0,
'categories': {}, "representative": {"age": 31, "height": 184}},
{"id": 4, "ranges": {"age": {"start": 39, "end": None}, "height": {"start": 171, "end": None}}, "label": 1,
'categories': {}, "representative": {"age": 45, "height": 176}}
]
features = ['age', 'height']
X = np.array([[23, 165],
[45, 158],
[18, 190]])
y = [1, 1, 0]
samples = ArrayDataset(X, y, features)
base_est = DecisionTreeClassifier(random_state=0, min_samples_split=2,
min_samples_leaf=1)
model = SklearnClassifier(base_est, ModelOutputType.CLASSIFIER_PROBABILITIES)
model.fit(ArrayDataset(X, y))
gen = GeneralizeToRepresentative(model, cells=cells, generalize_using_transform=False)
gen.calculate_ncp(samples)
ncp = gen.ncp
assert (ncp > 0.0)
def test_minimizer_fit():
features = ['age', 'height']
X = np.array([[23, 165],
@ -155,7 +125,7 @@ def test_minimizer_fit():
if features[i] in modified_features:
indexes.append(i)
assert ((np.delete(transformed, indexes, axis=1) == np.delete(X, indexes, axis=1)).all())
ncp = gen.ncp
ncp = gen.ncp.transform_score
if len(expected_generalizations['ranges'].keys()) > 0 or len(expected_generalizations['categories'].keys()) > 0:
assert (ncp > 0.0)
assert (((transformed[indexes]) != (X[indexes])).any())
@ -197,21 +167,20 @@ def test_minimizer_ncp():
target_accuracy = 0.4
train_dataset = ArrayDataset(X, predictions, features_names=features)
gen1 = GeneralizeToRepresentative(model, target_accuracy=target_accuracy, generalize_using_transform=False)
gen1.fit(dataset=train_dataset)
ncp1 = gen1.ncp
gen1.calculate_ncp(ad1)
ncp2 = gen1.ncp
gen1 = GeneralizeToRepresentative(model, target_accuracy=target_accuracy)
gen1.fit(dataset=train_dataset, generalize_using_transform=False)
ncp1 = gen1.ncp.fit_score
ncp2 = gen1.calculate_ncp(ad1, generalize_using_transform=False)
gen2 = GeneralizeToRepresentative(model, target_accuracy=target_accuracy)
gen2.fit(dataset=train_dataset)
ncp3 = gen2.ncp
ncp3 = gen2.ncp.fit_score
gen2.transform(dataset=ad1)
ncp4 = gen2.ncp
ncp4 = gen2.ncp.transform_score
gen2.transform(dataset=ad)
ncp5 = gen2.ncp
ncp5 = gen2.ncp.transform_score
gen2.transform(dataset=ad1)
ncp6 = gen2.ncp
ncp6 = gen2.ncp.transform_score
assert (ncp1 <= ncp3)
assert (ncp2 != ncp3)
@ -272,22 +241,21 @@ def test_minimizer_ncp_categorical():
target_accuracy = 0.4
train_dataset = ArrayDataset(X, predictions, features_names=features)
gen1 = GeneralizeToRepresentative(model, target_accuracy=target_accuracy, generalize_using_transform=False,
gen1 = GeneralizeToRepresentative(model, target_accuracy=target_accuracy,
categorical_features=categorical_features)
gen1.fit(dataset=train_dataset)
ncp1 = gen1.ncp
gen1.calculate_ncp(ad1)
ncp2 = gen1.ncp
gen1.fit(dataset=train_dataset, generalize_using_transform=False)
ncp1 = gen1.ncp.fit_score
ncp2 = gen1.calculate_ncp(ad1, generalize_using_transform=False)
gen2 = GeneralizeToRepresentative(model, target_accuracy=target_accuracy, categorical_features=categorical_features)
gen2.fit(dataset=train_dataset)
ncp3 = gen2.ncp
ncp3 = gen2.ncp.fit_score
gen2.transform(dataset=ad1)
ncp4 = gen2.ncp
ncp4 = gen2.ncp.transform_score
gen2.transform(dataset=ad)
ncp5 = gen2.ncp
ncp5 = gen2.ncp.transform_score
gen2.transform(dataset=ad1)
ncp6 = gen2.ncp
ncp6 = gen2.ncp.transform_score
assert (ncp1 <= ncp3)
assert (ncp2 != ncp3)
@ -319,12 +287,12 @@ def test_minimizer_fit_not_transform():
if predictions.shape[1] > 1:
predictions = np.argmax(predictions, axis=1)
target_accuracy = 0.5
gen = GeneralizeToRepresentative(model, target_accuracy=target_accuracy, generalize_using_transform=False)
gen = GeneralizeToRepresentative(model, target_accuracy=target_accuracy)
train_dataset = ArrayDataset(X, predictions, features_names=features)
gen.fit(dataset=train_dataset)
gen.fit(dataset=train_dataset, generalize_using_transform=False)
gener = gen.generalizations
expected_generalizations = {'ranges': {}, 'categories': {}, 'untouched': ['height', 'age']}
expected_generalizations = {'ranges': {'age': [], 'height':[157.0]}, 'categories': {}, 'untouched': []}
for key in expected_generalizations['ranges']:
assert (set(expected_generalizations['ranges'][key]) == set(gener['ranges'][key]))
@ -340,7 +308,7 @@ def test_minimizer_fit_not_transform():
if features[i] in modified_features:
indexes.append(i)
ncp = gen.ncp
ncp = gen.ncp.fit_score
if len(expected_generalizations['ranges'].keys()) > 0 or len(expected_generalizations['categories'].keys()) > 0:
assert (ncp > 0.0)
@ -394,8 +362,8 @@ def test_minimizer_fit_pandas():
gen.fit(dataset=train_dataset)
transformed = gen.transform(dataset=ArrayDataset(X))
gener = gen.generalizations
expected_generalizations = {'ranges': {'age': []}, 'categories': {'sex': [['f', 'm']], 'ola': [['aa', 'bb']]},
'untouched': ['height']}
expected_generalizations = {'ranges': {'age': []}, 'categories': {},
'untouched': ['height', 'sex', 'ola']}
for key in expected_generalizations['ranges']:
assert (set(expected_generalizations['ranges'][key]) == set(gener['ranges'][key]))
@ -407,7 +375,7 @@ def test_minimizer_fit_pandas():
f in expected_generalizations['categories'].keys() or f in expected_generalizations[
'ranges'].keys()]
np.testing.assert_array_equal(transformed.drop(modified_features, axis=1), X.drop(modified_features, axis=1))
ncp = gen.ncp
ncp = gen.ncp.transform_score
if len(expected_generalizations['ranges'].keys()) > 0 or len(expected_generalizations['categories'].keys()) > 0:
assert (ncp > 0.0)
assert (((transformed[modified_features]).equals(X[modified_features])) is False)
@ -529,7 +497,7 @@ def test_minimizer_fit_QI():
if features[i] in modified_features:
indexes.append(i)
assert ((np.delete(transformed, indexes, axis=1) == np.delete(X, indexes, axis=1)).all())
ncp = gen.ncp
ncp = gen.ncp.transform_score
if len(expected_generalizations['ranges'].keys()) > 0 or len(expected_generalizations['categories'].keys()) > 0:
assert (ncp > 0.0)
assert (((transformed[indexes]) != (X[indexes])).any())
@ -605,7 +573,7 @@ def test_minimizer_fit_pandas_QI():
'ranges'].keys()]
# assert (transformed.drop(modified_features, axis=1).equals(X.drop(modified_features, axis=1)))
np.testing.assert_array_equal(transformed.drop(modified_features, axis=1), X.drop(modified_features, axis=1))
ncp = gen.ncp
ncp = gen.ncp.transform_score
if len(expected_generalizations['ranges'].keys()) > 0 or len(expected_generalizations['categories'].keys()) > 0:
assert (ncp > 0.0)
assert (((transformed[modified_features]).equals(X[modified_features])) is False)
@ -649,7 +617,7 @@ def test_minimize_ndarray_iris():
if features[i] in modified_features:
indexes.append(i)
assert ((np.delete(transformed, indexes, axis=1) == np.delete(x_train, indexes, axis=1)).all())
ncp = gen.ncp
ncp = gen.ncp.transform_score
if len(expected_generalizations['ranges'].keys()) > 0 or len(expected_generalizations['categories'].keys()) > 0:
assert (ncp > 0.0)
assert (((transformed[indexes]) != (x_train[indexes])).any())
@ -727,7 +695,7 @@ def test_minimize_pandas_adult():
'ranges'].keys()]
# assert (transformed.drop(modified_features, axis=1).equals(x_train.drop(modified_features, axis=1)))
np.testing.assert_array_equal(transformed.drop(modified_features, axis=1), x_train.drop(modified_features, axis=1))
ncp = gen.ncp
ncp = gen.ncp.transform_score
if len(expected_generalizations['ranges'].keys()) > 0 or len(expected_generalizations['categories'].keys()) > 0:
assert (ncp > 0.0)
assert (((transformed[modified_features]).equals(x_train[modified_features])) is False)
@ -805,7 +773,7 @@ def test_german_credit_pandas():
'ranges'].keys()]
# assert (transformed.drop(modified_features, axis=1).equals(x_train.drop(modified_features, axis=1)))
np.testing.assert_array_equal(transformed.drop(modified_features, axis=1), x_train.drop(modified_features, axis=1))
ncp = gen.ncp
ncp = gen.ncp.transform_score
if len(expected_generalizations['ranges'].keys()) > 0 or len(expected_generalizations['categories'].keys()) > 0:
assert (ncp > 0.0)
assert (((transformed[modified_features]).equals(x_train[modified_features])) is False)
@ -878,7 +846,7 @@ def test_regression(dataset):
if features[i] in modified_features:
indexes.append(i)
assert ((np.delete(transformed, indexes, axis=1) == np.delete(x_train, indexes, axis=1)).all())
ncp = gen.ncp
ncp = gen.ncp.transform_score
if len(expected_generalizations['ranges'].keys()) > 0 or len(expected_generalizations['categories'].keys()) > 0:
assert (ncp > 0.0)
assert (((transformed[indexes]) != (x_train[indexes])).any())
@ -932,7 +900,7 @@ def test_X_y():
if features[i] in modified_features:
indexes.append(i)
assert ((np.delete(transformed, indexes, axis=1) == np.delete(X, indexes, axis=1)).all())
ncp = gen.ncp
ncp = gen.ncp.transform_score
if len(expected_generalizations['ranges'].keys()) > 0 or len(expected_generalizations['categories'].keys()) > 0:
assert (ncp > 0.0)
assert (((transformed[indexes]) != (X[indexes])).any())
@ -986,7 +954,7 @@ def test_X_y_features_names():
if features[i] in modified_features:
indexes.append(i)
assert ((np.delete(transformed, indexes, axis=1) == np.delete(X, indexes, axis=1)).all())
ncp = gen.ncp
ncp = gen.ncp.transform_score
if len(expected_generalizations['ranges'].keys()) > 0 or len(expected_generalizations['categories'].keys()) > 0:
assert (ncp > 0.0)
assert (((transformed[indexes]) != (X[indexes])).any())
@ -1060,7 +1028,7 @@ def test_BaseEstimator_classification():
'ranges'].keys()]
# assert (transformed.drop(modified_features, axis=1).equals(X.drop(modified_features, axis=1)))
np.testing.assert_array_equal(transformed.drop(modified_features, axis=1), X.drop(modified_features, axis=1))
ncp = gen.ncp
ncp = gen.ncp.transform_score
if len(expected_generalizations['ranges'].keys()) > 0 or len(expected_generalizations['categories'].keys()) > 0:
assert (ncp > 0.0)
assert (((transformed[modified_features]).equals(X[modified_features])) is False)
@ -1132,7 +1100,7 @@ def test_BaseEstimator_regression(dataset):
if features[i] in modified_features:
indexes.append(i)
assert ((np.delete(transformed, indexes, axis=1) == np.delete(x_train, indexes, axis=1)).all())
ncp = gen.ncp
ncp = gen.ncp.transform_score
if len(expected_generalizations['ranges'].keys()) > 0 or len(expected_generalizations['categories'].keys()) > 0:
assert (ncp > 0.0)
assert (((transformed[indexes]) != (x_train[indexes])).any())
@ -1173,7 +1141,7 @@ def test_keras_model():
if features[i] in modified_features:
indexes.append(i)
assert ((np.delete(transformed, indexes, axis=1) == np.delete(x_test, indexes, axis=1)).all())
ncp = gen.ncp
ncp = gen.ncp.transform_score
if len(gener['ranges'].keys()) > 0 or len(gener['categories'].keys()) > 0:
assert (ncp > 0.0)
assert (((transformed[indexes]) != (X[indexes])).any())
@ -1226,12 +1194,3 @@ def test_errors():
predictions = model.predict(ad)
if predictions.shape[1] > 1:
predictions = np.argmax(predictions, axis=1)
gen = GeneralizeToRepresentative(model, generalize_using_transform=False)
train_dataset = ArrayDataset(X, predictions, features_names=features)
gen.fit(dataset=train_dataset)
with pytest.raises(ValueError):
gen.transform(X)
with pytest.raises(ValueError):
gen.calculate_ncp(ad)