This commit is contained in:
cmalvr 2025-02-24 00:07:10 +01:00
parent 57e38ea4fa
commit a7dc7b14ee
5 changed files with 434 additions and 1 deletions

0
apt/security/__init__.py Normal file
View file

106
apt/security/shamir.py Normal file
View file

@ -0,0 +1,106 @@
import random
def mod_inverse(a, prime):
    """
    Return the multiplicative inverse of 'a' modulo 'prime'.

    The work is delegated to the built-in three-argument ``pow`` with an
    exponent of -1, so the result ``r`` satisfies ``(a * r) % prime == 1``.
    """
    inverse = pow(a, -1, prime)
    return inverse
def polynom(x, coefficients, prime):
    """
    Evaluate a polynomial at 'x' with all arithmetic reduced modulo 'prime'.

    'coefficients' is ordered from the constant term upwards, i.e. the value
    computed is coefficients[0] + coefficients[1]*x + coefficients[2]*x**2 + ...
    An empty coefficient sequence evaluates to 0.
    """
    value = 0
    # Horner's scheme: start from the highest-degree coefficient and fold
    # in one lower-degree coefficient per step.
    for coeff in reversed(coefficients):
        value *= x
        value += coeff
        value %= prime
    return value
def generate_shares(secret, n, k, prime=2**127 - 1):
    """
    Split 'secret' (an integer) into 'n' shares with threshold 'k'.

    A polynomial of degree k-1 is built whose constant term is the secret and
    whose remaining coefficients are random; share i is the point
    (i, polynomial(i) mod prime) for i = 1..n.

    All arithmetic is done modulo 'prime', so only ``secret % prime`` can be
    recovered from the shares; pick a prime larger than any secret value.

    :param secret: The secret to be split (integer).
    :param n: The total number of shares to generate.
    :param k: The minimum number of shares needed to reconstruct the secret.
    :param prime: A large prime number > any secret value to use in modular arithmetic.
    :return: A list of n (x, y) tuples with x = 1..n.
    :raises ValueError: If k is not in the range 1..n, or if n is not smaller
        than 'prime'.
    """
    if k < 1 or k > n:
        raise ValueError(
            f"Threshold k must satisfy 1 <= k <= n (got k={k}, n={n})."
        )
    if n >= prime:
        # x runs over 1..n; an x that is 0 modulo the prime would evaluate the
        # polynomial at its constant term and hand out the secret itself.
        raise ValueError("Number of shares n must be smaller than the prime.")

    # The secrecy of the scheme rests on the coefficients being unpredictable,
    # so draw them from the OS entropy source rather than the default
    # (deterministic, seedable) Mersenne Twister generator.
    rng = random.SystemRandom()
    coefficients = [secret] + [rng.randrange(0, prime) for _ in range(k - 1)]

    return [(x, polynom(x, coefficients, prime)) for x in range(1, n + 1)]
def reconstruct_secret(shares, prime=2**127 - 1):
    """
    Reconstruct the secret from (x, y) shares using Lagrange interpolation at x = 0.

    The function cannot tell whether enough shares were supplied: with fewer
    shares than the threshold used at split time it still returns a value, but
    not the original secret.

    :param shares: Iterable of (x, y) tuples with pairwise distinct x values.
    :param prime: The prime modulus that was used to generate the shares.
    :return: The interpolated polynomial value at 0, in the range [0, prime).
    :raises ValueError: If no shares are given or two shares have the same
        x-coordinate modulo 'prime'.
    """
    # Materialise once: the nested loops below iterate the shares repeatedly,
    # which would silently exhaust a generator.
    shares = list(shares)
    if not shares:
        raise ValueError("At least one share is required to reconstruct a secret.")

    x_values = [x % prime for x, _ in shares]
    if len(set(x_values)) != len(x_values):
        # Equal x-coordinates make a Lagrange denominator zero, which has no
        # modular inverse.
        raise ValueError("Shares must have pairwise distinct x-coordinates.")

    secret = 0
    for j, (xj, yj) in enumerate(shares):
        numerator = 1
        denominator = 1
        for m, (xm, _) in enumerate(shares):
            if m != j:
                # Lagrange basis polynomial evaluated at 0: prod(-xm) / prod(xj - xm)
                numerator = (numerator * (-xm)) % prime
                denominator = (denominator * (xj - xm)) % prime
        # Division in the field is multiplication by the modular inverse.
        lagrange_coeff = (numerator * mod_inverse(denominator, prime)) % prime
        secret = (secret + yj * lagrange_coeff) % prime
    return secret
