Input warping using Kumar warping

Shen 2017-09-08 15:10:40 +02:00
parent 9e6eac1cf0
commit e72f0b6c9b
5 changed files with 475 additions and 1 deletion

@@ -9,6 +9,7 @@ from .gplvm import GPLVM
from .bcgplvm import BCGPLVM
from .sparse_gplvm import SparseGPLVM
from .warped_gp import WarpedGP
from .input_warped_gp import InputWarpedGP
from .bayesian_gplvm import BayesianGPLVM
from .mrd import MRD
from .gradient_checker import GradientChecker, HessianChecker, SkewChecker

@@ -0,0 +1,149 @@
# Copyright (c) 2012, GPy authors (see AUTHORS.txt).
# Licensed under the BSD 3-clause license (see LICENSE.txt)
import numpy as np
from ..core import GP
from .. import likelihoods
from ..util.input_warping_functions import KumarWarping
from .. import kern
class InputWarpedGP(GP):
"""Input Warped GP
    This defines a GP model that applies a warping function to the input.
    By default, it uses Kumar warping (the CDF of the Kumaraswamy distribution)
Parameters
----------
    X : array_like, shape = (n_samples, n_features)
        Input data
    Y : array_like, shape = (n_samples, 1)
        Output data
kernel : object, optional
        An instance of a kernel function defined in GPy.kern
        Defaults to Matern 32
warping_function : object, optional
        An instance of a warping function defined in GPy.util.input_warping_functions
        Defaults to KumarWarping
warping_indices : list of int, optional
        A list of indices of the features in X that should be warped
        Used by the Kumar warping function
normalizer : bool, optional
        Whether to normalize the output Y
Xmin : list of float, optional
        The min values for every feature in X
        Used by the Kumar warping function
    Xmax : list of float, optional
        The max values for every feature in X
        Used by the Kumar warping function
epsilon : float, optional
        X is normalized to [0 + epsilon, 1 - epsilon]. If not given, the default value defined in KumarWarping is used
Attributes
----------
    X_untransformed : array_like, shape = (n_samples, n_features)
        A copy of the original input X
    X_warped : array_like, shape = (n_samples, n_features)
        The input data after warping
    warping_function : object
        An instance of a warping function defined in GPy.util.input_warping_functions
        Defaults to KumarWarping
Notes
-----
    Kumar warping uses the CDF of the Kumaraswamy distribution. More on the Kumaraswamy distribution can be found on its
    Wikipedia page: https://en.wikipedia.org/wiki/Kumaraswamy_distribution
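    Its CDF is F(x; a, b) = 1 - (1 - x^a)^b for x in [0, 1], which is exactly the
    warp applied to each selected feature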
References
----------
Snoek, J.; Swersky, K.; Zemel, R. S. & Adams, R. P.
Input Warping for Bayesian Optimization of Non-stationary Functions
    arXiv preprint arXiv:1402.0929, 2014
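    Examples
    --------
    A minimal usage sketch (illustrative, not part of the original commit;
    it assumes only the public API defined in this file):

    >>> import numpy as np
    >>> import GPy
    >>> X = np.random.rand(50, 1)
    >>> Y = np.sin(6 * X) + 0.1 * np.random.randn(50, 1)
    >>> m = GPy.models.InputWarpedGP(X, Y)  # Kumar warping on all features
    >>> m.optimize()                        # fits kernel, noise and warping parameters
    >>> mean, var = m.predict(X)            # inputs are warped before prediction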
"""
def __init__(self, X, Y, kernel=None, normalizer=False, warping_function=None, warping_indices=None, Xmin=None, Xmax=None, epsilon=None):
if X.ndim == 1:
X = X.reshape(-1, 1)
self.X_untransformed = X.copy()
if kernel is None:
kernel = kern.sde_Matern32(X.shape[1], variance=1.)
self.kernel = kernel
if warping_function is None:
self.warping_function = KumarWarping(self.X_untransformed, warping_indices, epsilon, Xmin, Xmax)
else:
self.warping_function = warping_function
self.X_warped = self.transform_data(self.X_untransformed)
likelihood = likelihoods.Gaussian()
super(InputWarpedGP, self).__init__(self.X_warped, Y, likelihood=likelihood, kernel=kernel, normalizer=normalizer)
# Add the parameters in the warping function to the model parameters hierarchy
self.link_parameter(self.warping_function)
def parameters_changed(self):
"""Update the gradients of parameters for warping function
This method is called when having new values of parameters for warping function, kernels
and other parameters in a normal GP
"""
        # recompute the warped X before updating the model
self.X = self.transform_data(self.X_untransformed)
super(InputWarpedGP, self).parameters_changed()
# the gradient of log likelihood w.r.t. input AFTER warping is a product of dL_dK and dK_dX
dL_dX = self.kern.gradients_X(self.grad_dict['dL_dK'], self.X)
self.warping_function.update_grads(self.X_untransformed, dL_dX)
def transform_data(self, X, test_data=False):
"""Apply warping_function to some Input data
Parameters
----------
X : array_like, shape = (n_samples, n_features)
        test_data : bool, optional
            Defaults to False; set to True when transforming test data
"""
return self.warping_function.f(X, test_data)
def log_likelihood(self):
"""Compute the marginal log likelihood
For input warping, just use the normal GP log likelihood
"""
return GP.log_likelihood(self)
def predict(self, Xnew):
"""Prediction on the new data
Parameters
----------
Xnew : array_like, shape = (n_samples, n_features)
The test data.
Returns
-------
        mean : array_like, shape = (n_samples, output_dim)
Posterior mean at the location of Xnew
var : array_like, shape = (n_samples, 1)
Posterior variance at the location of Xnew
"""
Xnew_warped = self.transform_data(Xnew, test_data=True)
mean, var = super(InputWarpedGP, self).predict(Xnew_warped, kern=self.kernel, full_cov=False)
return mean, var
if __name__ == '__main__':
X = np.random.randn(100, 1)
Y = np.sin(X) + np.random.randn(100, 1)*0.05
m = InputWarpedGP(X, Y)
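    # Illustrative extension (not in the original commit): optimizing the model
    # also fits the linked Kumar warping parameters a and b
    m.optimize()
    print(m)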

View file

@@ -399,6 +399,68 @@ class MiscTests(unittest.TestCase):
m.optimize()
print(m)
def test_input_warped_gp_identity(self):
"""
        An InputWarpedGP with the identity warping function should be
        equivalent to a standard GP.
"""
k = GPy.kern.RBF(1)
m = GPy.models.GPRegression(self.X, self.Y, kernel=k)
m.optimize()
preds = m.predict(self.X)
warp_k = GPy.kern.RBF(1)
warp_f = GPy.util.input_warping_functions.IdentifyWarping()
warp_m = GPy.models.InputWarpedGP(self.X, self.Y, kernel=warp_k, warping_function=warp_f)
warp_m.optimize()
warp_preds = warp_m.predict(self.X)
np.testing.assert_almost_equal(preds, warp_preds, decimal=4)
def test_kumar_warping_gradient(self):
n_X = 100
np.random.seed(0)
X = np.random.randn(n_X, 2)
Y = np.sum(np.sin(X), 1).reshape(n_X, 1)
k1 = GPy.kern.Linear(2)
m1 = GPy.models.InputWarpedGP(X, Y, kernel=k1)
m1.randomize()
        self.assertTrue(m1.checkgrad())
k2 = GPy.kern.RBF(2)
m2 = GPy.models.InputWarpedGP(X, Y, kernel=k2)
m2.randomize()
        self.assertTrue(m2.checkgrad())
k3 = GPy.kern.Matern52(2)
m3 = GPy.models.InputWarpedGP(X, Y, kernel=k3)
m3.randomize()
        self.assertTrue(m3.checkgrad())
def test_kumar_warping_parameters(self):
np.random.seed(1)
X = np.random.rand(5, 2)
epsilon = 1e-6
# testing warping indices
warping_ind_1 = [0, 1, 2]
warping_ind_2 = [-1, 1, 2]
warping_ind_3 = [0, 1.5, 2]
        self.assertRaises(ValueError, GPy.util.input_warping_functions.KumarWarping, X, warping_ind_1)
        self.assertRaises(ValueError, GPy.util.input_warping_functions.KumarWarping, X, warping_ind_2)
        self.assertRaises(ValueError, GPy.util.input_warping_functions.KumarWarping, X, warping_ind_3)
# testing Xmin and Xmax
Xmin_1, Xmax_1 = None, [1, 1]
Xmin_2, Xmax_2 = [0, 0], None
Xmin_3, Xmax_3 = [0, 0, 0], [1, 1]
        self.assertRaises(ValueError, GPy.util.input_warping_functions.KumarWarping, X, [0, 1], epsilon, Xmin_1, Xmax_1)
        self.assertRaises(ValueError, GPy.util.input_warping_functions.KumarWarping, X, [0, 1], epsilon, Xmin_2, Xmax_2)
        self.assertRaises(ValueError, GPy.util.input_warping_functions.KumarWarping, X, [0, 1], epsilon, Xmin_3, Xmax_3)
def test_warped_gp_identity(self):
"""
A WarpedGP with the identity warping function should be

@@ -17,4 +17,4 @@ from . import multioutput
from . import parallel
from . import functions
from . import cluster_with_offset
from . import quad_integrate
from . import input_warping_functions

@@ -0,0 +1,262 @@
# Copyright (c) 2012, GPy authors (see AUTHORS.txt).
# Licensed under the BSD 3-clause license (see LICENSE.txt)
import numpy as np
from ..core.parameterization import Parameterized, Param
from ..core.parameterization.priors import LogGaussian
class InputWarpingFunction(Parameterized):
"""Abstract class for input warping functions
"""
def __init__(self, name):
super(InputWarpingFunction, self).__init__(name=name)
def f(self, X, test=False):
raise NotImplementedError
    def fgrad_X(self, X):
raise NotImplementedError
def update_grads(self, X, dL_dW):
raise NotImplementedError
class IdentifyWarping(InputWarpingFunction):
"""The identity warping function, for testing"""
def __init__(self):
super(IdentifyWarping, self).__init__(name='input_warp_identity')
def f(self, X, test_data=False):
return X
def fgrad_X(self, X):
        # the derivative of the identity map with respect to X is one everywhere
        return np.ones(X.shape)
def update_grads(self, X, dL_dW):
pass
class InputWarpingTest(InputWarpingFunction):
"""The identity warping function, for testing"""
def __init__(self):
super(InputWarpingTest, self).__init__(name='input_warp_test')
self.a = Param('a', 1.0)
        self.a.set_prior(LogGaussian(0.0, 0.75))
self.link_parameter(self.a)
def f(self, X, test_data=False):
return X * self.a
def fgrad_X(self, X):
        return np.ones(X.shape) * self.a
def update_grads(self, X, dL_dW):
self.a.gradient[:] = np.sum(dL_dW * X)
class KumarWarping(InputWarpingFunction):
"""Kumar Warping for input data
Parameters
----------
X : array_like, shape = (n_samples, n_features)
The input data that is going to be warped
    warping_indices : list of int, optional
        The indices of the features to be warped
        Defaults to warping all the features
    epsilon : float, optional
        Used to normalize the input data to [0 + epsilon, 1 - epsilon]
        Defaults to 1e-6
    Xmin : list of float, optional
        The min values for each feature, defined by the user
        Defaults to the training minimum
    Xmax : list of float, optional
        The max values for each feature, defined by the user
        Defaults to the training maximum
Attributes
----------
    warping_indices : list of int
        The indices of the features to be warped
        Defaults to warping all the features
    warping_dim : int
        The number of features to be warped
    Xmin : list of float
        The min values for each feature, defined by the user
        Defaults to the training minimum
    Xmax : list of float
        The max values for each feature, defined by the user
        Defaults to the training maximum
    epsilon : float
        Used to normalize the input data to [0 + epsilon, 1 - epsilon]
        Defaults to 1e-6
X_normalized : array_like, shape = (n_samples, n_features)
The normalized training X
scaling : list of float, length = n_features in X
Defined as 1.0 / (self.Xmax - self.Xmin)
params : list of Param
The list of all the parameters used in Kumar Warping
    num_parameters : int
The number of parameters used in Kumar Warping
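    Examples
    --------
    A minimal sketch (illustrative, not part of the original commit):

    >>> import numpy as np
    >>> from GPy.util.input_warping_functions import KumarWarping
    >>> X = np.random.rand(5, 2)
    >>> kumar = KumarWarping(X)   # warp both features with default settings
    >>> X_warped = kumar.f(X)     # same shape as X, each feature mapped into (0, 1)
    >>> grad = kumar.fgrad_X(X)   # df/dx evaluated at the cached training inputs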
"""
def __init__(self, X, warping_indices=None, epsilon=None, Xmin=None, Xmax=None):
super(KumarWarping, self).__init__(name='input_warp_kumar')
        if warping_indices is not None and np.max(warping_indices) > X.shape[1] - 1:
            raise ValueError("Kumar warping indices exceed the feature dimension")
        if warping_indices is not None and np.min(warping_indices) < 0:
            raise ValueError("Kumar warping indices must be non-negative")
        if warping_indices is not None and np.any(list(map(lambda x: not isinstance(x, int), warping_indices))):
            raise ValueError("Kumar warping indices must be integers")
if Xmin is None and Xmax is None:
Xmin = X.min(axis=0)
Xmax = X.max(axis=0)
else:
if Xmin is None or Xmax is None:
raise ValueError("Xmin and Xmax need to be provide at the same time!")
if len(Xmin) != X.shape[1] or len(Xmax) != X.shape[1]:
raise ValueError("Xmin and Xmax should have n_feature values!")
        if epsilon is None:
            epsilon = 1e-6
        self.epsilon = epsilon
        # cast to arrays so that user-provided lists also support the arithmetic below
        Xmin, Xmax = np.asarray(Xmin, dtype=float), np.asarray(Xmax, dtype=float)
        self.Xmin = Xmin - self.epsilon
        self.Xmax = Xmax + self.epsilon
        self.scaling = 1.0 / (self.Xmax - self.Xmin)
        self.X_normalized = (X - self.Xmin) / (self.Xmax - self.Xmin)
if warping_indices is None:
warping_indices = range(X.shape[1])
self.warping_indices = warping_indices
self.warping_dim = len(self.warping_indices)
self.num_parameters = 2 * self.warping_dim
# create parameters
self.params = [[Param('a%d' % i, 1.0), Param('b%d' % i, 1.0)] for i in range(self.warping_dim)]
# add constraints
for i in range(self.warping_dim):
self.params[i][0].constrain_bounded(0.0, 10.0)
self.params[i][1].constrain_bounded(0.0, 10.0)
        # set priors and link the parameters into the hierarchy
for i in range(self.warping_dim):
self.params[i][0].set_prior(LogGaussian(0.0, 0.75))
self.params[i][1].set_prior(LogGaussian(0.0, 0.75))
self.link_parameter(self.params[i][0])
self.link_parameter(self.params[i][1])
def f(self, X, test_data=False):
"""Apply warping_function to some Input data
Parameters:
-----------
X : array_like, shape = (n_samples, n_features)
        test_data : bool, optional
            Defaults to False; set to True when transforming test data
Returns
-------
X_warped : array_like, shape = (n_samples, n_features)
The warped input data
Math
----
f(x) = 1 - (1 - x^a)^b
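        For example, a = b = 1 gives f(x) = x, so the warp reduces to the identity on [0, 1]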
"""
X_warped = X.copy()
if test_data:
X_normalized = (X - self.Xmin) / (self.Xmax - self.Xmin)
else:
X_normalized = self.X_normalized
for i_seq, i_fea in enumerate(self.warping_indices):
a, b = self.params[i_seq][0], self.params[i_seq][1]
X_warped[:, i_fea] = 1 - np.power(1 - np.power(X_normalized[:, i_fea], a), b)
return X_warped
def fgrad_X(self, X):
"""Compute the gradient of warping function with respect to X
Parameters
----------
        X : array_like, shape = (n_samples, n_features)
            The locations at which to compute the gradient; assumed to be the
            training inputs, whose normalized values are cached
Returns
-------
grad : array_like, shape = (n_samples, n_features)
            The gradient at every location in X
Math
----
        Let z = (x - Xmin) / (Xmax - Xmin) be the normalized input; then
        grad = a * b * z^(a-1) * (1 - z^a)^(b-1) / (Xmax - Xmin),
        where the final factor is the chain-rule term from the normalization
"""
grad = np.zeros(X.shape)
for i_seq, i_fea in enumerate(self.warping_indices):
a, b = self.params[i_seq][0], self.params[i_seq][1]
grad[:, i_fea] = a * b * np.power(self.X_normalized[:, i_fea], a-1) * \
np.power(1 - np.power(self.X_normalized[:, i_fea], a), b-1) * self.scaling[i_fea]
return grad
def update_grads(self, X, dL_dW):
"""Update the gradients of marginal log likelihood with respect to the parameters of warping function
Parameters
----------
X : array_like, shape = (n_samples, n_features)
The input BEFORE warping
dL_dW : array_like, shape = (n_samples, n_features)
            The gradient of the marginal log likelihood with respect to the warped input
Math
----
        Let W = f(X) be the input after warping and x the normalized input; then
        dW_da = b * (1 - x^a)^(b - 1) * x^a * ln(x)
        dW_db = - (1 - x^a)^b * ln(1 - x^a)
        dL_da = sum(dL_dW * dW_da)
        dL_db = sum(dL_dW * dW_db)
        where the sums run over all the samples
"""
for i_seq, i_fea in enumerate(self.warping_indices):
ai, bi = self.params[i_seq][0], self.params[i_seq][1]
            # cache this value to save some computation
x_pow_a = np.power(self.X_normalized[:, i_fea], ai)
# compute gradient for ai, bi on all X
dz_dai = bi * np.power(1 - x_pow_a, bi-1) * x_pow_a * np.log(self.X_normalized[:, i_fea])
dz_dbi = - np.power(1 - x_pow_a, bi) * np.log(1 - x_pow_a)
# sum gradients on all the data
dL_dai = np.sum(dL_dW[:, i_fea] * dz_dai)
dL_dbi = np.sum(dL_dW[:, i_fea] * dz_dbi)
self.params[i_seq][0].gradient[:] = dL_dai
self.params[i_seq][1].gradient[:] = dL_dbi
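if __name__ == '__main__':
    # A hedged sanity check (illustrative, not part of the original commit):
    # verify the analytic derivatives quoted in the Math sections of fgrad_X
    # and update_grads against central finite differences, using only numpy.
    np.random.seed(0)
    x = np.random.rand(10)  # plays the role of the normalized input in (0, 1)
    a, b, h = 1.3, 0.7, 1e-6
    f = lambda x_, a_, b_: 1 - np.power(1 - np.power(x_, a_), b_)
    # analytic derivatives: df/dx, dW/da and dW/db
    df_dx = a * b * np.power(x, a - 1) * np.power(1 - np.power(x, a), b - 1)
    dW_da = b * np.power(1 - np.power(x, a), b - 1) * np.power(x, a) * np.log(x)
    dW_db = -np.power(1 - np.power(x, a), b) * np.log(1 - np.power(x, a))
    assert np.allclose(df_dx, (f(x + h, a, b) - f(x - h, a, b)) / (2 * h), atol=1e-4)
    assert np.allclose(dW_da, (f(x, a + h, b) - f(x, a - h, b)) / (2 * h), atol=1e-4)
    assert np.allclose(dW_db, (f(x, a, b + h) - f(x, a, b - h)) / (2 * h), atol=1e-4)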