diff --git a/apt/anonymization/anonymizer.py b/apt/anonymization/anonymizer.py index f830bb6..cd7f097 100644 --- a/apt/anonymization/anonymizer.py +++ b/apt/anonymization/anonymizer.py @@ -1,6 +1,5 @@ import numpy as np import pandas as pd -from scipy.spatial import distance from collections import Counter from sklearn.compose import ColumnTransformer @@ -146,7 +145,8 @@ class Anonymize: min_value = max(values) min_dist = float("inf") for value in values: - dist = distance.euclidean(value, median) + # euclidean distance between two floating point values + dist = abs(value - median) if dist < min_dist: min_dist = dist min_value = value diff --git a/apt/minimization/minimizer.py b/apt/minimization/minimizer.py index 65513b8..5993397 100644 --- a/apt/minimization/minimizer.py +++ b/apt/minimization/minimizer.py @@ -2,6 +2,8 @@ This module implements all classes needed to perform data minimization """ from typing import Union, Optional +from dataclasses import dataclass +from collections import Counter import pandas as pd import numpy as np import copy @@ -20,6 +22,13 @@ from apt.utils.datasets import ArrayDataset, DATA_PANDAS_NUMPY_TYPE from apt.utils.models import Model, SklearnRegressor, ModelOutputType, SklearnClassifier +@dataclass +class NCPScores: + fit_score: float = None + transform_score: float = None + generalizations_score: float = None + + class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerMixin): """ A transformer that generalizes data to representative points. @@ -59,14 +68,23 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM :param is_regression: Whether the model is a regression model or not (if False, assumes a classification model). Default is False. :type is_regression: boolean, optional + :param generalize_using_transform: Indicates how to calculate NCP and accuracy during the generalization + process. 
True means that the `transform` method is used to transform original + data into generalized data that is used for accuracy and NCP calculation. + False indicates that the `generalizations` structure should be used. + Default is True. + :type generalize_using_transform: boolean, optional """ - def __init__(self, estimator: Union[BaseEstimator, Model] = None, target_accuracy: Optional[float] = 0.998, - cells: Optional[list] = None, categorical_features: Optional[Union[np.ndarray, list]] = None, + def __init__(self, estimator: Union[BaseEstimator, Model] = None, + target_accuracy: Optional[float] = 0.998, + cells: Optional[list] = None, + categorical_features: Optional[Union[np.ndarray, list]] = None, encoder: Optional[Union[OrdinalEncoder, OneHotEncoder]] = None, features_to_minimize: Optional[Union[np.ndarray, list]] = None, train_only_features_to_minimize: Optional[bool] = True, - is_regression: Optional[bool] = False): + is_regression: Optional[bool] = False, + generalize_using_transform: bool = True): self.estimator = estimator if estimator is not None and not issubclass(estimator.__class__, Model): @@ -76,6 +94,8 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM self.estimator = SklearnClassifier(estimator, ModelOutputType.CLASSIFIER_PROBABILITIES) self.target_accuracy = target_accuracy self.cells = cells + if cells: + self._calculate_generalizations() self.categorical_features = [] if categorical_features: self.categorical_features = categorical_features @@ -83,6 +103,13 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM self.train_only_features_to_minimize = train_only_features_to_minimize self.is_regression = is_regression self.encoder = encoder + self.generalize_using_transform = generalize_using_transform + self._ncp_scores = NCPScores() + self._feature_data = None + self._categorical_values = {} + self._dt = None + self._features = None + self._level = 0 def get_params(self, deep=True): 
""" @@ -99,12 +126,13 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM ret['features_to_minimize'] = self.features_to_minimize ret['train_only_features_to_minimize'] = self.train_only_features_to_minimize ret['is_regression'] = self.is_regression + ret['estimator'] = self.estimator + ret['encoder'] = self.encoder if deep: ret['cells'] = copy.deepcopy(self.cells) - ret['estimator'] = self.estimator - ret['encoder'] = self.encoder else: ret['cells'] = copy.copy(self.cells) + return ret def set_params(self, **params): @@ -132,6 +160,10 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM self.is_regression = params['is_regression'] if 'cells' in params: self.cells = params['cells'] + if 'estimator' in params: + self.estimator = params['estimator'] + if 'encoder' in params: + self.encoder = params['encoder'] return self @property @@ -140,24 +172,27 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM Return the generalizations derived from the model and test data. :return: generalizations object. Contains 3 sections: 'ranges' that contains ranges for numerical features, - 'categories' that contains sub-groups of categories for categorical features, and - 'untouched' that contains the features that could not be generalized. + 'categories' that contains sub-groups of categories for categorical features, and + 'untouched' that contains the features that could not be generalized. """ return self._generalizations @property def ncp(self): """ - Return the NCP score of the generalizations. + Return the last calculated NCP scores. NCP score is calculated upon calling `fit` (on the training data), + `transform` (on the test data) or when explicitly calling `calculate_ncp` and providing it a dataset. - :return: ncp score as float. 
+ :return: NCPScores object, that contains a score corresponding to the last fit call, one for the last + transform call, and a score based on global generalizations. """ - return self._ncp + return self._ncp_scores def fit_transform(self, X: Optional[DATA_PANDAS_NUMPY_TYPE] = None, y: Optional[DATA_PANDAS_NUMPY_TYPE] = None, features_names: Optional[list] = None, dataset: Optional[ArrayDataset] = None): """ - Learns the generalizations based on training data, and applies them to the data. + Learns the generalizations based on training data, and applies them to the data. Also sets the fit_score, + transform_score and generalizations_score in self.ncp. :param X: The training input samples. :type X: {array-like, sparse matrix}, shape (n_samples, n_features), optional @@ -172,19 +207,23 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM :return: Array containing the representative values to which each record in ``X`` is mapped, as numpy array or pandas DataFrame (depending on the type of ``X``), shape (n_samples, n_features) """ + if not self.generalize_using_transform: + raise ValueError('fit_transform method called even though generalize_using_transform parameter was False. ' + 'This can lead to inconsistent results.') self.fit(X, y, features_names, dataset=dataset) return self.transform(X, features_names, dataset=dataset) def fit(self, X: Optional[DATA_PANDAS_NUMPY_TYPE] = None, y: Optional[DATA_PANDAS_NUMPY_TYPE] = None, features_names: Optional = None, dataset: ArrayDataset = None): - """Learns the generalizations based on training data. + """Learns the generalizations based on training data. Also sets the fit_score and generalizations_score in + self.ncp. :param X: The training input samples. :type X: {array-like, sparse matrix}, shape (n_samples, n_features), optional :param y: The target values. This should contain the predictions of the original model on ``X``. 
:type y: array-like, shape (n_samples,), optional - :param features_names: The feature names, in the order that they appear in the data. Can be provided when - passing the data as ``X`` and ``y`` + :param features_names: The feature names, in the order that they appear in the data. Should be provided when + passing the data as ``X`` as a numpy array :type features_names: list of strings, optional :param dataset: Data wrapper containing the training input samples and the predictions of the original model on the training data. Either ``X``, ``y`` OR ``dataset`` need to be provided, not both. @@ -223,46 +262,35 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM self.features_to_minimize = [str(i) for i in self.features_to_minimize] if not all(elem in self._features for elem in self.features_to_minimize): raise ValueError('features to minimize should be a subset of features names') - x_QI = x.loc[:, self.features_to_minimize] + x_qi = x.loc[:, self.features_to_minimize] # divide dataset into train and test used_data = x if self.train_only_features_to_minimize: - used_data = x_QI + used_data = x_qi if self.is_regression: - X_train, X_test, y_train, y_test = train_test_split(x, dataset.get_labels(), test_size=0.4, + x_train, x_test, y_train, y_test = train_test_split(x, dataset.get_labels(), test_size=0.4, random_state=14) else: try: - X_train, X_test, y_train, y_test = train_test_split(x, dataset.get_labels(), + x_train, x_test, y_train, y_test = train_test_split(x, dataset.get_labels(), stratify=dataset.get_labels(), test_size=0.4, random_state=18) except ValueError: print('Could not stratify split due to uncommon class value, doing unstratified split instead') - X_train, X_test, y_train, y_test = train_test_split(x, dataset.get_labels(), test_size=0.4, + x_train, x_test, y_train, y_test = train_test_split(x, dataset.get_labels(), test_size=0.4, random_state=18) - X_train_QI = X_train.loc[:, self.features_to_minimize] - X_test_QI = 
X_test.loc[:, self.features_to_minimize] - used_X_train = X_train - used_X_test = X_test + x_train_qi = x_train.loc[:, self.features_to_minimize] + x_test_qi = x_test.loc[:, self.features_to_minimize] + used_x_train = x_train + used_x_test = x_test if self.train_only_features_to_minimize: - used_X_train = X_train_QI - used_X_test = X_test_QI + used_x_train = x_train_qi + used_x_test = x_test_qi # collect feature data (such as min, max) - feature_data = {} - for feature in self._features: - if feature not in feature_data.keys(): - fd = {} - values = list(x.loc[:, feature]) - if feature not in self.categorical_features: - fd['min'] = min(values) - fd['max'] = max(values) - fd['range'] = max(values) - min(values) - else: - fd['range'] = len(np.unique(values)) - feature_data[feature] = fd + self._feature_data = self._get_feature_data(x) # default encoder in case none provided if self.encoder is None: @@ -290,9 +318,9 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM # prepare data for DT self._encode_categorical_features(used_data, save_mapping=True) - x_prepared = self._encode_categorical_features(used_X_train) + x_prepared = self._encode_categorical_features(used_x_train) self._dt.fit(x_prepared, y_train) - x_prepared_test = self._encode_categorical_features(used_X_test) + x_prepared_test = self._encode_categorical_features(used_x_test) self._calculate_cells() self._modify_cells() @@ -302,11 +330,14 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM self._remove_feature_from_cells(self.cells, self._cells_by_id, feature) nodes = self._get_nodes_level(0) - self._attach_cells_representatives(x_prepared, used_X_train, y_train, nodes) + self._attach_cells_representatives(x_prepared, used_x_train, y_train, nodes) # self._cells currently holds the generalization created from the tree leaves - self._calculate_generalizations() - generalized = self._generalize(X_test, x_prepared_test, nodes, self.cells, 
self._cells_by_id) + self._calculate_generalizations(x_test) + if self.generalize_using_transform: + generalized = self._generalize_from_tree(x_test, x_prepared_test, nodes, self.cells, self._cells_by_id) + else: + generalized = self._generalize_from_generalizations(x_test, self.generalizations) # check accuracy accuracy = self.estimator.score(ArrayDataset(self.encoder.transform(generalized), y_test)) @@ -316,66 +347,79 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM # if accuracy above threshold, improve generalization if accuracy > self.target_accuracy: print('Improving generalizations') - level = 1 + self._level = 1 while accuracy > self.target_accuracy: cells_previous_iter = self.cells generalization_prev_iter = self._generalizations cells_by_id_prev = self._cells_by_id - nodes = self._get_nodes_level(level) + nodes = self._get_nodes_level(self._level) try: - self._calculate_level_cells(level) + self._calculate_level_cells(self._level) except TypeError as e: print(e) + self._level -= 1 break - self._attach_cells_representatives(x_prepared, used_X_train, y_train, nodes) + self._attach_cells_representatives(x_prepared, used_x_train, y_train, nodes) + + self._calculate_generalizations(x_test) + if self.generalize_using_transform: + generalized = self._generalize_from_tree(x_test, x_prepared_test, nodes, self.cells, + self._cells_by_id) + else: + generalized = self._generalize_from_generalizations(x_test, self.generalizations) - self._calculate_generalizations() - generalized = self._generalize(X_test, x_prepared_test, nodes, self.cells, - self._cells_by_id) accuracy = self.estimator.score(ArrayDataset(self.encoder.transform(generalized), y_test)) # if accuracy passed threshold roll back to previous iteration generalizations if accuracy < self.target_accuracy: self.cells = cells_previous_iter self._generalizations = generalization_prev_iter self._cells_by_id = cells_by_id_prev + self._level -= 1 break else: - print('Pruned tree 
to level: %d, new relative accuracy: %f' % (level, accuracy)) - level += 1 + print('Pruned tree to level: %d, new relative accuracy: %f' % (self._level, accuracy)) + self._level += 1 # if accuracy below threshold, improve accuracy by removing features from generalization elif accuracy < self.target_accuracy: print('Improving accuracy') while accuracy < self.target_accuracy: - removed_feature = self._remove_feature_from_generalization(X_test, x_prepared_test, + removed_feature = self._remove_feature_from_generalization(x_test, x_prepared_test, nodes, y_test, - feature_data, accuracy) + self._feature_data, accuracy, + self.generalize_using_transform) if removed_feature is None: break - self._calculate_generalizations() - generalized = self._generalize(X_test, x_prepared_test, nodes, self.cells, self._cells_by_id) + self._calculate_generalizations(x_test) + if self.generalize_using_transform: + generalized = self._generalize_from_tree(x_test, x_prepared_test, nodes, self.cells, + self._cells_by_id) + else: + generalized = self._generalize_from_generalizations(x_test, self.generalizations) accuracy = self.estimator.score(ArrayDataset(self.encoder.transform(generalized), y_test)) print('Removed feature: %s, new relative accuracy: %f' % (removed_feature, accuracy)) # self._cells currently holds the chosen generalization based on target accuracy # calculate iLoss - self._ncp = self._calculate_ncp(X_test, self._generalizations, feature_data) + x_test_dataset = ArrayDataset(x_test, features_names=self._features) + self._ncp_scores.fit_score = self.calculate_ncp(x_test_dataset) + self._ncp_scores.generalizations_score = self.calculate_ncp(x_test_dataset) # Return the transformer return self def transform(self, X: Optional[DATA_PANDAS_NUMPY_TYPE] = None, features_names: Optional[list] = None, dataset: Optional[ArrayDataset] = None): - """ Transforms data records to representative points. + """ Transforms data records to representative points. 
Also sets the transform_score in self.ncp. :param X: The training input samples. :type X: {array-like, sparse matrix}, shape (n_samples, n_features), optional - :param features_names: The feature names, in the order that they appear in the data. Can be provided when - passing the data as ``X`` and ``y`` + :param features_names: The feature names, in the order that they appear in the data. Should be provided when + passing the data as ``X`` as a numpy array :type features_names: list of strings, optional :param dataset: Data wrapper containing the training input samples and the predictions of the original model on the training data. Either ``X`` OR ``dataset`` need to be provided, not both. @@ -383,69 +427,197 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM :return: Array containing the representative values to which each record in ``X`` is mapped, as numpy array or pandas DataFrame (depending on the type of ``X``), shape (n_samples, n_features) """ + if not self.generalize_using_transform: + raise ValueError('transform method called even though generalize_using_transform parameter was False. This ' + 'can lead to inconsistent results.') + transformed = self._inner_transform(X, features_names, dataset) + transformed_dataset = ArrayDataset(transformed, features_names=self._features) + self._ncp_scores.transform_score = self.calculate_ncp(transformed_dataset) + return transformed + def calculate_ncp(self, samples: ArrayDataset): + """ + Compute the NCP score of the generalization. Calculation is based on the value of the + generalize_using_transform param. If samples are provided, updates stored ncp value to the one computed on the + provided data. If samples not provided, returns the last NCP score computed by the `fit` or `transform` method. 
+ + Based on the NCP score presented in: Ghinita, G., Karras, P., Kalnis, P., Mamoulis, N.: Fast data anonymization + with low information loss (https://www.vldb.org/conf/2007/papers/research/p758-ghinita.pdf) + + :param samples: The input samples to compute the NCP score on. + :type samples: ArrayDataset, optional. feature_names should be set. + :return: NCP score as float. + """ + if not samples.features_names: + raise ValueError('features_names should be set in input ArrayDataset.') + samples_pd = pd.DataFrame(samples.get_samples(), columns=samples.features_names) + if self._features is None: + self._features = samples.features_names + if self._feature_data is None: + self._feature_data = self._get_feature_data(samples_pd) + total_samples = samples_pd.shape[0] + + if self.generalize_using_transform: + generalizations = self._calculate_cell_generalizations() + # count how many records are mapped to each cell + counted = np.zeros(samples_pd.shape[0]) # to mark records we already counted + ncp = 0 + for cell in self.cells: + count = self._get_record_count_for_cell(samples_pd, cell, counted) + range_counts = {} + category_counts = {} + for feature in cell['ranges']: + range_counts[feature] = [count] + for feature in cell['categories']: + category_counts[feature] = [count] + ncp += self._calc_ncp_for_generalization(generalizations[cell['id']], range_counts, category_counts, + total_samples) + else: # use generalizations + generalizations = self.generalizations + range_counts = self._find_range_counts(samples_pd, generalizations['ranges']) + category_counts = self._find_category_counts(samples_pd, generalizations['categories']) + ncp = self._calc_ncp_for_generalization(generalizations, range_counts, category_counts, total_samples) + + return ncp + + def _inner_transform(self, x: Optional[DATA_PANDAS_NUMPY_TYPE] = None, features_names: Optional[list] = None, + dataset: Optional[ArrayDataset] = None): # Check if fit has been called msg = 'This %(name)s instance is not 
initialized yet. ' \ 'Call ‘fit’ or ‘set_params’ with ' \ 'appropriate arguments before using this method.' check_is_fitted(self, ['cells'], msg=msg) - if X is not None: + if x is not None: if dataset is not None: - raise ValueError('Either X OR dataset need to be provided, not both') + raise ValueError('Either x OR dataset need to be provided, not both') else: - dataset = ArrayDataset(X, features_names=features_names) + dataset = ArrayDataset(x, features_names=features_names) elif dataset is None: - raise ValueError('Either X OR dataset need to be provided, not both') + raise ValueError('Either x OR dataset need to be provided, not both') if dataset and dataset.features_names: - self._features = dataset.features_names + if self._features is None: + self._features = dataset.features_names if dataset and dataset.get_samples() is not None: - x = pd.DataFrame(dataset.get_samples(), columns=self._features) + x_pd = pd.DataFrame(dataset.get_samples(), columns=self._features) - if x.shape[1] != self._n_features and self._n_features != 0: + if x_pd.shape[1] != self._n_features and self._n_features != 0: raise ValueError('Shape of input is different from what was seen' 'in `fit`') if not self._features: - self._features = [i for i in range(x.shape[1])] + self._features = [i for i in range(x_pd.shape[1])] - mapped = np.zeros(x.shape[0]) # to mark records we already mapped - all_indexes = [] - for i in range(len(self.cells)): - indexes = self._get_record_indexes_for_cell(x, self.cells[i], mapped) - all_indexes.append(indexes) - generalized = self._generalize_indexes(x, self.cells, all_indexes) + if self._dt: # only works if fit was called previously (but much more efficient) + nodes = self._get_nodes_level(self._level) + QI = x_pd.loc[:, self.features_to_minimize] + used_x = x_pd + if self.train_only_features_to_minimize: + used_x = QI + prepared = self._encode_categorical_features(used_x) + generalized = self._generalize_from_tree(x_pd, prepared, nodes, self.cells, 
self._cells_by_id) + else: + mapped = np.zeros(x_pd.shape[0]) # to mark records we already mapped + all_indexes = [] + for cell in self.cells: + indexes = self._get_record_indexes_for_cell(x_pd, cell, mapped) + all_indexes.append(indexes) + generalized = self._generalize_indexes(x_pd, self.cells, all_indexes) if dataset and dataset.is_pandas: return generalized - elif isinstance(X, pd.DataFrame): + elif isinstance(x, pd.DataFrame): return generalized return generalized.to_numpy() - def _get_record_indexes_for_cell(self, X, cell, mapped): + def _calc_ncp_for_generalization(self, generalization, range_counts, category_counts, total_count): + total_ncp = 0 + total_features = len(generalization['untouched']) + ranges = generalization['ranges'] + categories = generalization['categories'] + + # suppressed features are already taken care of within _calc_ncp_numeric + for feature in ranges.keys(): + feature_ncp = self._calc_ncp_numeric(ranges[feature], range_counts[feature], + self._feature_data[feature], total_count) + total_ncp = total_ncp + feature_ncp + total_features += 1 + for feature in categories.keys(): + feature_ncp = self._calc_ncp_categorical(categories[feature], category_counts[feature], + self._feature_data[feature], + total_count) + total_ncp = total_ncp + feature_ncp + total_features += 1 + if total_features == 0: + return 0 + return total_ncp / total_features + + @staticmethod + def _calc_ncp_categorical(categories, category_count, feature_data, total): + category_sizes = [len(g) if len(g) > 1 else 0 for g in categories] + normalized_category_sizes = [s * n / total for s, n in zip(category_sizes, category_count)] + average_group_size = sum(normalized_category_sizes) / len(normalized_category_sizes) + return average_group_size / feature_data['range'] # number of values in category + + @staticmethod + def _calc_ncp_numeric(range, range_count, feature_data, total): + # if there are no ranges, feature is suppressed and iLoss is 1 + if not range: + return 1 + # 
range only contains the split values, need to add min and max value of feature + # to enable computing sizes of all ranges + new_range = [feature_data['min']] + range + [feature_data['max']] + range_sizes = [b - a for a, b in zip(new_range[::1], new_range[1::1])] + normalized_range_sizes = [s * n / total for s, n in zip(range_sizes, range_count)] + average_range_size = sum(normalized_range_sizes) / len(normalized_range_sizes) + return average_range_size / (feature_data['max'] - feature_data['min']) + + def _get_feature_data(self, x): + feature_data = {} + for feature in self._features: + if feature not in feature_data.keys(): + fd = {} + values = list(x.loc[:, feature]) + if feature not in self.categorical_features: + fd['min'] = min(values) + fd['max'] = max(values) + fd['range'] = max(values) - min(values) + else: + fd['range'] = len(np.unique(values)) + feature_data[feature] = fd + return feature_data + + def _get_record_indexes_for_cell(self, x, cell, mapped): indexes = [] - for index, row in X.iterrows(): + for index, row in x.iterrows(): if not mapped.item(index) and self._cell_contains(cell, row, index, mapped): indexes.append(index) return indexes - def _cell_contains(self, cell, x, i, mapped): - for f in self._features: - if f in cell['ranges']: - if not self._cell_contains_numeric(f, cell['ranges'][f], x): + def _get_record_count_for_cell(self, x, cell, mapped): + count = 0 + for index, (_, row) in enumerate(x.iterrows()): + if not mapped.item(index) and self._cell_contains(cell, row, index, mapped): + count += 1 + return count + + def _cell_contains(self, cell, row, index, mapped): + for i, feature in enumerate(self._features): + if feature in cell['ranges']: + if not self._cell_contains_numeric(i, cell['ranges'][feature], row): return False - elif f in cell['categories']: - if not self._cell_contains_categorical(f, cell['categories'][f], x): + elif feature in cell['categories']: + if not self._cell_contains_categorical(i, cell['categories'][feature], 
row): return False - elif f in cell['untouched']: + elif feature in cell['untouched']: continue else: - raise TypeError("feature " + f + "not found in cell" + cell['id']) + raise TypeError("feature " + feature + "not found in cell" + cell['id']) # Mark as mapped - mapped.itemset(i, 1) + mapped.itemset(index, 1) return True - def _encode_categorical_features(self, X, save_mapping=False): + def _encode_categorical_features(self, x, save_mapping=False): if save_mapping: self._categorical_values = {} self._one_hot_vector_features_to_features = {} @@ -456,31 +628,31 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM for feature in self.categorical_features: if feature in used_features: try: - all_values = X.loc[:, feature] + all_values = x.loc[:, feature] values = list(all_values.unique()) if save_mapping: self._categorical_values[feature] = values - X[feature] = pd.Categorical(X.loc[:, feature], categories=self._categorical_values[feature], + x[feature] = pd.Categorical(x.loc[:, feature], categories=self._categorical_values[feature], ordered=False) - ohe = pd.get_dummies(X[feature], prefix=feature) + ohe = pd.get_dummies(x[feature], prefix=feature) if save_mapping: for one_hot_vector_feature in ohe.columns: self._one_hot_vector_features_to_features[one_hot_vector_feature] = feature - X = pd.concat([X, ohe], axis=1) + x = pd.concat([x, ohe], axis=1) features_to_remove.append(feature) except KeyError: print("feature " + feature + "not found in training data") - new_data = X.drop(features_to_remove, axis=1) + new_data = x.drop(features_to_remove, axis=1) if save_mapping: self._encoded_features = new_data.columns return new_data - def _cell_contains_numeric(self, f, range, x): - i = self._features.index(f) - # convert x to ndarray to allow indexing - a = np.array(x) - value = a.item(i) + @staticmethod + def _cell_contains_numeric(index, range, row): + # convert row to ndarray to allow indexing + a = np.array(row) + value = a.item(index) if 
range['start']: if value <= range['start']: return False @@ -489,11 +661,11 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM return False return True - def _cell_contains_categorical(self, f, range, x): - i = self._features.index(f) - # convert x to ndarray to allow indexing - a = np.array(x) - value = a.item(i) + @staticmethod + def _cell_contains_categorical(index, range, row): + # convert row to ndarray to allow indexing + a = np.array(row) + value = a.item(index) if value in range: return True return False @@ -685,7 +857,29 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM nodeSet = set(nodes) return [(list(set([i for i, v in enumerate(p) if v == 1]) & nodeSet))[0] for p in paths] - def _generalize(self, original_data, prepared_data, level_nodes, cells, cells_by_id): + # method for applying generalizations (for global generalization-based accuracy) without dt + def _generalize_from_generalizations(self, original_data, generalizations): + sample_indexes = self._map_to_ranges_categories(original_data, + generalizations['ranges'], + generalizations['categories']) + original_data_generalized = pd.DataFrame(original_data, columns=self._features, copy=True) + for feature in self._generalizations['categories']: + if 'untouched' not in generalizations or feature not in generalizations['untouched']: + for g_index, group in enumerate(generalizations['categories'][feature]): + indexes = [i for i, s in enumerate(sample_indexes) if s[feature] == g_index] + if indexes: + rows = original_data_generalized.iloc[indexes] + rows[feature] = generalizations['category_representatives'][feature][g_index] + for feature in self._generalizations['ranges']: + if 'untouched' not in generalizations or feature not in generalizations['untouched']: + for r_index, range in enumerate(generalizations['ranges'][feature]): + indexes = [i for i, s in enumerate(sample_indexes) if s[feature] == r_index] + if indexes: + rows = 
original_data_generalized.iloc[indexes] + rows[feature] = generalizations['range_representatives'][feature][r_index] + return original_data_generalized + + def _generalize_from_tree(self, original_data, prepared_data, level_nodes, cells, cells_by_id): mapping_to_cells = self._map_to_cells(prepared_data, level_nodes, cells_by_id) all_indexes = [] for i in range(len(cells)): @@ -728,6 +922,29 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM return original_data_generalized + @staticmethod + def _map_to_ranges_categories(samples, ranges, categories): + all_sample_indexes = [] + for _, row in samples.iterrows(): + sample_indexes = {} + for feature in ranges: + if not ranges[feature]: + # no values means whole range + sample_indexes[feature] = 0 + else: + for index, value in enumerate(ranges[feature]): + if row[feature] <= value: + sample_indexes[feature] = index + break + sample_indexes[feature] = index + 1 + for feature in categories: + for g_index, group in enumerate(categories[feature]): + if row[feature] in group: + sample_indexes[feature] = g_index + break + all_sample_indexes.append(sample_indexes) + return all_sample_indexes + def _map_to_cells(self, samples, nodes, cells_by_id): mapping_to_cells = {} for index, row in samples.iterrows(): @@ -740,41 +957,46 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM return [cells_by_id[nodeId] for nodeId in node_ids] def _remove_feature_from_generalization(self, original_data, prepared_data, nodes, labels, feature_data, - current_accuracy): + current_accuracy, generalize_using_transform): # prepared data include one hot encoded categorical data, # if there is no categorical data prepared data is original data feature = self._get_feature_to_remove(original_data, prepared_data, nodes, labels, feature_data, - current_accuracy) + current_accuracy, generalize_using_transform) if feature is None: return None - 
GeneralizeToRepresentative._remove_feature_from_cells(self.cells, self._cells_by_id, feature) + self._remove_feature_from_cells(self.cells, self._cells_by_id, feature) return feature - def _get_feature_to_remove(self, original_data, prepared_data, nodes, labels, feature_data, current_accuracy): + def _get_feature_to_remove(self, original_data, prepared_data, nodes, labels, feature_data, current_accuracy, + generalize_using_transform): # prepared data include one hot encoded categorical data, # if there is no categorical data prepared data is original data # We want to remove features with low iLoss (NCP) and high accuracy gain # (after removing them) ranges = self._generalizations['ranges'] - range_counts = self._find_range_count(original_data, ranges) + range_counts = self._find_range_counts(original_data, ranges) total = prepared_data.size range_min = sys.float_info.max remove_feature = None categories = self.generalizations['categories'] - category_counts = self._find_categories_count(original_data, categories) + category_counts = self._find_category_counts(original_data, categories) for feature in ranges.keys(): if feature not in self._generalizations['untouched']: - feature_ncp = self._calc_ncp_numeric(ranges[feature], - range_counts[feature], - feature_data[feature], - total) + if generalize_using_transform: + feature_ncp = self._calculate_ncp_for_feature_from_cells(feature, feature_data, original_data) + else: + feature_ncp = self._calc_ncp_numeric(ranges[feature], + range_counts[feature], + feature_data[feature], + total) if feature_ncp > 0: # divide by accuracy gain new_cells = copy.deepcopy(self.cells) cells_by_id = copy.deepcopy(self._cells_by_id) GeneralizeToRepresentative._remove_feature_from_cells(new_cells, cells_by_id, feature) - generalized = self._generalize(original_data, prepared_data, nodes, new_cells, cells_by_id) + generalized = self._generalize_from_tree(original_data, prepared_data, nodes, new_cells, + cells_by_id) accuracy_gain = 
self.estimator.score(ArrayDataset(self.encoder.transform(generalized), labels)) - current_accuracy if accuracy_gain < 0: @@ -788,16 +1010,20 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM for feature in categories.keys(): if feature not in self.generalizations['untouched']: - feature_ncp = self._calc_ncp_categorical(categories[feature], - category_counts[feature], - feature_data[feature], - total) + if generalize_using_transform: + feature_ncp = self._calculate_ncp_for_feature_from_cells(feature, feature_data, original_data) + else: + feature_ncp = self._calc_ncp_categorical(categories[feature], + category_counts[feature], + feature_data[feature], + total) if feature_ncp > 0: # divide by accuracy loss new_cells = copy.deepcopy(self.cells) cells_by_id = copy.deepcopy(self._cells_by_id) GeneralizeToRepresentative._remove_feature_from_cells(new_cells, cells_by_id, feature) - generalized = self._generalize(original_data, prepared_data, nodes, new_cells, cells_by_id) + generalized = self._generalize_from_tree(original_data, prepared_data, nodes, new_cells, + cells_by_id) accuracy_gain = self.estimator.score(ArrayDataset(self.encoder.transform(generalized), labels)) - current_accuracy @@ -812,31 +1038,119 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM print('feature to remove: ' + (str(remove_feature) if remove_feature is not None else 'none')) return remove_feature - def _calculate_generalizations(self): - self._generalizations = {'ranges': GeneralizeToRepresentative._calculate_ranges(self.cells), - 'categories': GeneralizeToRepresentative._calculate_categories(self.cells), - 'untouched': GeneralizeToRepresentative._calculate_untouched(self.cells)} - self._remove_categorical_untouched(self._generalizations) + def _calculate_ncp_for_feature_from_cells(self, feature, feature_data, samples_pd): + # count how many records are mapped to each cell + counted = np.zeros(samples_pd.shape[0]) # to mark 
records we already counted + total = samples_pd.shape[0] + feature_ncp = 0 + for cell in self.cells: + count = self._get_record_count_for_cell(samples_pd, cell, counted) + generalizations = self._calculate_generalizations_for_cell(cell) + cell_ncp = 0 + if feature in cell['ranges']: + cell_ncp = self._calc_ncp_numeric(generalizations['ranges'][feature], + [count], + feature_data[feature], + total) + elif feature in cell['categories']: + cell_ncp = self._calc_ncp_categorical(generalizations['categories'][feature], + [count], + feature_data[feature], + total) + feature_ncp += cell_ncp + return feature_ncp - def _find_range_count(self, samples, ranges): - samples_df = pd.DataFrame(samples, columns=self._encoded_features) + def _calculate_generalizations(self, samples: Optional[pd.DataFrame] = None): + ranges, range_representatives = self._calculate_ranges(self.cells) + categories, category_representatives = self._calculate_categories(self.cells) + self._generalizations = {'ranges': ranges, + 'categories': categories, + 'untouched': self._calculate_untouched(self.cells)} + self._remove_categorical_untouched(self._generalizations) + # compute representative value for each feature (based on data) + if samples is not None: + sample_indexes = self._map_to_ranges_categories(samples, + self._generalizations['ranges'], + self._generalizations['categories']) + # categorical - use most common value + old_category_representatives = category_representatives + category_representatives = {} + for feature in self._generalizations['categories']: + category_representatives[feature] = [] + for g_index, group in enumerate(self._generalizations['categories'][feature]): + indexes = [i for i, s in enumerate(sample_indexes) if s[feature] == g_index] + if indexes: + rows = samples.iloc[indexes] + values = rows[feature] + category = Counter(values).most_common(1)[0][0] + category_representatives[feature].append(category) + else: + 
category_representatives[feature].append(old_category_representatives[feature][g_index]) + + # numerical - use actual value closest to mean + old_range_representatives = range_representatives + range_representatives = {} + for feature in self._generalizations['ranges']: + range_representatives[feature] = [] + # find the mean value (per feature) + for index in range(len(self._generalizations['ranges'][feature])): + indexes = [i for i, s in enumerate(sample_indexes) if s[feature] == index] + if indexes: + rows = samples.iloc[indexes] + values = rows[feature] + median = np.median(values) + min_value = max(values) + min_dist = float("inf") + for value in values: + # euclidean distance between two floating point values + dist = abs(value - median) + if dist < min_dist: + min_dist = dist + min_value = value + range_representatives[feature].append(min_value) + else: + range_representatives[feature].append(old_range_representatives[feature][index]) + self._generalizations['category_representatives'] = category_representatives + self._generalizations['range_representatives'] = range_representatives + + def _calculate_generalizations_for_cell(self, cell): + ranges, range_representatives = self._calculate_ranges([cell]) + categories, category_representatives = self._calculate_categories([cell]) + generalizations = {'ranges': ranges, + 'categories': categories, + 'untouched': self._calculate_untouched([cell]), + 'range_representatives': range_representatives, + 'category_representatives': category_representatives} + self._remove_categorical_untouched(generalizations) + return generalizations + + def _calculate_cell_generalizations(self): + # calculate generalizations separately per cell + cell_generalizations = {} + for cell in self.cells: + cell_generalizations[cell['id']] = self._calculate_generalizations_for_cell(cell) + return cell_generalizations + + @staticmethod + def _find_range_counts(samples, ranges): range_counts = {} last_value = None for r in ranges.keys(): 
range_counts[r] = [] # if empty list, all samples should be counted if not ranges[r]: - range_counts[r].append(samples_df.shape[0]) + range_counts[r].append(samples.shape[0]) else: for value in ranges[r]: - counter = [item for item in samples_df[r] if int(item) <= value] + counter = [item for item in samples[r] if int(item) <= value] range_counts[r].append(len(counter)) last_value = value - counter = [item for item in samples_df[r] if int(item) <= last_value] + counter = [item for item in samples[r] if int(item) > last_value] range_counts[r].append(len(counter)) return range_counts - def _find_categories_count(self, samples, categories): + @staticmethod + def _find_category_counts(samples, categories): category_counts = {} for c in categories.keys(): category_counts[c] = [] @@ -844,34 +1158,10 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM category_counts[c].append(len(samples.loc[samples[c].isin(value)])) return category_counts - def _calculate_ncp(self, samples, generalizations, feature_data): - # supressed features are already taken care of within _calc_ncp_numeric - ranges = generalizations['ranges'] - categories = generalizations['categories'] - range_counts = self._find_range_count(samples, ranges) - category_counts = self._find_categories_count(samples, categories) - - total = samples.shape[0] - total_ncp = 0 - total_features = len(generalizations['untouched']) - for feature in ranges.keys(): - feature_ncp = self._calc_ncp_numeric(ranges[feature], range_counts[feature], - feature_data[feature], total) - total_ncp = total_ncp + feature_ncp - total_features += 1 - for feature in categories.keys(): - featureNCP = self._calc_ncp_categorical(categories[feature], category_counts[feature], - feature_data[feature], - total) - total_ncp = total_ncp + featureNCP - total_features += 1 - if total_features == 0: - return 0 - return total_ncp / total_features - @staticmethod def _calculate_ranges(cells): ranges = {} + 
range_representatives = {} for cell in cells: for feature in [key for key in cell['ranges'].keys() if 'untouched' not in cell or key not in cell['untouched']]: @@ -881,17 +1171,37 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM ranges[feature].append(cell['ranges'][feature]['start']) if cell['ranges'][feature]['end'] is not None: ranges[feature].append(cell['ranges'][feature]['end']) + # default representative values (computed with no data) for feature in ranges.keys(): - ranges[feature] = list(set(ranges[feature])) - ranges[feature].sort() - return ranges + range_representatives[feature] = [] + if not ranges[feature]: + # no values means the complete range. Without data we cannot know what to put here. + # Using 0 as a placeholder. + range_representatives[feature].append(0) + else: + ranges[feature] = list(set(ranges[feature])) + ranges[feature].sort() + prev_value = 0 + for index, value in enumerate(ranges[feature]): + if index == 0: + # for first range, use min value + range_representatives[feature].append(value) + else: + # use middle of range (this will be a float) + range_representatives[feature].append((value - prev_value) / 2) + prev_value = value + # for last range use max value + 1 + range_representatives[feature].append(prev_value + 1) + return ranges, range_representatives @staticmethod def _calculate_categories(cells): categories = {} + category_representatives = {} categorical_features_values = GeneralizeToRepresentative._calculate_categorical_features_values(cells) for feature in categorical_features_values.keys(): partitions = [] + category_representatives[feature] = [] values = categorical_features_values[feature] assigned = [] for i in range(len(values)): @@ -908,8 +1218,10 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM partition.append(value2) assigned.append(value2) partitions.append(partition) + # default representative values (computed with no data) + 
category_representatives[feature].append(partition[0]) # random categories[feature] = partitions - return categories + return categories, category_representatives @staticmethod def _calculate_categorical_features_values(cells): @@ -942,26 +1254,6 @@ class GeneralizeToRepresentative(BaseEstimator, MetaEstimatorMixin, TransformerM untouched = untouched.intersection(*untouched_lists) return list(untouched) - @staticmethod - def _calc_ncp_categorical(categories, categoryCount, feature_data, total): - category_sizes = [len(g) if len(g) > 1 else 0 for g in categories] - normalized_category_sizes = [s * n / total for s, n in zip(category_sizes, categoryCount)] - average_group_size = sum(normalized_category_sizes) / len(normalized_category_sizes) - return average_group_size / feature_data['range'] # number of values in category - - @staticmethod - def _calc_ncp_numeric(feature_range, range_count, feature_data, total): - # if there are no ranges, feature is supressed and iLoss is 1 - if not feature_range: - return 1 - # range only contains the split values, need to add min and max value of feature - # to enable computing sizes of all ranges - new_range = [feature_data['min']] + feature_range + [feature_data['max']] - range_sizes = [b - a for a, b in zip(new_range[::1], new_range[1::1])] - normalized_range_sizes = [s * n / total for s, n in zip(range_sizes, range_count)] - average_range_size = sum(normalized_range_sizes) / len(normalized_range_sizes) - return average_range_size / (feature_data['max'] - feature_data['min']) - @staticmethod def _remove_feature_from_cells(cells, cells_by_id, feature): for cell in cells: diff --git a/apt/utils/datasets/datasets.py b/apt/utils/datasets/datasets.py index 2e0a70a..b3278f4 100644 --- a/apt/utils/datasets/datasets.py +++ b/apt/utils/datasets/datasets.py @@ -15,11 +15,12 @@ import pandas as pd import logging import torch from torch import Tensor +from scipy.sparse import csr_matrix logger = logging.getLogger(__name__) 
-INPUT_DATA_ARRAY_TYPE = Union[np.ndarray, pd.DataFrame, List, Tensor] +INPUT_DATA_ARRAY_TYPE = Union[np.ndarray, pd.DataFrame, List, Tensor, csr_matrix] OUTPUT_DATA_ARRAY_TYPE = np.ndarray DATA_PANDAS_NUMPY_TYPE = Union[np.ndarray, pd.DataFrame] @@ -29,14 +30,16 @@ def array2numpy(arr: INPUT_DATA_ARRAY_TYPE) -> OUTPUT_DATA_ARRAY_TYPE: """ converts from INPUT_DATA_ARRAY_TYPE to numpy array """ - if type(arr) == np.ndarray: + if isinstance(arr, np.ndarray): return arr - if type(arr) == pd.DataFrame or type(arr) == pd.Series: + if isinstance(arr, pd.DataFrame) or isinstance(arr, pd.Series): return arr.to_numpy() if isinstance(arr, list): return np.array(arr) - if type(arr) == Tensor: + if isinstance(arr, Tensor): return arr.detach().cpu().numpy() + if isinstance(arr, csr_matrix): + return arr.toarray() raise ValueError("Non supported type: ", type(arr).__name__) @@ -45,14 +48,16 @@ def array2torch_tensor(arr: INPUT_DATA_ARRAY_TYPE) -> Tensor: """ converts from INPUT_DATA_ARRAY_TYPE to torch tensor array """ - if type(arr) == np.ndarray: + if isinstance(arr, np.ndarray): return torch.from_numpy(arr) - if type(arr) == pd.DataFrame or type(arr) == pd.Series: + if isinstance(arr, pd.DataFrame) or isinstance(arr, pd.Series): return torch.from_numpy(arr.to_numpy()) if isinstance(arr, list): return torch.tensor(arr) - if type(arr) == Tensor: + if isinstance(arr, Tensor): return arr + if isinstance(arr, csr_matrix): + return torch.from_numpy(arr.toarray()) raise ValueError("Non supported type: ", type(arr).__name__) @@ -217,7 +222,7 @@ class ArrayDataset(Dataset): def __init__(self, x: INPUT_DATA_ARRAY_TYPE, y: Optional[INPUT_DATA_ARRAY_TYPE] = None, features_names: Optional[list] = None, **kwargs): - self.is_pandas = self.is_pandas = type(x) == pd.DataFrame or type(x) == pd.Series + self.is_pandas = self.is_pandas = isinstance(x, pd.DataFrame) or isinstance(x, pd.Series) self.features_names = features_names self._y = array2numpy(y) if y is not None else None @@ -325,7 
+330,7 @@ class PytorchData(Dataset): self._y = array2torch_tensor(y) if y is not None else None self._x = array2torch_tensor(x) - self.is_pandas = type(x) == pd.DataFrame or type(x) == pd.Series + self.is_pandas = isinstance(x, pd.DataFrame) or isinstance(x, pd.Series) if self.is_pandas: self.features_names = x.columns diff --git a/apt/utils/models/model.py b/apt/utils/models/model.py index ebf0464..157158e 100644 --- a/apt/utils/models/model.py +++ b/apt/utils/models/model.py @@ -43,7 +43,7 @@ def get_nb_classes(y: OUTPUT_DATA_ARRAY_TYPE) -> int: if y is None: return 0 - if type(y) != np.ndarray: + if not isinstance(y, np.ndarray): raise ValueError("Input should be numpy array") if is_one_hot(y): @@ -339,8 +339,8 @@ class BlackboxClassifierPredictions(BlackboxClassifier): y_test_pred = check_and_transform_label_format(y_test_pred, nb_classes=self._nb_classes) if x_train_pred is not None and y_train_pred is not None and x_test_pred is not None and y_test_pred is not None: - if type(y_train_pred) != np.ndarray or type(y_test_pred) != np.ndarray \ - or type(y_train_pred) != np.ndarray or type(y_test_pred) != np.ndarray: + if not isinstance(y_train_pred, np.ndarray) or not isinstance(y_test_pred, np.ndarray) \ + or not isinstance(y_train_pred, np.ndarray) or not isinstance(y_test_pred, np.ndarray): raise NotImplementedError("X/Y Data should be numpy array") x_pred = np.vstack((x_train_pred, x_test_pred)) y_pred = np.vstack((y_train_pred, y_test_pred)) diff --git a/apt/utils/models/sklearn_model.py b/apt/utils/models/sklearn_model.py index a58f167..6f40c65 100644 --- a/apt/utils/models/sklearn_model.py +++ b/apt/utils/models/sklearn_model.py @@ -46,7 +46,7 @@ class SklearnClassifier(SklearnModel): def __init__(self, model: BaseEstimator, output_type: ModelOutputType, black_box_access: Optional[bool] = True, unlimited_queries: Optional[bool] = True, **kwargs): super().__init__(model, output_type, black_box_access, unlimited_queries, **kwargs) - self._art_model = 
ArtSklearnClassifier(model) + self._art_model = ArtSklearnClassifier(model, preprocessing=None) def fit(self, train_data: Dataset, **kwargs) -> None: """ diff --git a/requirements.txt b/requirements.txt index 4af8475..91bf617 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ -numpy==1.22.0 -pandas~=1.1.0 -scipy==1.4.1 +numpy==1.24.2 +pandas==1.1.05 +scipy==1.10.1 scikit-learn>=0.22.2,<=1.1.3 torch>=1.8.0 tqdm>=4.64.1 diff --git a/tests/test_data_assessment.py b/tests/test_data_assessment.py index 8ecad07..e4fe982 100644 --- a/tests/test_data_assessment.py +++ b/tests/test_data_assessment.py @@ -18,7 +18,7 @@ MIN_SHARE = 0.5 MIN_ROC_AUC = 0.0 MIN_PRECISION = 0.0 -NUM_SYNTH_SAMPLES = 40000 +NUM_SYNTH_SAMPLES = 400 NUM_SYNTH_COMPONENTS = 4 iris_dataset_np = get_iris_dataset_np() @@ -109,8 +109,8 @@ def kde(n_samples, n_components, original_data): digit_data = original_data pca = PCA(n_components=n_components, whiten=False) data = pca.fit_transform(digit_data) - params = {'bandwidth': np.logspace(-1, 1, 20)} - grid = GridSearchCV(KernelDensity(), params, cv=5) + params = {'bandwidth': np.logspace(-1, 1, 10)} + grid = GridSearchCV(KernelDensity(), params, cv=2) grid.fit(data) kde_estimator = grid.best_estimator_ diff --git a/tests/test_minimizer.py b/tests/test_minimizer.py index e050937..4a484c4 100644 --- a/tests/test_minimizer.py +++ b/tests/test_minimizer.py @@ -1,6 +1,7 @@ import pytest import numpy as np import pandas as pd +import scipy from sklearn.compose import ColumnTransformer @@ -24,12 +25,12 @@ tf.compat.v1.disable_eager_execution() @pytest.fixture -def dataset(): +def diabetes_dataset(): return load_diabetes() -def test_minimizer_params(): - # Assume two features, age and height, and boolean label +@pytest.fixture +def cells(): cells = [{"id": 1, "ranges": {"age": {"start": None, "end": 38}, "height": {"start": None, "end": 170}}, "label": 0, 'categories': {}, "representative": {"age": 26, "height": 149}}, {"id": 2, "ranges": {"age": 
{"start": 39, "end": None}, "height": {"start": None, "end": 170}}, "label": 1, @@ -40,147 +41,15 @@ def test_minimizer_params(): 'categories': {}, "representative": {"age": 45, "height": 176}} ] features = ['age', 'height'] - X = np.array([[23, 165], + x = np.array([[23, 165], [45, 158], [18, 190]]) y = [1, 1, 0] - base_est = DecisionTreeClassifier(random_state=0, min_samples_split=2, - min_samples_leaf=1) - model = SklearnClassifier(base_est, ModelOutputType.CLASSIFIER_PROBABILITIES) - model.fit(ArrayDataset(X, y)) - - gen = GeneralizeToRepresentative(model, cells=cells) - gen.fit() - gen.transform(dataset=ArrayDataset(X, features_names=features)) + return cells, features, x, y -def test_minimizer_fit(): - features = ['age', 'height'] - X = np.array([[23, 165], - [45, 158], - [56, 123], - [67, 154], - [45, 149], - [42, 166], - [73, 172], - [94, 168], - [69, 175], - [24, 181], - [18, 190]]) - y = np.array([1, 1, 0, 1, 0, 0, 1, 1, 0, 1, 0]) - base_est = DecisionTreeClassifier(random_state=0, min_samples_split=2, - min_samples_leaf=1) - model = SklearnClassifier(base_est, ModelOutputType.CLASSIFIER_PROBABILITIES) - model.fit(ArrayDataset(X, y)) - ad = ArrayDataset(X) - predictions = model.predict(ad) - if predictions.shape[1] > 1: - predictions = np.argmax(predictions, axis=1) - target_accuracy = 0.5 - gen = GeneralizeToRepresentative(model, target_accuracy=target_accuracy) - train_dataset = ArrayDataset(X, predictions, features_names=features) - - gen.fit(dataset=train_dataset) - transformed = gen.transform(dataset=ad) - gener = gen.generalizations - expected_generalizations = {'ranges': {}, 'categories': {}, 'untouched': ['height', 'age']} - - for key in expected_generalizations['ranges']: - assert (set(expected_generalizations['ranges'][key]) == set(gener['ranges'][key])) - for key in expected_generalizations['categories']: - assert (set([frozenset(sl) for sl in expected_generalizations['categories'][key]]) - == set([frozenset(sl) for sl in 
gener['categories'][key]])) - assert (set(expected_generalizations['untouched']) == set(gener['untouched'])) - modified_features = [f for f in features if - f in expected_generalizations['categories'].keys() or f in expected_generalizations[ - 'ranges'].keys()] - indexes = [] - for i in range(len(features)): - if features[i] in modified_features: - indexes.append(i) - assert ((np.delete(transformed, indexes, axis=1) == np.delete(X, indexes, axis=1)).all()) - ncp = gen.ncp - if len(expected_generalizations['ranges'].keys()) > 0 or len(expected_generalizations['categories'].keys()) > 0: - assert (ncp > 0) - assert (((transformed[indexes]) != (X[indexes])).any()) - - rel_accuracy = model.score(ArrayDataset(transformed, predictions)) - assert ((rel_accuracy >= target_accuracy) or (target_accuracy - rel_accuracy) <= 0.05) - - -def test_minimizer_fit_pandas(): - features = ['age', 'height', 'sex', 'ola'] - X = [[23, 165, 'f', 'aa'], - [45, 158, 'f', 'aa'], - [56, 123, 'f', 'bb'], - [67, 154, 'm', 'aa'], - [45, 149, 'f', 'bb'], - [42, 166, 'm', 'bb'], - [73, 172, 'm', 'bb'], - [94, 168, 'f', 'aa'], - [69, 175, 'm', 'aa'], - [24, 181, 'm', 'bb'], - [18, 190, 'm', 'bb']] - y = np.array([1, 1, 0, 1, 0, 0, 1, 1, 0, 1, 0]) - X = pd.DataFrame(X, columns=features) - - numeric_features = ["age", "height"] - numeric_transformer = Pipeline( - steps=[('imputer', SimpleImputer(strategy='constant', fill_value=0))] - ) - - categorical_features = ["sex", "ola"] - categorical_transformer = OneHotEncoder(handle_unknown="ignore") - - preprocessor = ColumnTransformer( - transformers=[ - ("num", numeric_transformer, numeric_features), - ("cat", categorical_transformer, categorical_features), - ] - ) - encoded = preprocessor.fit_transform(X) - encoded = pd.DataFrame(encoded) - base_est = DecisionTreeClassifier(random_state=0, min_samples_split=2, - min_samples_leaf=1) - model = SklearnClassifier(base_est, ModelOutputType.CLASSIFIER_PROBABILITIES) - model.fit(ArrayDataset(encoded, y)) - 
predictions = model.predict(ArrayDataset(encoded)) - if predictions.shape[1] > 1: - predictions = np.argmax(predictions, axis=1) - - # Append classifier to preprocessing pipeline. - # Now we have a full prediction pipeline. - target_accuracy = 0.5 - gen = GeneralizeToRepresentative(model, target_accuracy=target_accuracy, - categorical_features=categorical_features) - train_dataset = ArrayDataset(X, predictions) - gen.fit(dataset=train_dataset) - transformed = gen.transform(dataset=ArrayDataset(X)) - gener = gen.generalizations - expected_generalizations = {'ranges': {'age': []}, 'categories': {'sex': [['f', 'm']], 'ola': [['aa', 'bb']]}, - 'untouched': ['height']} - - for key in expected_generalizations['ranges']: - assert (set(expected_generalizations['ranges'][key]) == set(gener['ranges'][key])) - for key in expected_generalizations['categories']: - assert (set([frozenset(sl) for sl in expected_generalizations['categories'][key]]) - == set([frozenset(sl) for sl in gener['categories'][key]])) - assert (set(expected_generalizations['untouched']) == set(gener['untouched'])) - modified_features = [f for f in features if - f in expected_generalizations['categories'].keys() or f in expected_generalizations[ - 'ranges'].keys()] - np.testing.assert_array_equal(transformed.drop(modified_features, axis=1), X.drop(modified_features, axis=1)) - ncp = gen.ncp - if len(expected_generalizations['ranges'].keys()) > 0 or len(expected_generalizations['categories'].keys()) > 0: - assert (ncp > 0) - assert (((transformed[modified_features]).equals(X[modified_features])) is False) - - rel_accuracy = model.score(ArrayDataset(preprocessor.transform(transformed), predictions)) - assert ((rel_accuracy >= target_accuracy) or (target_accuracy - rel_accuracy) <= 0.05) - - -def test_minimizer_params_categorical(): - # Assume three features, age, sex and height, and boolean label +@pytest.fixture +def cells_categorical(): cells = [{'id': 1, 'label': 0, 'ranges': {'age': {'start': None, 'end': 
None}}, 'categories': {'sex': ['f', 'm']}, 'hist': [2, 0], 'representative': {'age': 45, 'height': 149, 'sex': 'f'}, @@ -194,9 +63,8 @@ def test_minimizer_params_categorical(): 'representative': {'age': 18, 'height': 190, 'sex': 'm'}, 'untouched': ['height']} ] - features = ['age', 'height', 'sex'] - X = [[23, 165, 'f'], + x = [[23, 165, 'f'], [45, 158, 'f'], [56, 123, 'f'], [67, 154, 'm'], @@ -207,15 +75,161 @@ def test_minimizer_params_categorical(): [69, 175, 'm'], [24, 181, 'm'], [18, 190, 'm']] - y = np.array([1, 1, 0, 1, 0, 0, 1, 1, 0, 1, 0]) - X = pd.DataFrame(X, columns=features) - numeric_features = ["age", "height"] + return cells, features, x, y + + +@pytest.fixture +def data_two_features(): + x = np.array([[23, 165], + [45, 158], + [56, 123], + [67, 154], + [45, 149], + [42, 166], + [73, 172], + [94, 168], + [69, 175], + [24, 181], + [18, 190]]) + y = np.array([1, 1, 0, 1, 0, 0, 1, 1, 0, 1, 0]) + x1 = np.array([[33, 165], + [43, 150], + [71, 143], + [92, 194], + [13, 125], + [22, 169]]) + features = ['age', 'height'] + return x, y, features, x1 + + +@pytest.fixture +def data_three_features(): + features = ['age', 'height', 'weight'] + x = np.array([[23, 165, 70], + [45, 158, 67], + [56, 123, 65], + [67, 154, 90], + [45, 149, 67], + [42, 166, 58], + [73, 172, 68], + [94, 168, 69], + [69, 175, 80], + [24, 181, 95], + [18, 190, 102]]) + y = np.array([1, 1, 0, 1, 0, 0, 1, 1, 0, 1, 0]) + return x, y, features + + +@pytest.fixture +def data_four_features(): + features = ['age', 'height', 'sex', 'ola'] + x = [[23, 165, 'f', 'aa'], + [45, 158, 'f', 'aa'], + [56, 123, 'f', 'bb'], + [67, 154, 'm', 'aa'], + [45, 149, 'f', 'bb'], + [42, 166, 'm', 'bb'], + [73, 172, 'm', 'bb'], + [94, 168, 'f', 'aa'], + [69, 175, 'm', 'aa'], + [24, 181, 'm', 'bb'], + [18, 190, 'm', 'bb']] + y = np.array([1, 1, 0, 1, 0, 0, 1, 1, 0, 1, 0]) + x1 = [[33, 165, 'f', 'aa'], + [43, 150, 'm', 'aa'], + [71, 143, 'f', 'aa'], + [92, 194, 'm', 'aa'], + [13, 125, 'f', 'aa'], + [22, 169, 'f', 
'bb']] + return x, y, features, x1 + + +@pytest.fixture +def data_five_features(): + features = ['age', 'height', 'weight', 'sex', 'ola'] + x = [[23, 165, 65, 'f', 'aa'], + [45, 158, 76, 'f', 'aa'], + [56, 123, 78, 'f', 'bb'], + [67, 154, 87, 'm', 'aa'], + [45, 149, 45, 'f', 'bb'], + [42, 166, 76, 'm', 'bb'], + [73, 172, 85, 'm', 'bb'], + [94, 168, 92, 'f', 'aa'], + [69, 175, 95, 'm', 'aa'], + [24, 181, 49, 'm', 'bb'], + [18, 190, 69, 'm', 'bb']] + y = pd.Series([1, 1, 0, 1, 0, 0, 1, 1, 0, 1, 0]) + return x, y, features + + +def compare_generalizations(gener, expected_generalizations): + for key in expected_generalizations['ranges']: + assert (set(expected_generalizations['ranges'][key]) == set(gener['ranges'][key])) + for key in expected_generalizations['categories']: + assert (set([frozenset(sl) for sl in expected_generalizations['categories'][key]]) + == set([frozenset(sl) for sl in gener['categories'][key]])) + assert (set(expected_generalizations['untouched']) == set(gener['untouched'])) + if 'range_representatives' in expected_generalizations: + for key in expected_generalizations['range_representatives']: + assert (set(expected_generalizations['range_representatives'][key]) + == set(gener['range_representatives'][key])) + if 'category_representatives' in expected_generalizations: + for key in expected_generalizations['category_representatives']: + assert (set([frozenset(sl) for sl in expected_generalizations['category_representatives'][key]]) + == set([frozenset(sl) for sl in gener['category_representatives'][key]])) + + +def check_features(features, expected_generalizations, transformed, x, pandas=False): + modified_features = [f for f in features if + f in expected_generalizations['categories'].keys() or f in expected_generalizations[ + 'ranges'].keys()] + + if pandas: + np.testing.assert_array_equal(transformed.drop(modified_features, axis=1), x.drop(modified_features, axis=1)) + if len(expected_generalizations['ranges'].keys()) > 0 or 
len(expected_generalizations['categories'].keys()) > 0: + assert (((transformed[modified_features]).equals(x[modified_features])) is False) + else: + indexes = [] + for i in range(len(features)): + if features[i] in modified_features: + indexes.append(i) + if len(indexes) != transformed.shape[1]: + assert ((np.delete(transformed, indexes, axis=1) == np.delete(x, indexes, axis=1)).all()) + if len(expected_generalizations['ranges'].keys()) > 0 or len(expected_generalizations['categories'].keys()) > 0: + assert (((transformed[indexes]) != (x[indexes])).any()) + + +def check_ncp(ncp, expected_generalizations): + if len(expected_generalizations['ranges'].keys()) > 0 or len(expected_generalizations['categories'].keys()) > 0: + assert (ncp > 0.0) + + +def test_minimizer_params(cells): + # Assume two features, age and height, and boolean label + cells, features, x, y = cells + + base_est = DecisionTreeClassifier(random_state=0, min_samples_split=2, + min_samples_leaf=1) + model = SklearnClassifier(base_est, ModelOutputType.CLASSIFIER_PROBABILITIES) + model.fit(ArrayDataset(x, y)) + + expected_generalizations = {'categories': {}, 'category_representatives': {}, + 'range_representatives': {'age': [38, 0.5, 40], 'height': [170, 0.5, 172]}, + 'ranges': {'age': [38, 39], 'height': [170, 171]}, 'untouched': []} + + gen = GeneralizeToRepresentative(model, cells=cells) + gener = gen.generalizations + compare_generalizations(gener, expected_generalizations) + gen.fit() + gen.transform(dataset=ArrayDataset(x, features_names=features)) + + +def create_encoder(numeric_features, categorical_features, x): numeric_transformer = Pipeline( steps=[('imputer', SimpleImputer(strategy='constant', fill_value=0))] ) - categorical_features = ["sex"] categorical_transformer = OneHotEncoder(handle_unknown="ignore") preprocessor = ColumnTransformer( @@ -224,8 +238,207 @@ def test_minimizer_params_categorical(): ("cat", categorical_transformer, categorical_features), ] ) - encoded = 
preprocessor.fit_transform(X) - encoded = pd.DataFrame(encoded) + encoded = preprocessor.fit_transform(x) + if scipy.sparse.issparse(encoded): + pd.DataFrame.sparse.from_spmatrix(encoded) + else: + encoded = pd.DataFrame(encoded) + + return preprocessor, encoded + + +def test_minimizer_params_not_transform(cells): + # Assume two features, age and height, and boolean label + cells, features, x, y = cells + samples = ArrayDataset(x, y, features) + base_est = DecisionTreeClassifier(random_state=0, min_samples_split=2, + min_samples_leaf=1) + model = SklearnClassifier(base_est, ModelOutputType.CLASSIFIER_PROBABILITIES) + model.fit(ArrayDataset(x, y)) + + gen = GeneralizeToRepresentative(model, cells=cells, generalize_using_transform=False) + ncp = gen.calculate_ncp(samples) + assert (ncp > 0.0) + + +def test_minimizer_fit(data_two_features): + x, y, features, _ = data_two_features + base_est = DecisionTreeClassifier(random_state=0, min_samples_split=2, + min_samples_leaf=1) + model = SklearnClassifier(base_est, ModelOutputType.CLASSIFIER_PROBABILITIES) + model.fit(ArrayDataset(x, y)) + ad = ArrayDataset(x) + predictions = model.predict(ad) + if predictions.shape[1] > 1: + predictions = np.argmax(predictions, axis=1) + target_accuracy = 0.5 + gen = GeneralizeToRepresentative(model, target_accuracy=target_accuracy) + train_dataset = ArrayDataset(x, predictions, features_names=features) + + gen.fit(dataset=train_dataset) + transformed = gen.transform(dataset=ad) + gener = gen.generalizations + expected_generalizations = {'ranges': {}, 'categories': {}, 'untouched': ['height', 'age']} + + compare_generalizations(gener, expected_generalizations) + check_features(features, expected_generalizations, transformed, x) + ncp = gen.ncp.transform_score + check_ncp(ncp, expected_generalizations) + + rel_accuracy = model.score(ArrayDataset(transformed, predictions)) + assert ((rel_accuracy >= target_accuracy) or (target_accuracy - rel_accuracy) <= 0.05) + + +def 
test_minimizer_ncp(data_two_features): + x, y, features, x1 = data_two_features + + base_est = DecisionTreeClassifier(random_state=0, min_samples_split=2, + min_samples_leaf=1) + model = SklearnClassifier(base_est, ModelOutputType.CLASSIFIER_PROBABILITIES) + model.fit(ArrayDataset(x, y)) + ad = ArrayDataset(x) + ad1 = ArrayDataset(x1, features_names=features) + predictions = model.predict(ad) + if predictions.shape[1] > 1: + predictions = np.argmax(predictions, axis=1) + target_accuracy = 0.4 + train_dataset = ArrayDataset(x, predictions, features_names=features) + + gen1 = GeneralizeToRepresentative(model, target_accuracy=target_accuracy, generalize_using_transform=False) + gen1.fit(dataset=train_dataset) + ncp1 = gen1.ncp.fit_score + ncp2 = gen1.calculate_ncp(ad1) + + gen2 = GeneralizeToRepresentative(model, target_accuracy=target_accuracy) + gen2.fit(dataset=train_dataset) + ncp3 = gen2.ncp.fit_score + gen2.transform(dataset=ad1) + ncp4 = gen2.ncp.transform_score + gen2.transform(dataset=ad) + ncp5 = gen2.ncp.transform_score + gen2.transform(dataset=ad1) + ncp6 = gen2.ncp.transform_score + + assert (ncp1 <= ncp3) + assert (ncp2 != ncp3) + assert (ncp3 != ncp4) + assert (ncp4 != ncp5) + assert (ncp6 == ncp4) + + +def test_minimizer_ncp_categorical(data_four_features): + x, y, features, x1 = data_four_features + x = pd.DataFrame(x, columns=features) + x1 = pd.DataFrame(x1, columns=features) + + numeric_features = ["age", "height"] + categorical_features = ["sex", "ola"] + preprocessor, encoded = create_encoder(numeric_features, categorical_features, x) + + base_est = DecisionTreeClassifier(random_state=0, min_samples_split=2, + min_samples_leaf=1) + model = SklearnClassifier(base_est, ModelOutputType.CLASSIFIER_PROBABILITIES) + model.fit(ArrayDataset(encoded, y)) + ad = ArrayDataset(x) + ad1 = ArrayDataset(x1) + predictions = model.predict(ArrayDataset(encoded)) + if predictions.shape[1] > 1: + predictions = np.argmax(predictions, axis=1) + target_accuracy = 0.4 + 
train_dataset = ArrayDataset(x, predictions, features_names=features) + + gen1 = GeneralizeToRepresentative(model, target_accuracy=target_accuracy, + categorical_features=categorical_features, generalize_using_transform=False) + gen1.fit(dataset=train_dataset) + ncp1 = gen1.ncp.fit_score + ncp2 = gen1.calculate_ncp(ad1) + + gen2 = GeneralizeToRepresentative(model, target_accuracy=target_accuracy, categorical_features=categorical_features) + gen2.fit(dataset=train_dataset) + ncp3 = gen2.ncp.fit_score + gen2.transform(dataset=ad1) + ncp4 = gen2.ncp.transform_score + gen2.transform(dataset=ad) + ncp5 = gen2.ncp.transform_score + gen2.transform(dataset=ad1) + ncp6 = gen2.ncp.transform_score + + assert (ncp1 <= ncp3) + assert (ncp2 != ncp3) + assert (ncp3 != ncp4) + assert (ncp4 != ncp5) + assert (ncp6 == ncp4) + + +def test_minimizer_fit_not_transform(data_two_features): + x, y, features, x1 = data_two_features + base_est = DecisionTreeClassifier(random_state=0, min_samples_split=2, + min_samples_leaf=1) + model = SklearnClassifier(base_est, ModelOutputType.CLASSIFIER_PROBABILITIES) + model.fit(ArrayDataset(x, y)) + ad = ArrayDataset(x) + predictions = model.predict(ad) + if predictions.shape[1] > 1: + predictions = np.argmax(predictions, axis=1) + target_accuracy = 0.5 + gen = GeneralizeToRepresentative(model, target_accuracy=target_accuracy, generalize_using_transform=False) + train_dataset = ArrayDataset(x, predictions, features_names=features) + + gen.fit(dataset=train_dataset) + gener = gen.generalizations + expected_generalizations = {'ranges': {'age': [], 'height': [157.0]}, 'categories': {}, 'untouched': []} + + compare_generalizations(gener, expected_generalizations) + + ncp = gen.ncp.fit_score + check_ncp(ncp, expected_generalizations) + + +def test_minimizer_fit_pandas(data_four_features): + x, y, features, _ = data_four_features + x = pd.DataFrame(x, columns=features) + + numeric_features = ["age", "height"] + categorical_features = ["sex", "ola"] + 
preprocessor, encoded = create_encoder(numeric_features, categorical_features, x) + + base_est = DecisionTreeClassifier(random_state=0, min_samples_split=2, + min_samples_leaf=1) + model = SklearnClassifier(base_est, ModelOutputType.CLASSIFIER_PROBABILITIES) + model.fit(ArrayDataset(encoded, y)) + predictions = model.predict(ArrayDataset(encoded)) + if predictions.shape[1] > 1: + predictions = np.argmax(predictions, axis=1) + + # Append classifier to preprocessing pipeline. + # Now we have a full prediction pipeline. + target_accuracy = 0.5 + gen = GeneralizeToRepresentative(model, target_accuracy=target_accuracy, + categorical_features=categorical_features) + train_dataset = ArrayDataset(x, predictions) + gen.fit(dataset=train_dataset) + transformed = gen.transform(dataset=ArrayDataset(x)) + gener = gen.generalizations + expected_generalizations = {'ranges': {'age': []}, 'categories': {}, + 'untouched': ['height', 'sex', 'ola']} + + compare_generalizations(gener, expected_generalizations) + check_features(features, expected_generalizations, transformed, x, True) + ncp = gen.ncp.transform_score + check_ncp(ncp, expected_generalizations) + + rel_accuracy = model.score(ArrayDataset(preprocessor.transform(transformed), predictions)) + assert ((rel_accuracy >= target_accuracy) or (target_accuracy - rel_accuracy) <= 0.05) + + +def test_minimizer_params_categorical(cells_categorical): + # Assume three features, age, sex and height, and boolean label + cells, features, x, y = cells_categorical + + x = pd.DataFrame(x, columns=features) + numeric_features = ["age", "height"] + categorical_features = ["sex"] + preprocessor, encoded = create_encoder(numeric_features, categorical_features, x) base_est = DecisionTreeClassifier(random_state=0, min_samples_split=2, min_samples_leaf=1) model = SklearnClassifier(base_est, ModelOutputType.CLASSIFIER_PROBABILITIES) @@ -238,103 +451,51 @@ def test_minimizer_params_categorical(): target_accuracy = 0.5 gen = 
GeneralizeToRepresentative(model, target_accuracy=target_accuracy, categorical_features=categorical_features, cells=cells) - train_dataset = ArrayDataset(X, predictions) + train_dataset = ArrayDataset(x, predictions) gen.fit(dataset=train_dataset) - transformed = gen.transform(dataset=ArrayDataset(X)) + transformed = gen.transform(dataset=ArrayDataset(x)) rel_accuracy = model.score(ArrayDataset(preprocessor.transform(transformed), predictions)) assert ((rel_accuracy >= target_accuracy) or (target_accuracy - rel_accuracy) <= 0.05) -def test_minimizer_fit_QI(): - features = ['age', 'height', 'weight'] - X = np.array([[23, 165, 70], - [45, 158, 67], - [56, 123, 65], - [67, 154, 90], - [45, 149, 67], - [42, 166, 58], - [73, 172, 68], - [94, 168, 69], - [69, 175, 80], - [24, 181, 95], - [18, 190, 102]]) - print(X) - y = np.array([1, 1, 0, 1, 0, 0, 1, 1, 0, 1, 0]) - QI = ['age', 'weight'] +def test_minimizer_fit_qi(data_three_features): + x, y, features = data_three_features + qi = ['age', 'weight'] base_est = DecisionTreeClassifier(random_state=0, min_samples_split=2, min_samples_leaf=1) model = SklearnClassifier(base_est, ModelOutputType.CLASSIFIER_PROBABILITIES) - model.fit(ArrayDataset(X, y)) - ad = ArrayDataset(X) + model.fit(ArrayDataset(x, y)) + ad = ArrayDataset(x) predictions = model.predict(ad) if predictions.shape[1] > 1: predictions = np.argmax(predictions, axis=1) target_accuracy = 0.5 - gen = GeneralizeToRepresentative(model, target_accuracy=target_accuracy, features_to_minimize=QI) - train_dataset = ArrayDataset(X, predictions, features_names=features) + gen = GeneralizeToRepresentative(model, target_accuracy=target_accuracy, features_to_minimize=qi) + train_dataset = ArrayDataset(x, predictions, features_names=features) gen.fit(dataset=train_dataset) transformed = gen.transform(dataset=ad) gener = gen.generalizations expected_generalizations = {'ranges': {'age': [], 'weight': [67.5]}, 'categories': {}, 'untouched': ['height']} - for key in 
expected_generalizations['ranges']: - assert (set(expected_generalizations['ranges'][key]) == set(gener['ranges'][key])) - for key in expected_generalizations['categories']: - assert (set([frozenset(sl) for sl in expected_generalizations['categories'][key]]) - == set([frozenset(sl) for sl in gener['categories'][key]])) - assert (set(expected_generalizations['untouched']) == set(gener['untouched'])) - assert ((np.delete(transformed, [0, 2], axis=1) == np.delete(X, [0, 2], axis=1)).all()) - modified_features = [f for f in features if - f in expected_generalizations['categories'].keys() or f in expected_generalizations[ - 'ranges'].keys()] - indexes = [] - for i in range(len(features)): - if features[i] in modified_features: - indexes.append(i) - assert ((np.delete(transformed, indexes, axis=1) == np.delete(X, indexes, axis=1)).all()) - ncp = gen.ncp - if len(expected_generalizations['ranges'].keys()) > 0 or len(expected_generalizations['categories'].keys()) > 0: - assert (ncp > 0) - assert (((transformed[indexes]) != (X[indexes])).any()) + compare_generalizations(gener, expected_generalizations) + check_features(features, expected_generalizations, transformed, x) + assert ((np.delete(transformed, [0, 2], axis=1) == np.delete(x, [0, 2], axis=1)).all()) + ncp = gen.ncp.transform_score + check_ncp(ncp, expected_generalizations) rel_accuracy = model.score(ArrayDataset(transformed, predictions)) assert ((rel_accuracy >= target_accuracy) or (target_accuracy - rel_accuracy) <= 0.05) -def test_minimizer_fit_pandas_QI(): - features = ['age', 'height', 'weight', 'sex', 'ola'] - X = [[23, 165, 65, 'f', 'aa'], - [45, 158, 76, 'f', 'aa'], - [56, 123, 78, 'f', 'bb'], - [67, 154, 87, 'm', 'aa'], - [45, 149, 45, 'f', 'bb'], - [42, 166, 76, 'm', 'bb'], - [73, 172, 85, 'm', 'bb'], - [94, 168, 92, 'f', 'aa'], - [69, 175, 95, 'm', 'aa'], - [24, 181, 49, 'm', 'bb'], - [18, 190, 69, 'm', 'bb']] - - y = pd.Series([1, 1, 0, 1, 0, 0, 1, 1, 0, 1, 0]) - X = pd.DataFrame(X, columns=features) - 
QI = ['age', 'weight', 'ola'] +def test_minimizer_fit_pandas_qi(data_five_features): + x, y, features = data_five_features + x = pd.DataFrame(x, columns=features) + qi = ['age', 'weight', 'ola'] numeric_features = ["age", "height", "weight"] - numeric_transformer = Pipeline( - steps=[('imputer', SimpleImputer(strategy='constant', fill_value=0))] - ) - categorical_features = ["sex", "ola"] - categorical_transformer = OneHotEncoder(handle_unknown="ignore") + preprocessor, encoded = create_encoder(numeric_features, categorical_features, x) - preprocessor = ColumnTransformer( - transformers=[ - ("num", numeric_transformer, numeric_features), - ("cat", categorical_transformer, categorical_features), - ] - ) - encoded = preprocessor.fit_transform(X) - encoded = pd.DataFrame(encoded) base_est = DecisionTreeClassifier(random_state=0, min_samples_split=2, min_samples_leaf=1) model = SklearnClassifier(base_est, ModelOutputType.CLASSIFIER_PROBABILITIES) @@ -347,31 +508,19 @@ def test_minimizer_fit_pandas_QI(): # Now we have a full prediction pipeline. 
target_accuracy = 0.5 gen = GeneralizeToRepresentative(model, target_accuracy=target_accuracy, - categorical_features=categorical_features, features_to_minimize=QI) - train_dataset = ArrayDataset(X, predictions) + categorical_features=categorical_features, features_to_minimize=qi) + train_dataset = ArrayDataset(x, predictions) gen.fit(dataset=train_dataset) - transformed = gen.transform(dataset=ArrayDataset(X)) + transformed = gen.transform(dataset=ArrayDataset(x)) gener = gen.generalizations expected_generalizations = {'ranges': {'age': [], 'weight': [47.0]}, 'categories': {'ola': [['bb', 'aa']]}, 'untouched': ['height', 'sex']} - for key in expected_generalizations['ranges']: - assert (set(expected_generalizations['ranges'][key]) == set(gener['ranges'][key])) - for key in expected_generalizations['categories']: - assert (set([frozenset(sl) for sl in expected_generalizations['categories'][key]]) - == set([frozenset(sl) for sl in gener['categories'][key]])) - assert (set(expected_generalizations['untouched']) == set(gener['untouched'])) - # assert (transformed.drop(QI, axis=1).equals(X.drop(QI, axis=1))) - np.testing.assert_array_equal(transformed.drop(QI, axis=1), X.drop(QI, axis=1)) - modified_features = [f for f in features if - f in expected_generalizations['categories'].keys() or f in expected_generalizations[ - 'ranges'].keys()] - # assert (transformed.drop(modified_features, axis=1).equals(X.drop(modified_features, axis=1))) - np.testing.assert_array_equal(transformed.drop(modified_features, axis=1), X.drop(modified_features, axis=1)) - ncp = gen.ncp - if len(expected_generalizations['ranges'].keys()) > 0 or len(expected_generalizations['categories'].keys()) > 0: - assert (ncp > 0) - assert (((transformed[modified_features]).equals(X[modified_features])) is False) + compare_generalizations(gener, expected_generalizations) + check_features(features, expected_generalizations, transformed, x, True) + np.testing.assert_array_equal(transformed.drop(qi, axis=1), 
x.drop(qi, axis=1)) + ncp = gen.ncp.transform_score + check_ncp(ncp, expected_generalizations) rel_accuracy = model.score(ArrayDataset(preprocessor.transform(transformed), predictions)) assert ((rel_accuracy >= target_accuracy) or (target_accuracy - rel_accuracy) <= 0.05) @@ -379,8 +528,8 @@ def test_minimizer_fit_pandas_QI(): def test_minimize_ndarray_iris(): features = ['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)', 'petal width (cm)'] - (x_train, y_train), (x_test, y_test) = get_iris_dataset_np() - QI = ['sepal length (cm)', 'petal length (cm)'] + (x_train, y_train), _ = get_iris_dataset_np() + qi = ['sepal length (cm)', 'petal length (cm)'] base_est = DecisionTreeClassifier(random_state=0, min_samples_split=2, min_samples_leaf=1) model = SklearnClassifier(base_est, ModelOutputType.CLASSIFIER_PROBABILITIES) @@ -389,40 +538,25 @@ def test_minimize_ndarray_iris(): if predictions.shape[1] > 1: predictions = np.argmax(predictions, axis=1) target_accuracy = 0.3 - gen = GeneralizeToRepresentative(model, target_accuracy=target_accuracy, features_to_minimize=QI) - # gen.fit(dataset=ArrayDataset(x_train, predictions)) + gen = GeneralizeToRepresentative(model, target_accuracy=target_accuracy, features_to_minimize=qi) transformed = gen.fit_transform(dataset=ArrayDataset(x_train, predictions, features_names=features)) gener = gen.generalizations expected_generalizations = {'ranges': {'sepal length (cm)': [], 'petal length (cm)': [2.449999988079071]}, 'categories': {}, 'untouched': ['petal width (cm)', 'sepal width (cm)']} - for key in expected_generalizations['ranges']: - assert (set(expected_generalizations['ranges'][key]) == set(gener['ranges'][key])) - for key in expected_generalizations['categories']: - assert (set([frozenset(sl) for sl in expected_generalizations['categories'][key]]) - == set([frozenset(sl) for sl in gener['categories'][key]])) - assert (set(expected_generalizations['untouched']) == set(gener['untouched'])) + 
compare_generalizations(gener, expected_generalizations) assert ((np.delete(transformed, [0, 2], axis=1) == np.delete(x_train, [0, 2], axis=1)).all()) - modified_features = [f for f in features if - f in expected_generalizations['categories'].keys() or f in expected_generalizations[ - 'ranges'].keys()] - indexes = [] - for i in range(len(features)): - if features[i] in modified_features: - indexes.append(i) - assert ((np.delete(transformed, indexes, axis=1) == np.delete(x_train, indexes, axis=1)).all()) - ncp = gen.ncp - if len(expected_generalizations['ranges'].keys()) > 0 or len(expected_generalizations['categories'].keys()) > 0: - assert (ncp > 0) - assert (((transformed[indexes]) != (x_train[indexes])).any()) + check_features(features, expected_generalizations, transformed, x_train) + ncp = gen.ncp.transform_score + check_ncp(ncp, expected_generalizations) rel_accuracy = model.score(ArrayDataset(transformed, predictions)) assert ((rel_accuracy >= target_accuracy) or (target_accuracy - rel_accuracy) <= 0.05) def test_minimize_pandas_adult(): - (x_train, y_train), (x_test, y_test) = get_adult_dataset_pd() + (x_train, y_train), _ = get_adult_dataset_pd() x_train = x_train.head(1000) y_train = y_train.head(1000) @@ -433,22 +567,12 @@ def test_minimize_pandas_adult(): categorical_features = ['workclass', 'marital-status', 'occupation', 'relationship', 'race', 'sex', 'hours-per-week', 'native-country'] - QI = ['age', 'workclass', 'education-num', 'marital-status', 'occupation', 'relationship', 'race', 'sex', + qi = ['age', 'workclass', 'education-num', 'marital-status', 'occupation', 'relationship', 'race', 'sex', 'native-country'] numeric_features = [f for f in features if f not in categorical_features] - numeric_transformer = Pipeline( - steps=[('imputer', SimpleImputer(strategy='constant', fill_value=0))] - ) - categorical_transformer = OneHotEncoder(handle_unknown="ignore", sparse=False) - preprocessor = ColumnTransformer( - transformers=[ - ("num", 
numeric_transformer, numeric_features), - ("cat", categorical_transformer, categorical_features), - ] - ) - encoded = preprocessor.fit_transform(x_train) - encoded = pd.DataFrame(encoded) + preprocessor, encoded = create_encoder(numeric_features, categorical_features, x_train) + base_est = DecisionTreeClassifier(random_state=0, min_samples_split=2, min_samples_leaf=1) model = SklearnClassifier(base_est, ModelOutputType.CLASSIFIER_PROBABILITIES) @@ -458,7 +582,7 @@ def test_minimize_pandas_adult(): predictions = np.argmax(predictions, axis=1) target_accuracy = 0.7 gen = GeneralizeToRepresentative(model, target_accuracy=target_accuracy, - categorical_features=categorical_features, features_to_minimize=QI) + categorical_features=categorical_features, features_to_minimize=qi) gen.fit(dataset=ArrayDataset(x_train, predictions, features_names=features)) transformed = gen.transform(dataset=ArrayDataset(x_train)) gener = gen.generalizations @@ -476,31 +600,20 @@ def test_minimize_pandas_adult(): ['Euro_1', 'LatinAmerica', 'BritishCommonwealth', 'SouthAmerica', 'UnitedStates', 'China', 'Euro_2', 'SE_Asia', 'Other', 'Unknown']]}, 'untouched': ['capital-loss', 'hours-per-week', 'capital-gain']} - for key in expected_generalizations['ranges']: - assert (set(expected_generalizations['ranges'][key]) == set(gener['ranges'][key])) - for key in expected_generalizations['categories']: - assert (set([frozenset(sl) for sl in expected_generalizations['categories'][key]]) - == set([frozenset(sl) for sl in gener['categories'][key]])) - assert (set(expected_generalizations['untouched']) == set(gener['untouched'])) - # assert (transformed.drop(QI, axis=1).equals(x_train.drop(QI, axis=1))) - np.testing.assert_array_equal(transformed.drop(QI, axis=1), x_train.drop(QI, axis=1)) + compare_generalizations(gener, expected_generalizations) - modified_features = [f for f in features if - f in expected_generalizations['categories'].keys() or f in expected_generalizations[ - 'ranges'].keys()] - # 
assert (transformed.drop(modified_features, axis=1).equals(x_train.drop(modified_features, axis=1))) - np.testing.assert_array_equal(transformed.drop(modified_features, axis=1), x_train.drop(modified_features, axis=1)) - ncp = gen.ncp - if len(expected_generalizations['ranges'].keys()) > 0 or len(expected_generalizations['categories'].keys()) > 0: - assert (ncp > 0) - assert (((transformed[modified_features]).equals(x_train[modified_features])) is False) + np.testing.assert_array_equal(transformed.drop(qi, axis=1), x_train.drop(qi, axis=1)) + + check_features(features, expected_generalizations, transformed, x_train, True) + ncp = gen.ncp.transform_score + check_ncp(ncp, expected_generalizations) rel_accuracy = model.score(ArrayDataset(preprocessor.transform(transformed), predictions)) assert ((rel_accuracy >= target_accuracy) or (target_accuracy - rel_accuracy) <= 0.05) def test_german_credit_pandas(): - (x_train, y_train), (x_test, y_test) = get_german_credit_dataset_pd() + (x_train, y_train), _ = get_german_credit_dataset_pd() features = ["Existing_checking_account", "Duration_in_month", "Credit_history", "Purpose", "Credit_amount", "Savings_account", "Present_employment_since", "Installment_rate", "Personal_status_sex", "debtors", "Present_residence", "Property", "Age", "Other_installment_plans", "Housing", @@ -509,22 +622,12 @@ def test_german_credit_pandas(): categorical_features = ["Existing_checking_account", "Credit_history", "Purpose", "Savings_account", "Present_employment_since", "Personal_status_sex", "debtors", "Property", "Other_installment_plans", "Housing", "Job"] - QI = ["Duration_in_month", "Credit_history", "Purpose", "debtors", "Property", "Other_installment_plans", + qi = ["Duration_in_month", "Credit_history", "Purpose", "debtors", "Property", "Other_installment_plans", "Housing", "Job"] numeric_features = [f for f in features if f not in categorical_features] - numeric_transformer = Pipeline( - steps=[('imputer', 
SimpleImputer(strategy='constant', fill_value=0))] - ) - categorical_transformer = OneHotEncoder(handle_unknown="ignore", sparse=False) - preprocessor = ColumnTransformer( - transformers=[ - ("num", numeric_transformer, numeric_features), - ("cat", categorical_transformer, categorical_features), - ] - ) - encoded = preprocessor.fit_transform(x_train) - encoded = pd.DataFrame(encoded) + preprocessor, encoded = create_encoder(numeric_features, categorical_features, x_train) + base_est = DecisionTreeClassifier(random_state=0, min_samples_split=2, min_samples_leaf=1) model = SklearnClassifier(base_est, ModelOutputType.CLASSIFIER_PROBABILITIES) @@ -534,7 +637,7 @@ def test_german_credit_pandas(): predictions = np.argmax(predictions, axis=1) target_accuracy = 0.7 gen = GeneralizeToRepresentative(model, target_accuracy=target_accuracy, - categorical_features=categorical_features, features_to_minimize=QI) + categorical_features=categorical_features, features_to_minimize=qi) gen.fit(dataset=ArrayDataset(x_train, predictions)) transformed = gen.transform(dataset=ArrayDataset(x_train)) gener = gen.generalizations @@ -554,43 +657,33 @@ def test_german_credit_pandas(): 'Age', 'Existing_checking_account', 'Credit_amount', 'Present_employment_since']} - for key in expected_generalizations['ranges']: - assert (set(expected_generalizations['ranges'][key]) == set(gener['ranges'][key])) - for key in expected_generalizations['categories']: - assert (set([frozenset(sl) for sl in expected_generalizations['categories'][key]]) - == set([frozenset(sl) for sl in gener['categories'][key]])) - assert (set(expected_generalizations['untouched']) == set(gener['untouched'])) - # assert (transformed.drop(QI, axis=1).equals(x_train.drop(QI, axis=1))) - np.testing.assert_array_equal(transformed.drop(QI, axis=1), x_train.drop(QI, axis=1)) + compare_generalizations(gener, expected_generalizations) - modified_features = [f for f in features if - f in expected_generalizations['categories'].keys() or f 
in expected_generalizations[ - 'ranges'].keys()] - # assert (transformed.drop(modified_features, axis=1).equals(x_train.drop(modified_features, axis=1))) - np.testing.assert_array_equal(transformed.drop(modified_features, axis=1), x_train.drop(modified_features, axis=1)) - ncp = gen.ncp - if len(expected_generalizations['ranges'].keys()) > 0 or len(expected_generalizations['categories'].keys()) > 0: - assert (ncp > 0) - assert (((transformed[modified_features]).equals(x_train[modified_features])) is False) + np.testing.assert_array_equal(transformed.drop(qi, axis=1), x_train.drop(qi, axis=1)) + + check_features(features, expected_generalizations, transformed, x_train, True) + ncp = gen.ncp.transform_score + check_ncp(ncp, expected_generalizations) rel_accuracy = model.score(ArrayDataset(preprocessor.transform(transformed), predictions)) assert ((rel_accuracy >= target_accuracy) or (target_accuracy - rel_accuracy) <= 0.05) -def test_regression(dataset): - x_train, x_test, y_train, y_test = train_test_split(dataset.data, dataset.target, test_size=0.5, random_state=14) +def test_regression(diabetes_dataset): + x_train, x_test, y_train, y_test = train_test_split(diabetes_dataset.data, diabetes_dataset.target, test_size=0.5, + random_state=14) base_est = DecisionTreeRegressor(random_state=10, min_samples_split=2) model = SklearnRegressor(base_est) model.fit(ArrayDataset(x_train, y_train)) predictions = model.predict(ArrayDataset(x_train)) - QI = ['age', 'bmi', 's2', 's5'] + qi = ['age', 'bmi', 's2', 's5'] features = ['age', 'sex', 'bmi', 'bp', 's1', 's2', 's3', 's4', 's5', 's6'] target_accuracy = 0.7 gen = GeneralizeToRepresentative(model, target_accuracy=target_accuracy, is_regression=True, - features_to_minimize=QI) + features_to_minimize=qi) gen.fit(dataset=ArrayDataset(x_train, predictions, features_names=features)) transformed = gen.transform(dataset=ArrayDataset(x_train, features_names=features)) print('Base model accuracy (R2 score): ', 
model.score(ArrayDataset(x_test, y_test))) @@ -625,34 +718,20 @@ def test_regression(dataset): 0.061315815430134535, 0.06272498145699501, 0.06460387445986271]}, 'categories': {}, 'untouched': ['s5', 's3', 'bp', 's1', 'sex', 's6', 's4']} - for key in expected_generalizations['ranges']: - assert (set(expected_generalizations['ranges'][key]) == set(gener['ranges'][key])) - for key in expected_generalizations['categories']: - assert (set([frozenset(sl) for sl in expected_generalizations['categories'][key]]) - == set([frozenset(sl) for sl in gener['categories'][key]])) - assert (set(expected_generalizations['untouched']) == set(gener['untouched'])) + compare_generalizations(gener, expected_generalizations) assert ((np.delete(transformed, [0, 2, 5, 8], axis=1) == np.delete(x_train, [0, 2, 5, 8], axis=1)).all()) - modified_features = [f for f in features if - f in expected_generalizations['categories'].keys() or f in expected_generalizations[ - 'ranges'].keys()] - indexes = [] - for i in range(len(features)): - if features[i] in modified_features: - indexes.append(i) - assert ((np.delete(transformed, indexes, axis=1) == np.delete(x_train, indexes, axis=1)).all()) - ncp = gen.ncp - if len(expected_generalizations['ranges'].keys()) > 0 or len(expected_generalizations['categories'].keys()) > 0: - assert (ncp > 0) - assert (((transformed[indexes]) != (x_train[indexes])).any()) + check_features(features, expected_generalizations, transformed, x_train) + ncp = gen.ncp.transform_score + check_ncp(ncp, expected_generalizations) rel_accuracy = model.score(ArrayDataset(transformed, predictions)) assert ((rel_accuracy >= target_accuracy) or (target_accuracy - rel_accuracy) <= 0.05) -def test_X_y(): - features = [0, 1, 2] - X = np.array([[23, 165, 70], +def test_x_y(): + features = ['0', '1', '2'] + x = np.array([[23, 165, 70], [45, 158, 67], [56, 123, 65], [67, 154, 90], @@ -663,50 +742,36 @@ def test_X_y(): [69, 175, 80], [24, 181, 95], [18, 190, 102]]) - print(X) + print(x) y = 
np.array([1, 1, 0, 1, 0, 0, 1, 1, 0, 1, 0]) - QI = [0, 2] + qi = [0, 2] base_est = DecisionTreeClassifier(random_state=0, min_samples_split=2, min_samples_leaf=1) model = SklearnClassifier(base_est, ModelOutputType.CLASSIFIER_PROBABILITIES) - model.fit(ArrayDataset(X, y)) - ad = ArrayDataset(X) + model.fit(ArrayDataset(x, y)) + ad = ArrayDataset(x) predictions = model.predict(ad) if predictions.shape[1] > 1: predictions = np.argmax(predictions, axis=1) target_accuracy = 0.5 - gen = GeneralizeToRepresentative(model, target_accuracy=target_accuracy, features_to_minimize=QI) - gen.fit(X=X, y=predictions) - transformed = gen.transform(X) + gen = GeneralizeToRepresentative(model, target_accuracy=target_accuracy, features_to_minimize=qi) + gen.fit(X=x, y=predictions) + transformed = gen.transform(x) gener = gen.generalizations expected_generalizations = {'ranges': {'0': [], '2': [67.5]}, 'categories': {}, 'untouched': ['1']} - for key in expected_generalizations['ranges']: - assert (set(expected_generalizations['ranges'][key]) == set(gener['ranges'][key])) - for key in expected_generalizations['categories']: - assert (set([frozenset(sl) for sl in expected_generalizations['categories'][key]]) - == set([frozenset(sl) for sl in gener['categories'][key]])) - assert (set(expected_generalizations['untouched']) == set(gener['untouched'])) - assert ((np.delete(transformed, [0, 2], axis=1) == np.delete(X, [0, 2], axis=1)).all()) - modified_features = [f for f in features if - str(f) in expected_generalizations['categories'].keys() or str(f) in expected_generalizations[ - 'ranges'].keys()] - indexes = [] - for i in range(len(features)): - if features[i] in modified_features: - indexes.append(i) - assert ((np.delete(transformed, indexes, axis=1) == np.delete(X, indexes, axis=1)).all()) - ncp = gen.ncp - if len(expected_generalizations['ranges'].keys()) > 0 or len(expected_generalizations['categories'].keys()) > 0: - assert (ncp > 0) - assert (((transformed[indexes]) != 
(X[indexes])).any()) + compare_generalizations(gener, expected_generalizations) + assert ((np.delete(transformed, [0, 2], axis=1) == np.delete(x, [0, 2], axis=1)).all()) + check_features(features, expected_generalizations, transformed, x) + ncp = gen.ncp.transform_score + check_ncp(ncp, expected_generalizations) rel_accuracy = model.score(ArrayDataset(transformed, predictions)) assert ((rel_accuracy >= target_accuracy) or (target_accuracy - rel_accuracy) <= 0.05) -def test_X_y_features_names(): +def test_x_y_features_names(): features = ['age', 'height', 'weight'] - X = np.array([[23, 165, 70], + x = np.array([[23, 165, 70], [45, 158, 67], [56, 123, 65], [67, 154, 90], @@ -717,81 +782,42 @@ def test_X_y_features_names(): [69, 175, 80], [24, 181, 95], [18, 190, 102]]) - print(X) + print(x) y = np.array([1, 1, 0, 1, 0, 0, 1, 1, 0, 1, 0]) - QI = ['age', 'weight'] + qi = ['age', 'weight'] base_est = DecisionTreeClassifier(random_state=0, min_samples_split=2, min_samples_leaf=1) model = SklearnClassifier(base_est, ModelOutputType.CLASSIFIER_PROBABILITIES) - model.fit(ArrayDataset(X, y)) - ad = ArrayDataset(X) + model.fit(ArrayDataset(x, y)) + ad = ArrayDataset(x) predictions = model.predict(ad) if predictions.shape[1] > 1: predictions = np.argmax(predictions, axis=1) target_accuracy = 0.5 - gen = GeneralizeToRepresentative(model, target_accuracy=target_accuracy, features_to_minimize=QI) - gen.fit(X=X, y=predictions, features_names=features) - transformed = gen.transform(X=X, features_names=features) + gen = GeneralizeToRepresentative(model, target_accuracy=target_accuracy, features_to_minimize=qi) + gen.fit(X=x, y=predictions, features_names=features) + transformed = gen.transform(X=x, features_names=features) gener = gen.generalizations expected_generalizations = {'ranges': {'age': [], 'weight': [67.5]}, 'categories': {}, 'untouched': ['height']} - for key in expected_generalizations['ranges']: - assert (set(expected_generalizations['ranges'][key]) == 
set(gener['ranges'][key])) - for key in expected_generalizations['categories']: - assert (set([frozenset(sl) for sl in expected_generalizations['categories'][key]]) - == set([frozenset(sl) for sl in gener['categories'][key]])) - assert (set(expected_generalizations['untouched']) == set(gener['untouched'])) - assert ((np.delete(transformed, [0, 2], axis=1) == np.delete(X, [0, 2], axis=1)).all()) - modified_features = [f for f in features if - f in expected_generalizations['categories'].keys() or f in expected_generalizations[ - 'ranges'].keys()] - indexes = [] - for i in range(len(features)): - if features[i] in modified_features: - indexes.append(i) - assert ((np.delete(transformed, indexes, axis=1) == np.delete(X, indexes, axis=1)).all()) - ncp = gen.ncp - if len(expected_generalizations['ranges'].keys()) > 0 or len(expected_generalizations['categories'].keys()) > 0: - assert (ncp > 0) - assert (((transformed[indexes]) != (X[indexes])).any()) + compare_generalizations(gener, expected_generalizations) + assert ((np.delete(transformed, [0, 2], axis=1) == np.delete(x, [0, 2], axis=1)).all()) + check_features(features, expected_generalizations, transformed, x) + ncp = gen.ncp.transform_score + check_ncp(ncp, expected_generalizations) rel_accuracy = model.score(ArrayDataset(transformed, predictions)) assert ((rel_accuracy >= target_accuracy) or (target_accuracy - rel_accuracy) <= 0.05) -def test_BaseEstimator_classification(): - features = ['age', 'height', 'weight', 'sex', 'ola'] - X = [[23, 165, 65, 'f', 'aa'], - [45, 158, 76, 'f', 'aa'], - [56, 123, 78, 'f', 'bb'], - [67, 154, 87, 'm', 'aa'], - [45, 149, 45, 'f', 'bb'], - [42, 166, 76, 'm', 'bb'], - [73, 172, 85, 'm', 'bb'], - [94, 168, 92, 'f', 'aa'], - [69, 175, 95, 'm', 'aa'], - [24, 181, 49, 'm', 'bb'], - [18, 190, 69, 'm', 'bb']] - - y = pd.Series([1, 1, 0, 1, 0, 0, 1, 1, 0, 1, 0]) - X = pd.DataFrame(X, columns=features) +def test_BaseEstimator_classification(data_five_features): + x, y, features = 
data_five_features + x = pd.DataFrame(x, columns=features) QI = ['age', 'weight', 'ola'] numeric_features = ["age", "height", "weight"] - numeric_transformer = Pipeline( - steps=[('imputer', SimpleImputer(strategy='constant', fill_value=0))] - ) - categorical_features = ["sex", "ola"] - categorical_transformer = OneHotEncoder(handle_unknown="ignore") + preprocessor, encoded = create_encoder(numeric_features, categorical_features, x) - preprocessor = ColumnTransformer( - transformers=[ - ("num", numeric_transformer, numeric_features), - ("cat", categorical_transformer, categorical_features), - ] - ) - encoded = preprocessor.fit_transform(X) - encoded = pd.DataFrame(encoded) base_est = DecisionTreeClassifier(random_state=0, min_samples_split=2, min_samples_leaf=1) model = base_est @@ -803,37 +829,27 @@ def test_BaseEstimator_classification(): target_accuracy = 0.5 gen = GeneralizeToRepresentative(model, target_accuracy=target_accuracy, categorical_features=categorical_features, features_to_minimize=QI) - train_dataset = ArrayDataset(X, predictions) + train_dataset = ArrayDataset(x, predictions) gen.fit(dataset=train_dataset) - transformed = gen.transform(dataset=ArrayDataset(X)) + transformed = gen.transform(dataset=ArrayDataset(x)) gener = gen.generalizations expected_generalizations = {'ranges': {'age': [], 'weight': [47.0]}, 'categories': {'ola': [['bb', 'aa']]}, 'untouched': ['height', 'sex']} - for key in expected_generalizations['ranges']: - assert (set(expected_generalizations['ranges'][key]) == set(gener['ranges'][key])) - for key in expected_generalizations['categories']: - assert (set([frozenset(sl) for sl in expected_generalizations['categories'][key]]) - == set([frozenset(sl) for sl in gener['categories'][key]])) - assert (set(expected_generalizations['untouched']) == set(gener['untouched'])) - # assert (transformed.drop(QI, axis=1).equals(X.drop(QI, axis=1))) - np.testing.assert_array_equal(transformed.drop(QI, axis=1), X.drop(QI, axis=1)) - 
modified_features = [f for f in features if - f in expected_generalizations['categories'].keys() or f in expected_generalizations[ - 'ranges'].keys()] - # assert (transformed.drop(modified_features, axis=1).equals(X.drop(modified_features, axis=1))) - np.testing.assert_array_equal(transformed.drop(modified_features, axis=1), X.drop(modified_features, axis=1)) - ncp = gen.ncp - if len(expected_generalizations['ranges'].keys()) > 0 or len(expected_generalizations['categories'].keys()) > 0: - assert (ncp > 0) - assert (((transformed[modified_features]).equals(X[modified_features])) is False) + compare_generalizations(gener, expected_generalizations) + + np.testing.assert_array_equal(transformed.drop(QI, axis=1), x.drop(QI, axis=1)) + check_features(features, expected_generalizations, transformed, x, True) + ncp = gen.ncp.transform_score + check_ncp(ncp, expected_generalizations) rel_accuracy = model.score(preprocessor.transform(transformed), predictions) assert ((rel_accuracy >= target_accuracy) or (target_accuracy - rel_accuracy) <= 0.05) -def test_BaseEstimator_regression(dataset): - x_train, x_test, y_train, y_test = train_test_split(dataset.data, dataset.target, test_size=0.5, random_state=14) +def test_BaseEstimator_regression(diabetes_dataset): + x_train, x_test, y_train, y_test = train_test_split(diabetes_dataset.data, diabetes_dataset.target, test_size=0.5, + random_state=14) base_est = DecisionTreeRegressor(random_state=10, min_samples_split=2) model = base_est @@ -879,33 +895,19 @@ def test_BaseEstimator_regression(dataset): 0.061315815430134535, 0.06272498145699501, 0.06460387445986271]}, 'categories': {}, 'untouched': ['s5', 's3', 'bp', 's1', 'sex', 's6', 's4']} - for key in expected_generalizations['ranges']: - assert (set(expected_generalizations['ranges'][key]) == set(gener['ranges'][key])) - for key in expected_generalizations['categories']: - assert (set([frozenset(sl) for sl in expected_generalizations['categories'][key]]) - == set([frozenset(sl) for 
sl in gener['categories'][key]])) - assert (set(expected_generalizations['untouched']) == set(gener['untouched'])) + compare_generalizations(gener, expected_generalizations) assert ((np.delete(transformed, [0, 2, 5, 8], axis=1) == np.delete(x_train, [0, 2, 5, 8], axis=1)).all()) - modified_features = [f for f in features if - f in expected_generalizations['categories'].keys() or f in expected_generalizations[ - 'ranges'].keys()] - indexes = [] - for i in range(len(features)): - if features[i] in modified_features: - indexes.append(i) - assert ((np.delete(transformed, indexes, axis=1) == np.delete(x_train, indexes, axis=1)).all()) - ncp = gen.ncp - if len(expected_generalizations['ranges'].keys()) > 0 or len(expected_generalizations['categories'].keys()) > 0: - assert (ncp > 0) - assert (((transformed[indexes]) != (x_train[indexes])).any()) + check_features(features, expected_generalizations, transformed, x_train) + ncp = gen.ncp.transform_score + check_ncp(ncp, expected_generalizations) rel_accuracy = model.score(transformed, predictions) assert ((rel_accuracy >= target_accuracy) or (target_accuracy - rel_accuracy) <= 0.05) def test_keras_model(): - (X, y), (x_test, y_test) = get_iris_dataset_np() + (x, y), (x_test, y_test) = get_iris_dataset_np() base_est = Sequential() base_est.add(Input(shape=(4,))) @@ -915,7 +917,7 @@ def test_keras_model(): base_est.compile(loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"]) model = KerasClassifier(base_est, ModelOutputType.CLASSIFIER_PROBABILITIES) - model.fit(ArrayDataset(X, y)) + model.fit(ArrayDataset(x, y)) ad = ArrayDataset(x_test) predictions = model.predict(ad) if predictions.shape[1] > 1: @@ -929,17 +931,9 @@ def test_keras_model(): gener = gen.generalizations features = ['0', '1', '2', '3'] - modified_features = [f for f in features if - f in gener['categories'].keys() or f in gener['ranges'].keys()] - indexes = [] - for i in range(len(features)): - if features[i] in modified_features: - 
indexes.append(i) - assert ((np.delete(transformed, indexes, axis=1) == np.delete(x_test, indexes, axis=1)).all()) - ncp = gen.ncp - if len(gener['ranges'].keys()) > 0 or len(gener['categories'].keys()) > 0: - assert (ncp > 0) - assert (((transformed[indexes]) != (X[indexes])).any()) + check_features(features, gener, transformed, x) + ncp = gen.ncp.transform_score + check_ncp(ncp, gener) rel_accuracy = model.score(ArrayDataset(transformed, predictions)) assert ((rel_accuracy >= target_accuracy) or (target_accuracy - rel_accuracy) <= 0.05) @@ -959,9 +953,33 @@ def test_untouched(): gen._calculate_generalizations() gener = gen.generalizations expected_generalizations = {'ranges': {'age': [38, 39]}, 'categories': {}, 'untouched': ['gender']} - for key in expected_generalizations['ranges']: - assert (set(expected_generalizations['ranges'][key]) == set(gener['ranges'][key])) - for key in expected_generalizations['categories']: - assert (set([frozenset(sl) for sl in expected_generalizations['categories'][key]]) - == set([frozenset(sl) for sl in gener['categories'][key]])) - assert (set(expected_generalizations['untouched']) == set(gener['untouched'])) + compare_generalizations(gener, expected_generalizations) + + +def test_errors(): + features = ['age', 'height'] + X = np.array([[23, 165], + [45, 158], + [56, 123], + [67, 154], + [45, 149], + [42, 166], + [73, 172], + [94, 168], + [69, 175], + [24, 181], + [18, 190]]) + y = np.array([1, 1, 0, 1, 0, 0, 1, 1, 0, 1, 0]) + base_est = DecisionTreeClassifier(random_state=0, min_samples_split=2, + min_samples_leaf=1) + model = SklearnClassifier(base_est, ModelOutputType.CLASSIFIER_PROBABILITIES) + model.fit(ArrayDataset(X, y)) + ad = ArrayDataset(X) + predictions = model.predict(ad) + if predictions.shape[1] > 1: + predictions = np.argmax(predictions, axis=1) + gen = GeneralizeToRepresentative(model, generalize_using_transform=False) + train_dataset = ArrayDataset(X, predictions, features_names=features) + 
gen.fit(dataset=train_dataset) + with pytest.raises(ValueError): + gen.transform(X)