class ShamirSecretSharingWrapper:
    """
    A simple wrapper class that applies Shamir's Secret Sharing to pandas DataFrames.
    Splits integer (or integer-scaled float) values into shares, and reconstructs them.
    """

    def __init__(self, n_shares=5, threshold=3, prime=2**127 - 1, scale_factor=1):
        """
        :param n_shares: Total number of shares per secret.
        :param threshold: Minimum number of shares needed to reconstruct.
        :param prime: A large prime for modular arithmetic.
        :param scale_factor: Factor applied to a value before it is converted
            to an integer secret, and divided out again on reconstruction.
            The default of 1 leaves values unscaled.
        :raises ValueError: If 'threshold' is not in the range 1..n_shares or
            'scale_factor' is not positive.
        """
        if not 1 <= threshold <= n_shares:
            raise ValueError(
                f"threshold must satisfy 1 <= threshold <= n_shares "
                f"(got threshold={threshold}, n_shares={n_shares})."
            )
        if scale_factor <= 0:
            raise ValueError(f"scale_factor must be positive (got {scale_factor}).")
        self.n_shares = n_shares
        self.threshold = threshold
        self.prime = prime
        self.scale_factor = scale_factor

    def split_value(self, value):
        """
        Convert 'value' to an integer secret and split it into shares.

        With the default scale factor of 1 the value is passed through
        ``int()`` unchanged. With any other scale factor the value is
        multiplied by it and rounded to the nearest integer first, so that
        floats keep a fixed number of decimal places.

        :return: A list of (x, y) share tuples.
        """
        if self.scale_factor == 1:
            secret = int(value)
        else:
            secret = int(round(value * self.scale_factor))
        return generate_shares(secret, self.n_shares, self.threshold, self.prime)

    def split_dataframe(self, df, sensitive_columns):
        """
        For each column in 'sensitive_columns', split each cell's value into shares.
        Returns a dictionary mapping each column to a DataFrame of shares
        (each row corresponds to a record, each column is one share).
        """
        import pandas as pd

        shares_dict = {}
        for col in sensitive_columns:
            # Keep only the y-part of each (x, y) share; the x-part is implied
            # by the column position (x = 1..n_shares).
            col_shares = [
                [y for _, y in self.split_value(val)]
                for val in df[col]
            ]
            shares_dict[col] = pd.DataFrame(
                col_shares,
                columns=[f"{col}_share_{i}" for i in range(1, self.n_shares + 1)],
            )
        return shares_dict

    def reconstruct_value(self, shares_subset):
        """
        Reconstruct a secret from a subset of shares (list of (x, y) tuples).
        The subset must meet or exceed the threshold.

        :return: The reconstructed integer when the scale factor is 1,
            otherwise the reconstructed integer divided by the scale factor.
        """
        secret = reconstruct_secret(shares_subset, self.prime)
        if self.scale_factor == 1:
            return secret
        return secret / self.scale_factor

142
apt/security/sharer.py Normal file
View file

@ -0,0 +1,142 @@
import pandas as pd
import numpy as np
from apt.security.shamir import ShamirSecretSharingWrapper
# --- Custom NCP Functions ---
def calc_ncp_numeric(original_series: pd.Series, generalized_series: pd.Series) -> float:
    """
    Return the NCP of a numerical feature: (generalized max - min) / (original max - min).

    When the original series spans a zero-width range the result is 0.0.
    """
    lowest, highest = original_series.min(), original_series.max()
    gen_lowest, gen_highest = generalized_series.min(), generalized_series.max()
    original_span = highest - lowest
    if original_span != 0:
        return (gen_highest - gen_lowest) / original_span
    # A constant original column has no range to normalise against.
    return 0.0
def calc_ncp_categorical(original_series: pd.Series, generalized_series: pd.Series) -> float:
    """
    Return the NCP of a categorical feature: 1 minus the relative frequency
    of the most common category in the generalized series.

    'original_series' is accepted for signature parity with the numeric
    variant but is not used. A generalized series with no counted values
    yields 0.0.
    """
    # value_counts sorts by descending frequency, so the first entry is the mode.
    frequencies = generalized_series.value_counts(normalize=True)
    return 0.0 if frequencies.empty else 1 - frequencies.iloc[0]
def calculate_ncp_feature(original_df: pd.DataFrame, generalized_df: pd.DataFrame, feature: str) -> float:
    """
    Return the NCP of one feature, dispatching on the dtype of the original column.

    Numeric original columns use calc_ncp_numeric; everything else uses
    calc_ncp_categorical.
    """
    original_column = original_df[feature]
    ncp_function = (
        calc_ncp_numeric
        if pd.api.types.is_numeric_dtype(original_column)
        else calc_ncp_categorical
    )
    return ncp_function(original_column, generalized_df[feature])
# --- Main Function to Evaluate and Select Best Secret-Sharing Candidate Feature ---
def _reconstruct_column(sss, shares_df, threshold, scale_factor):
    """Rebuild one column's values from its shares DataFrame (one row per record)."""
    half_prime = sss.prime // 2
    reconstructed = []
    for _, row in shares_df.iterrows():
        # Re-create share tuples: share columns are ordered x = 1, 2, ..., n_shares.
        share_tuples = list(enumerate(row.tolist(), start=1))
        value = sss.reconstruct_value(share_tuples[:threshold])
        # Secrets live in [0, prime); a negative input v comes back as prime + v.
        # Map the upper half of the field back to negative numbers.
        if value > half_prime:
            value -= sss.prime
        # Undo the integer scaling applied before splitting.
        reconstructed.append(value / scale_factor)
    return reconstructed


def select_best_sharing_feature(minimized_df: pd.DataFrame,
                                original_df: pd.DataFrame,
                                untouched_features: list,
                                model,
                                y_test,
                                threshold: int = 3,
                                scale_factor: int = 100,
                                min_acceptable_accuracy: float = None):
    """
    For each untouched feature in the minimized dataset, apply Shamir secret sharing (using the given scale factor
    and threshold), reconstruct that feature, and evaluate the model's accuracy when that feature is replaced by
    its reconstruction.

    Untouched features are processed in order from highest to lowest sensitivity (as measured by NCP).
    Values are multiplied by 'scale_factor' and rounded to integers before splitting, and divided by it again
    after reconstruction. Features whose minimized column is not numeric cannot be converted to integer
    secrets and are skipped with a warning.

    Parameters:
        minimized_df: DataFrame containing the minimized (generalized) data.
        original_df: DataFrame containing the original training data (used for computing NCP).
        untouched_features: List of feature names left "untouched" during minimization.
        model: A trained model with a score() method (e.g., model1).
        y_test: Ground truth labels for evaluation.
        threshold: Minimum number of shares required for reconstruction.
        scale_factor: Factor to scale float values to integers. Must be positive.
        min_acceptable_accuracy: The minimum acceptable model accuracy.
            If provided, the function will stop on the first feature
            whose reconstructed dataset achieves at least this accuracy.

    Returns:
        A tuple (best_feature, best_accuracy, best_reconstructed_df) where:
        - best_feature: The feature selected for secret sharing (None if no feature was evaluated).
        - best_accuracy: The model's accuracy on the dataset with that feature reconstructed
          (-1 if no feature was evaluated).
        - best_reconstructed_df: The corresponding DataFrame (None if no feature was evaluated).

    Raises:
        ValueError: If 'scale_factor' is not positive.
    """
    if scale_factor <= 0:
        raise ValueError(f"scale_factor must be positive (got {scale_factor}).")

    # Initialize the Shamir wrapper. Scaling is applied here, outside the wrapper,
    # so the wrapper only ever sees integer-valued secrets.
    sss = ShamirSecretSharingWrapper(n_shares=5, threshold=threshold)

    # Compute baseline accuracy on the minimized data.
    baseline_acc = model.score(minimized_df, y_test)
    print(f"[Debug] Baseline model accuracy on minimized data: {baseline_acc:.4f}")

    # Compute sensitivity scores (NCP) for each untouched feature.
    sensitivity_scores = {}
    for feature in untouched_features:
        if feature in original_df.columns and feature in minimized_df.columns:
            ncp_val = calculate_ncp_feature(original_df, minimized_df, feature)
            sensitivity_scores[feature] = ncp_val
            print(f"[Debug] NCP for feature '{feature}' = {ncp_val:.4f}")
        else:
            print(f"Warning: Feature '{feature}' not found in both DataFrames.")

    # Sort features by descending sensitivity (higher NCP means more sensitive).
    sorted_features = sorted(sensitivity_scores, key=sensitivity_scores.get, reverse=True)
    print(f"[Debug] Sorted untouched features by descending NCP: {sorted_features}")

    best_feature = None
    best_accuracy = -1
    best_reconstructed_df = None

    # Iterate over candidate features in descending NCP order.
    for feature in sorted_features:
        current_ncp = sensitivity_scores[feature]
        print(f"\n[Debug] Trying secret sharing for feature '{feature}' (NCP={current_ncp:.4f})")

        if not pd.api.types.is_numeric_dtype(minimized_df[feature]):
            print(f"Warning: Feature '{feature}' is not numeric and cannot be secret-shared. Skipping.")
            continue

        # Scale to fixed-point integers before splitting.
        scaled_df = (minimized_df[[feature]] * scale_factor).round()
        shares_dict = sss.split_dataframe(scaled_df, [feature])
        reconstructed_feature = _reconstruct_column(sss, shares_dict[feature], threshold, scale_factor)

        # Create a new DataFrame with this feature replaced by its reconstructed values.
        rec_df = minimized_df.copy()
        rec_df[feature] = reconstructed_feature

        # Evaluate the model on the reconstructed dataset.
        acc = model.score(rec_df, y_test)
        print(f"[Debug] Model accuracy with feature '{feature}' reconstructed: {acc:.4f}")

        # Keep the candidate if it strictly improves on the best so far.
        if acc > best_accuracy:
            print(f"[Debug] Feature '{feature}' yields new best accuracy: {acc:.4f} (old best was {best_accuracy:.4f})")
            best_accuracy = acc
            best_feature = feature
            # rec_df is already a private copy, so it can be kept as-is.
            best_reconstructed_df = rec_df

        # If a minimum acceptable accuracy is set, choose the first feature meeting it.
        if min_acceptable_accuracy is not None and acc >= min_acceptable_accuracy:
            print(f"[Debug] Feature '{feature}' meets the minimum acceptable accuracy of {min_acceptable_accuracy:.4f}. Stopping.")
            break

    # Calculate relative accuracy change from baseline to the best found.
    rel_change = (best_accuracy - baseline_acc) / baseline_acc * 100 if baseline_acc != 0 else 0
    print(f"\n[Debug] Final selection -> Feature: {best_feature}, Accuracy: {best_accuracy:.4f}")
    print(f"[Debug] Relative accuracy change: {rel_change:.2f}% from baseline {baseline_acc:.4f}")
    return best_feature, best_accuracy, best_reconstructed_df

View file

@ -0,0 +1,185 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Data Minimization: Inference Black-Box Attack on the Nursery Dataset"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Load data"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Add the directory containing shamir.py to the PYTHONPATH if needed.\n",
"import os\n",
"import sys\n",
"sys.path.insert(0, os.path.abspath('.'))\n",
"\n",
"# Import the secret sharing module\n",
"from shamir import ShamirSecretSharingWrapper\n",
"\n",
"# Import your minimizer and dataset utilities.\n",
"# (Assuming GeneralizeToRepresentative is defined in your minimizer module.)\n",
"from apt.minimization import GeneralizeToRepresentative\n",
"from apt.utils.dataset_utils import get_nursery_dataset_pd\n",
"import pandas as pd\n",
"import numpy as np\n",
"\n",
"# ---------------------------\n",
"# Load the Nursery Dataset\n",
"# ---------------------------\n",
"(x_train, y_train), (x_test, y_test) = get_nursery_dataset_pd(transform_social=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Train: XGBoost Model"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from apt.utils.datasets import ArrayDataset\n",
"from apt.utils.models import SklearnClassifier, ModelOutputType\n",
"from xgboost import XGBClassifier\n",
"\n",
"# Instantiate the XGBoost classifier.\n",
"# Note: We disable the use of the label encoder and specify a log-loss evaluation metric.\n",
"base_est = XGBClassifier(use_label_encoder=False, eval_metric='logloss')\n",
"\n",
"# Wrap the model using SklearnClassifier and specify that it outputs probabilities.\n",
"model = SklearnClassifier(base_est, ModelOutputType.CLASSIFIER_PROBABILITIES)\n",
"\n",
"# Fit the model using your training data.\n",
"model.fit(ArrayDataset(x_train, y_train))\n",
"\n",
"# Evaluate the model's accuracy on test data.\n",
"print('Base model accuracy:', model.score(ArrayDataset(x_test, y_test)))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Train: XGBoost Minimization"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from apt.minimization import GeneralizeToRepresentative\n",
"from sklearn.model_selection import train_test_split\n",
"\n",
"# Use all columns for minimization in this example.\n",
"minimizer = GeneralizeToRepresentative(model)\n",
"X_generalizer_train, x_test, y_generalizer_train, y_test = train_test_split(x_test, y_test, stratify=y_test,\n",
" test_size = 0.4, random_state = 38)\n",
"x_train_predictions = model.predict(ArrayDataset(X_generalizer_train))\n",
"if x_train_predictions.shape[1] > 1:\n",
" x_train_predictions = np.argmax(x_train_predictions, axis=1)\n",
"minimizer.fit(dataset=ArrayDataset(X_generalizer_train, x_train_predictions))\n",
"transformed = minimizer.transform(dataset=ArrayDataset(x_test))\n",
"\n",
"print('Accuracy on minimized data: ', model.score(ArrayDataset(transformed, y_test)))\n",
"print('generalizations: ',minimizer.generalizations_)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Shamir Secret Sharing: Application"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Choose sensitive features (for example, the first two columns)\n",
"sensitive_features = [x_train.columns[0], x_train.columns[1]]\n",
"sss = ShamirSecretSharingWrapper(n_shares=5, threshold=3)\n",
"shares = sss.split_dataframe(generalized_data, sensitive_features)\n",
"\n",
"# Display the secret shares for one sensitive column.\n",
"print(\"Secret Shares for sensitive feature:\", sensitive_features[0])\n",
"print(shares[sensitive_features[0]].head())\n",
"\n",
"# Demonstrate reconstruction: Reconstruct the secret for the first record.\n",
"first_record_shares = shares[sensitive_features[0]].iloc[0].tolist()\n",
"# Re-create share tuples with known x-indices.\n",
"share_tuples = [(i+1, first_record_shares[i]) for i in range(5)]\n",
"reconstructed_value = sss.reconstruct_value(share_tuples[:3]) # using any 3 shares\n",
"print(\"Reconstructed value for first record, {}: {}\".format(sensitive_features[0], reconstructed_value))\n",
"\n",
"# ---------------------------\n",
"# Evaluate Model Accuracy\n",
"# ---------------------------\n",
"# Compute accuracy on the generalized (minimized) data.\n",
"if minimizer.encoder:\n",
" data_for_scoring = minimizer.encoder.transform(generalized_data)\n",
"else:\n",
" data_for_scoring = generalized_data\n",
"model_accuracy = minimizer.estimator.score(data_for_scoring, y_test)\n",
"print(\"Model accuracy on minimized data:\", model_accuracy)\n",
"\n",
"# ---------------------------\n",
"# Membership Inference Attack Metrics using IBM ART\n",
"# ---------------------------\n",
"# Install the ART library if not already installed:\n",
"# !pip install adversarial-robustness-toolbox\n",
"\n",
"from art.attacks.inference.membership_inference import MembershipInferenceBlackBox\n",
"from art.estimators.classification import SklearnClassifier\n",
"\n",
"# Wrap the underlying scikit-learn classifier used by your minimizer.\n",
"# (Assuming minimizer.estimator._model holds the trained sklearn model.)\n",
"art_classifier = SklearnClassifier(model=minimizer.estimator._model)\n",
"\n",
"# Create a membership inference attack (using a black-box approach with a random forest attack model).\n",
"attack = MembershipInferenceBlackBox(art_classifier, attack_model_type='rf')\n",
"\n",
"# Prepare membership data:\n",
"# Label training samples as members (1) and test samples as non-members (0).\n",
"membership_data = pd.concat([x_train, x_test])\n",
"membership_labels = np.concatenate([np.ones(len(x_train)), np.zeros(len(x_test))])\n",
"\n",
"# Run the attack (the infer method returns membership probabilities).\n",
"attack_memberships = attack.infer(membership_data.to_numpy())\n",
"\n",
"# Threshold the probabilities at 0.5 to decide membership.\n",
"attack_pred = (attack_memberships > 0.5).astype(int)\n",
"\n",
"from sklearn.metrics import accuracy_score\n",
"attack_accuracy = accuracy_score(membership_labels, attack_pred)\n",
"print(\"Membership inference attack accuracy:\", attack_accuracy)"
]
}
],
"metadata": {
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

View file

@ -5,7 +5,7 @@ scikit-learn>=0.22.2,<=1.1.3
torch>=1.8.0
tqdm>=4.64.1
matplotlib>=3.7.0
adversarial-robustness-toolbox>=1.11.0
adversarial-robustness-toolbox==1.19.1
# testing
pytest==5.4.2