diff --git a/GPy/examples/regression.py b/GPy/examples/regression.py
index 11734564..19919f97 100644
--- a/GPy/examples/regression.py
+++ b/GPy/examples/regression.py
@@ -550,3 +550,34 @@ def parametric_mean_function(max_iters=100, optimize=True, plot=True):
     return m
 
+def warped_gp_cubic_sine(max_iters=100):
+    """
+    An example replicating the cubic sine regression problem from
+    Snelson's paper.
+    """
+    X = (2 * np.pi) * np.random.random(151) - np.pi
+    Y = np.sin(X) + np.random.normal(0,0.2,151)
+    Y = np.array([np.power(abs(y),float(1)/3) * (1,-1)[y<0] for y in Y])
+    X = X[:, None]
+    Y = Y[:, None]
+
+    warp_k = GPy.kern.RBF(1)
+    warp_f = GPy.util.warping_functions.TanhFunction(n_terms=2)
+    warp_m = GPy.models.WarpedGP(X, Y, kernel=warp_k, warping_function=warp_f)
+    warp_m['.*\.d'].constrain_fixed(1.0)
+    m = GPy.models.GPRegression(X, Y)
+    m.optimize_restarts(parallel=False, robust=True, num_restarts=5, max_iters=max_iters)
+    warp_m.optimize_restarts(parallel=False, robust=True, num_restarts=5, max_iters=max_iters)
+    #m.optimize(max_iters=max_iters)
+    #warp_m.optimize(max_iters=max_iters)
+
+    print(warp_m)
+    print(warp_m['.*warp.*'])
+
+    warp_m.predict_in_warped_space = False
+    warp_m.plot(title="Warped GP - Latent space")
+    warp_m.predict_in_warped_space = True
+    warp_m.plot(title="Warped GP - Warped space")
+    m.plot(title="Standard GP")
+    warp_m.plot_warping()
+    pb.show()
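
Side note on the data generation above: `(1,-1)[y<0]` indexes a tuple with a boolean, so the comprehension is just a sign-preserving cube root. A standalone sketch of the same step (NumPy only; the fixed seed is a hypothetical addition for reproducibility):

    import numpy as np

    rng = np.random.RandomState(0)
    X = (2 * np.pi) * rng.random_sample(151) - np.pi
    latent = np.sin(X) + rng.normal(0, 0.2, 151)
    Y = np.cbrt(latent)                 # sign-preserving cube root, same result
    assert np.allclose(Y ** 3, latent)  # the mapping the warp has to learn to undo
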
+ """ self.Y[:] = self.transform_data() super(WarpedGP, self).parameters_changed() - Kiy = self.posterior.woodbury_vector.flatten() - - grad_y = self.warping_function.fgrad_y(self.Y_untransformed) - grad_y_psi, grad_psi = self.warping_function.fgrad_y_psi(self.Y_untransformed, - return_covar_chain=True) - djac_dpsi = ((1.0 / grad_y[:, :, None, None]) * grad_y_psi).sum(axis=0).sum(axis=0) - dquad_dpsi = (Kiy[:, None, None, None] * grad_psi).sum(axis=0).sum(axis=0) - - warping_grads = -dquad_dpsi + djac_dpsi - - self.warping_function.psi.gradient[:] = warping_grads[:, :-1] - self.warping_function.d.gradient[:] = warping_grads[0, -1] + self.warping_function.update_grads(self.Y_untransformed, Kiy) def transform_data(self): Y = self.warping_function.f(self.Y_untransformed.copy()).copy() return Y def log_likelihood(self): + """ + Notice we add the jacobian of the warping function here. + """ ll = GP.log_likelihood(self) jacobian = self.warping_function.fgrad_y(self.Y_untransformed) return ll + np.log(jacobian).sum() @@ -73,22 +74,22 @@ class WarpedGP(GP): arg2 = np.ones(shape=gh_samples.shape).dot(mean.T) return self.warping_function.f_inv(arg1 + arg2, y=pred_init) - def _get_warped_mean(self, mean, std, pred_init=None, deg_gauss_hermite=100): + def _get_warped_mean(self, mean, std, pred_init=None, deg_gauss_hermite=20): """ Calculate the warped mean by using Gauss-Hermite quadrature. """ gh_samples, gh_weights = np.polynomial.hermite.hermgauss(deg_gauss_hermite) - gh_samples = gh_samples[:,None] - gh_weights = gh_weights[None,:] + gh_samples = gh_samples[:, None] + gh_weights = gh_weights[None, :] return gh_weights.dot(self._get_warped_term(mean, std, gh_samples)) / np.sqrt(np.pi) - def _get_warped_variance(self, mean, std, pred_init=None, deg_gauss_hermite=100): + def _get_warped_variance(self, mean, std, pred_init=None, deg_gauss_hermite=20): """ Calculate the warped variance by using Gauss-Hermite quadrature. """ gh_samples, gh_weights = np.polynomial.hermite.hermgauss(deg_gauss_hermite) - gh_samples = gh_samples[:,None] - gh_weights = gh_weights[None,:] + gh_samples = gh_samples[:, None] + gh_weights = gh_weights[None, :] arg1 = gh_weights.dot(self._get_warped_term(mean, std, gh_samples, pred_init=pred_init) ** 2) / np.sqrt(np.pi) arg2 = self._get_warped_mean(mean, std, pred_init=pred_init, @@ -96,11 +97,14 @@ class WarpedGP(GP): return arg1 - (arg2 ** 2) def predict(self, Xnew, which_parts='all', pred_init=None, full_cov=False, Y_metadata=None, - median=False, deg_gauss_hermite=100): - # normalize X values - # Xnew = (Xnew.copy() - self._Xoffset) / self._Xscale + median=False, deg_gauss_hermite=20, likelihood=None): + """ + Prediction results depend on: + - The value of the self.predict_in_warped_space flag + - The median flag passed as argument + The likelihood keyword is never used, it is just to follow the plotting API. 
+ """ mu, var = GP._raw_predict(self, Xnew) - # now push through likelihood mean, var = self.likelihood.predictive_values(mu, var) @@ -116,13 +120,11 @@ class WarpedGP(GP): else: wmean = mean wvar = var - if self.scale_data: pred = self._unscale_data(pred) - return wmean, wvar - def predict_quantiles(self, X, quantiles=(2.5, 97.5), Y_metadata=None): + def predict_quantiles(self, X, quantiles=(2.5, 97.5), Y_metadata=None, likelihood=None, median=False): """ Get the predictive quantiles around the prediction at X @@ -137,15 +139,30 @@ class WarpedGP(GP): if self.normalizer is not None: m, v = self.normalizer.inverse_mean(m), self.normalizer.inverse_variance(v) a, b = self.likelihood.predictive_quantiles(m, v, quantiles, Y_metadata) - #return [a, b] if not self.predict_in_warped_space: return [a, b] - #print a.shape new_a = self.warping_function.f_inv(a) new_b = self.warping_function.f_inv(b) - return [new_a, new_b] - #return self.likelihood.predictive_quantiles(m, v, quantiles, Y_metadata) + + def log_predictive_density(self, x_test, y_test, Y_metadata=None): + """ + Calculation of the log predictive density. Notice we add + the jacobian of the warping function here. + + .. math: + p(y_{*}|D) = p(y_{*}|f_{*})p(f_{*}|\mu_{*}\\sigma^{2}_{*}) + + :param x_test: test locations (x_{*}) + :type x_test: (Nx1) array + :param y_test: test observations (y_{*}) + :type y_test: (Nx1) array + :param Y_metadata: metadata associated with the test points + """ + mu_star, var_star = self._raw_predict(x_test) + fy = self.warping_function.f(y_test) + ll_lpd = self.likelihood.log_predictive_density(fy, mu_star, var_star, Y_metadata=Y_metadata) + return ll_lpd + np.log(self.warping_function.fgrad_y(y_test)) if __name__ == '__main__': diff --git a/GPy/plotting/gpy_plot/plot_util.py b/GPy/plotting/gpy_plot/plot_util.py index 0d472d06..937f35ab 100644 --- a/GPy/plotting/gpy_plot/plot_util.py +++ b/GPy/plotting/gpy_plot/plot_util.py @@ -31,6 +31,7 @@ import numpy as np from scipy import sparse import itertools +from ...models import WarpedGP def in_ipynb(): try: @@ -73,6 +74,9 @@ def helper_predict_with_model(self, Xgrid, plot_raw, apply_link, percentiles, wh if 'output_index' not in predict_kw['Y_metadata']: predict_kw['Y_metadata']['output_index'] = Xgrid[:,-1:].astype(np.int) + if isinstance(self, WarpedGP) and self.predict_in_warped_space: + predict_kw['median'] = True + mu, _ = self.predict(Xgrid, **predict_kw) if percentiles is not None: @@ -295,6 +299,8 @@ def get_x_y_var(model): Y = model.Y.values except AttributeError: Y = model.Y + if isinstance(model, WarpedGP) and model.predict_in_warped_space: + Y = model.Y_untransformed if sparse.issparse(Y): Y = Y.todense().view(np.ndarray) return X, X_variance, Y @@ -381,4 +387,4 @@ def x_frame2D(X,plot_limits=None,resolution=None): resolution = resolution or 50 xx, yy = np.mgrid[xmin[0]:xmax[0]:1j*resolution,xmin[1]:xmax[1]:1j*resolution] Xnew = np.c_[xx.flat, yy.flat] - return Xnew, xx, yy, xmin, xmax \ No newline at end of file + return Xnew, xx, yy, xmin, xmax diff --git a/GPy/testing/model_tests.py b/GPy/testing/model_tests.py index 5d6de7ae..010befe4 100644 --- a/GPy/testing/model_tests.py +++ b/GPy/testing/model_tests.py @@ -361,50 +361,75 @@ class MiscTests(unittest.TestCase): preds = m.predict(self.X) warp_k = GPy.kern.RBF(1) - warp_f = GPy.util.warping_functions.IdentityFunction() - warp_m = GPy.models.WarpedGP(self.X, self.Y, kernel=warp_k, warping_function=warp_f) + warp_f = GPy.util.warping_functions.IdentityFunction(closed_inverse=False) + warp_m = 
diff --git a/GPy/testing/model_tests.py b/GPy/testing/model_tests.py
index 5d6de7ae..010befe4 100644
--- a/GPy/testing/model_tests.py
+++ b/GPy/testing/model_tests.py
@@ -361,50 +361,75 @@ class MiscTests(unittest.TestCase):
         preds = m.predict(self.X)
 
         warp_k = GPy.kern.RBF(1)
-        warp_f = GPy.util.warping_functions.IdentityFunction()
-        warp_m = GPy.models.WarpedGP(self.X, self.Y, kernel=warp_k, warping_function=warp_f)
+        warp_f = GPy.util.warping_functions.IdentityFunction(closed_inverse=False)
+        warp_m = GPy.models.WarpedGP(self.X, self.Y, kernel=warp_k,
+                                     warping_function=warp_f)
         warp_m.optimize()
         warp_preds = warp_m.predict(self.X)
-        np.testing.assert_almost_equal(preds, warp_preds)
+        warp_k_exact = GPy.kern.RBF(1)
+        warp_f_exact = GPy.util.warping_functions.IdentityFunction()
+        warp_m_exact = GPy.models.WarpedGP(self.X, self.Y, kernel=warp_k_exact,
+                                           warping_function=warp_f_exact)
+        warp_m_exact.optimize()
+        warp_preds_exact = warp_m_exact.predict(self.X)
+
+        np.testing.assert_almost_equal(preds, warp_preds, decimal=4)
+        np.testing.assert_almost_equal(preds, warp_preds_exact, decimal=4)
 
-    @unittest.skip('Comment this to plot the modified sine function')
-    def test_warped_gp_sine(self):
+    def test_warped_gp_log(self):
         """
-        A test replicating the sine regression problem from
-        Snelson's paper.
+        A WarpedGP with the log warping function should be
+        equivalent to a standard GP trained on the log of the labels.
+        Note that we predict the median here.
+        """
+        k = GPy.kern.RBF(1)
+        Y = np.abs(self.Y)
+        logY = np.log(Y)
+        m = GPy.models.GPRegression(self.X, logY, kernel=k)
+        m.optimize()
+        preds = m.predict(self.X)[0]
+
+        warp_k = GPy.kern.RBF(1)
+        warp_f = GPy.util.warping_functions.LogFunction(closed_inverse=False)
+        warp_m = GPy.models.WarpedGP(self.X, Y, kernel=warp_k,
+                                     warping_function=warp_f)
+        warp_m.optimize()
+        warp_preds = warp_m.predict(self.X, median=True)[0]
+
+        warp_k_exact = GPy.kern.RBF(1)
+        warp_f_exact = GPy.util.warping_functions.LogFunction()
+        warp_m_exact = GPy.models.WarpedGP(self.X, Y, kernel=warp_k_exact,
+                                           warping_function=warp_f_exact)
+        warp_m_exact.optimize(messages=True)
+        warp_preds_exact = warp_m_exact.predict(self.X, median=True)[0]
+
+        np.testing.assert_almost_equal(np.exp(preds), warp_preds, decimal=4)
+        np.testing.assert_almost_equal(np.exp(preds), warp_preds_exact, decimal=4)
+
+    def test_warped_gp_cubic_sine(self, max_iters=100):
+        """
+        A test replicating the cubic sine regression problem from
+        Snelson's paper. This test has no assertions; it exists
+        to exercise the tanh warping function code.
         """
         X = (2 * np.pi) * np.random.random(151) - np.pi
-        Y = np.sin(X) + np.random.normal(0,0.1,151)
-        Y = np.exp(Y) - 5
-        #Y = np.array([np.power(abs(y),float(1)/3) * (1,-1)[y<0] for y in Y]) + 0
+        Y = np.sin(X) + np.random.normal(0,0.2,151)
+        Y = np.array([np.power(abs(y),float(1)/3) * (1,-1)[y<0] for y in Y])
+        X = X[:, None]
+        Y = Y[:, None]
 
-        #np.seterr(over='raise')
-        import matplotlib.pyplot as plt
-
-        warp_k = GPy.kern.RBF(1)
-        warp_f = GPy.util.warping_functions.TanhWarpingFunction_d(n_terms=2)
-        warp_m = GPy.models.WarpedGP(X[:, None], Y[:, None], kernel=warp_k, warping_function=warp_f)
-        #warp_m['.*variance.*'].constrain_fixed(0.25)
-        #warp_m['.*lengthscale.*'].constrain_fixed(1)
-        #warp_m['warp_tanh.d'].constrain_fixed(1)
-        #warp_m.randomize()
-        #warp_m['.*warp_tanh.psi*'][:,0:2].constrain_bounded(0,100)
-        #warp_m['.*warp_tanh.psi*'][:,0:1].constrain_fixed(1)
-
-        #print(warp_m.checkgrad())
-        #warp_m.plot()
-        #plt.show()
-
-        warp_m.optimize_restarts(parallel=True, robust=True)
-        #print(warp_m.checkgrad())
+        warp_m = GPy.models.WarpedGP(X, Y)  # default kernel and tanh warping function
+        warp_m['.*\.d'].constrain_fixed(1.0)
+        warp_m.optimize_restarts(parallel=False, robust=False, num_restarts=5,
+                                 max_iters=max_iters)
+        warp_m.predict(X)
+        warp_m.predict_quantiles(X)
+        warp_m.log_predictive_density(X, Y)
         print(warp_m)
         print(warp_m['.*warp.*'])
         warp_m.predict_in_warped_space = False
         warp_m.plot()
         warp_m.predict_in_warped_space = True
         warp_m.plot()
-        warp_f.plot(X.min()-10, X.max()+10)
-        plt.show()
 
     def test_offset_regression(self):
         #Tests GPy.models.GPOffsetRegression. Using two small time series
@@ -427,7 +452,6 @@ class MiscTests(unittest.TestCase):
 
         assert np.abs(m.offset[0]-offset)<0.1, ("GPOffsetRegression model failing to estimate correct offset (value estimated = %0.2f instead of %0.2f)" % (m.offset[0], offset))
 
-
 class GradientTests(np.testing.TestCase):
     def setUp(self):
         ######################################
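
The identity behind test_warped_gp_log: warping with f = log and modelling the latent values as Gaussian is a change of variables, so the density over y gains the Jacobian f'(y) = 1/y — exactly the term `log_predictive_density` adds through `fgrad_y`, and exactly the lognormal pdf. A minimal SciPy check:

    import numpy as np
    from scipy.stats import norm, lognorm

    mu, sigma, y = 0.2, 0.7, 1.5
    via_jacobian = norm.logpdf(np.log(y), mu, sigma) + np.log(1.0 / y)
    assert np.allclose(via_jacobian,
                       lognorm.logpdf(y, s=sigma, scale=np.exp(mu)))
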
""" X = (2 * np.pi) * np.random.random(151) - np.pi - Y = np.sin(X) + np.random.normal(0,0.1,151) - Y = np.exp(Y) - 5 - #Y = np.array([np.power(abs(y),float(1)/3) * (1,-1)[y<0] for y in Y]) + 0 + Y = np.sin(X) + np.random.normal(0,0.2,151) + Y = np.array([np.power(abs(y),float(1)/3) * (1,-1)[y<0] for y in Y]) + X = X[:, None] + Y = Y[:, None] - #np.seterr(over='raise') - import matplotlib.pyplot as plt - warp_k = GPy.kern.RBF(1) - warp_f = GPy.util.warping_functions.TanhWarpingFunction_d(n_terms=2) - warp_m = GPy.models.WarpedGP(X[:, None], Y[:, None], kernel=warp_k, warping_function=warp_f) - #warp_m['.*variance.*'].constrain_fixed(0.25) - #warp_m['.*lengthscale.*'].constrain_fixed(1) - #warp_m['warp_tanh.d'].constrain_fixed(1) - #warp_m.randomize() - #warp_m['.*warp_tanh.psi*'][:,0:2].constrain_bounded(0,100) - #warp_m['.*warp_tanh.psi*'][:,0:1].constrain_fixed(1) - - #print(warp_m.checkgrad()) - #warp_m.plot() - #plt.show() - - warp_m.optimize_restarts(parallel=True, robust=True) - #print(warp_m.checkgrad()) - print(warp_m) - print(warp_m['.*warp.*']) + warp_m = GPy.models.WarpedGP(X, Y)#, kernel=warp_k)#, warping_function=warp_f) + warp_m['.*\.d'].constrain_fixed(1.0) + warp_m.optimize_restarts(parallel=False, robust=False, num_restarts=5, + max_iters=max_iters) + warp_m.predict(X) + warp_m.predict_quantiles(X) + warp_m.log_predictive_density(X, Y) warp_m.predict_in_warped_space = False warp_m.plot() warp_m.predict_in_warped_space = True warp_m.plot() - warp_f.plot(X.min()-10, X.max()+10) - plt.show() def test_offset_regression(self): #Tests GPy.models.GPOffsetRegression. Using two small time series @@ -427,7 +452,6 @@ class MiscTests(unittest.TestCase): assert np.abs(m.offset[0]-offset)<0.1, ("GPOffsetRegression model failing to estimate correct offset (value estimated = %0.2f instead of %0.2f)" % (m.offset[0], offset)) - class GradientTests(np.testing.TestCase): def setUp(self): ###################################### diff --git a/GPy/util/warping_functions.py b/GPy/util/warping_functions.py index 1617283c..6b8b73e9 100644 --- a/GPy/util/warping_functions.py +++ b/GPy/util/warping_functions.py @@ -4,6 +4,7 @@ import numpy as np from ..core.parameterization import Parameterized, Param from paramz.transformations import Logexp +import sys class WarpingFunction(Parameterized): @@ -14,6 +15,7 @@ class WarpingFunction(Parameterized): def __init__(self, name): super(WarpingFunction, self).__init__(name=name) + self.rate = 0.1 def f(self, y, psi): """function transformation @@ -29,15 +31,35 @@ class WarpingFunction(Parameterized): """gradient of f w.r.t to y""" raise NotImplementedError - def f_inv(self, z, psi): - """inverse function transformation""" - raise NotImplementedError + def f_inv(self, z, max_iterations=100, y=None): + """ + Calculate the numerical inverse of f. This should be + overwritten for specific warping functions where the + inverse can be found in closed form. + + :param max_iterations: maximum number of N.R. iterations + """ + + z = z.copy() + y = np.ones_like(z) + + it = 0 + update = np.inf + while np.abs(update).sum() > 1e-10 and it < max_iterations: + fy = self.f(y) + fgrady = self.fgrad_y(y) + update = (fy - z) / fgrady + y -= self.rate * update + it += 1 + if it == max_iterations: + print("WARNING!!! 
@@ -49,183 +71,59 @@ class WarpingFunction(Parameterized):
         plt.show()
 
 
-class TanhWarpingFunction(WarpingFunction):
-
-    def __init__(self, n_terms=3):
-        """n_terms specifies the number of tanh terms to be used"""
-        self.n_terms = n_terms
-        self.num_parameters = 3 * self.n_terms
-        super(TanhWarpingFunction, self).__init__(name='warp_tanh')
-
-    def f(self, y, psi):
+class TanhFunction(WarpingFunction):
+    """
+    This is the function proposed in Snelson et al.:
+    a sum of tanh functions with linear trends outside
+    the range. Notice the term 'd', which scales the
+    linear trend.
+    """
+    def __init__(self, n_terms=3, initial_y=None):
         """
-        transform y with f using parameter vector psi
-        psi = [[a,b,c]]
-
-        ::math::`f = \\sum_{terms} a * tanh(b*(y+c))`
+        n_terms specifies the number of tanh terms to be used
         """
-        #1. check that number of params is consistent
-        assert psi.shape[0] == self.n_terms, 'inconsistent parameter dimensions'
-        assert psi.shape[1] == 3, 'inconsistent parameter dimensions'
-
-        #2. exponentiate the a and b (positive!)
-        mpsi = psi.copy()
-
-        #3. transform data
-        z = y.copy()
-        for i in range(len(mpsi)):
-            a,b,c = mpsi[i]
-            z += a*np.tanh(b*(y+c))
-        return z
-
-    def f_inv(self, y, psi, iterations=10):
-        """
-        calculate the numerical inverse of f
-
-        :param iterations: number of N.R. iterations
-        """
-
-        y = y.copy()
-        z = np.ones_like(y)
-        for i in range(iterations):
-            z -= (self.f(z, psi) - y)/self.fgrad_y(z,psi)
-        return z
-
-    def fgrad_y(self, y, psi, return_precalc=False):
-        """
-        gradient of f w.r.t to y ([N x 1])
-
-        returns: Nx1 vector of derivatives, unless return_precalc is true,
-        then it also returns the precomputed stuff
-        """
-
-        mpsi = psi.copy()
-
-        # vectorized version
-
-        # S = (mpsi[:,1]*(y + mpsi[:,2])).T
-        S = (mpsi[:,1]*(y[:,:,None] + mpsi[:,2])).T
-        R = np.tanh(S)
-        D = 1-R**2
-
-        # GRAD = (1+(mpsi[:,0:1]*mpsi[:,1:2]*D).sum(axis=0))[:,np.newaxis]
-        GRAD = (1+(mpsi[:,0:1][:,:,None]*mpsi[:,1:2][:,:,None]*D).sum(axis=0)).T
-
-        if return_precalc:
-            # return GRAD,S.sum(axis=1),R.sum(axis=1),D.sum(axis=1)
-            return GRAD, S, R, D
-
-        return GRAD
-
-    def fgrad_y_psi(self, y, psi, return_covar_chain=False):
-        """
-        gradient of f w.r.t to y and psi
-
-        returns: NxIx3 tensor of partial derivatives
-        """
-
-        # 1. exponentiate the a and b (positive!)
-        mpsi = psi.copy()
-        w, s, r, d = self.fgrad_y(y, psi, return_precalc = True)
-
-        gradients = np.zeros((y.shape[0], y.shape[1], len(mpsi), 3))
-        for i in range(len(mpsi)):
-            a,b,c = mpsi[i]
-            gradients[:,:,i,0] = (b*(1.0/np.cosh(s[i]))**2).T
-            gradients[:,:,i,1] = a*(d[i] - 2.0*s[i]*r[i]*(1.0/np.cosh(s[i]))**2).T
-            gradients[:,:,i,2] = (-2.0*a*(b**2)*r[i]*((1.0/np.cosh(s[i]))**2)).T
-
-        if return_covar_chain:
-            covar_grad_chain = np.zeros((y.shape[0], y.shape[1], len(mpsi), 3))
-
-            for i in range(len(mpsi)):
-                a,b,c = mpsi[i]
-                covar_grad_chain[:, :, i, 0] = (r[i]).T
-                covar_grad_chain[:, :, i, 1] = (a*(y + c) * ((1.0/np.cosh(s[i]))**2).T)
-                covar_grad_chain[:, :, i, 2] = a*b*((1.0/np.cosh(s[i]))**2).T
-
-            return gradients, covar_grad_chain
-
-        return gradients
-
-    def _get_param_names(self):
-        variables = ['a', 'b', 'c']
-        names = sum([['warp_tanh_%s_t%i' % (variables[n],q) for n in range(3)] for q in range(self.n_terms)],[])
-        return names
-
-
-class TanhWarpingFunction_d(WarpingFunction):
-
-    def __init__(self, n_terms=3):
-        """n_terms specifies the number of tanh terms to be used"""
         self.n_terms = n_terms
         self.num_parameters = 3 * self.n_terms + 1
         self.psi = np.ones((self.n_terms, 3))
-
-        super(TanhWarpingFunction_d, self).__init__(name='warp_tanh')
+        super(TanhFunction, self).__init__(name='warp_tanh')
         self.psi = Param('psi', self.psi)
         self.psi[:, :2].constrain_positive()
-
         self.d = Param('%s' % ('d'), 1.0, Logexp())
         self.link_parameter(self.psi)
         self.link_parameter(self.d)
+        self.initial_y = initial_y
 
     def f(self, y):
         """
         Transform y with f using parameter vector psi
         psi = [[a,b,c]]
 
-        :math:`f = \\sum_{terms} a * tanh(b*(y+c))`
+        :math:`f = (y * d) + \\sum_{terms} a * tanh(b * (y + c))`
         """
-        #1. check that number of params is consistent
-        # assert psi.shape[0] == self.n_terms, 'inconsistent parameter dimensions'
-        # assert psi.shape[1] == 4, 'inconsistent parameter dimensions'
-
         d = self.d
         mpsi = self.psi
-
-        #3. transform data
-        z = d*y.copy()
+        z = d * y.copy()
         for i in range(len(mpsi)):
-            a,b,c = mpsi[i]
-            z += a*np.tanh(b*(y+c))
+            a, b, c = mpsi[i]
+            z += a * np.tanh(b * (y + c))
         return z
 
-    def f_inv(self, z, max_iterations=1000, y=None):
-        """
-        calculate the numerical inverse of f
-
-        :param max_iterations: maximum number of N.R. iterations
-        """
-
-        z = z.copy()
-        if y is None:
-            y = np.ones_like(z)
-
-        it = 0
-        update = np.inf
-
-        while it == 0 or (np.abs(update).sum() > 1e-10 and it < max_iterations):
-            fy = self.f(y)
-            fgrady = self.fgrad_y(y)
-            update = (fy - z)/fgrady
-            y -= update
-            it += 1
-        if it == max_iterations:
-            print("WARNING!!! Maximum number of iterations reached in f_inv ")
-        return y
-
     def fgrad_y(self, y, return_precalc=False):
         """
         gradient of f w.r.t to y ([N x 1])
-        :returns: Nx1 vector of derivatives, unless return_precalc is true, then it also returns the precomputed stuff
+        :returns: Nx1 vector of derivatives, unless return_precalc is true,
+        then it also returns the precomputed stuff
         """
         d = self.d
         mpsi = self.psi
 
         # vectorized version
-        S = (mpsi[:,1]*(y[:,:,None] + mpsi[:,2])).T
+        S = (mpsi[:,1] * (y[:,:,None] + mpsi[:,2])).T
         R = np.tanh(S)
-        D = 1-R**2
+        D = 1 - (R ** 2)
 
-        GRAD = (d + (mpsi[:,0:1][:,:,None]*mpsi[:,1:2][:,:,None]*D).sum(axis=0)).T
+        GRAD = (d + (mpsi[:,0:1][:,:,None] * mpsi[:,1:2][:,:,None] * D).sum(axis=0)).T
 
         if return_precalc:
             return GRAD, S, R, D
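
For the merged class, the warp and its y-derivative are f(y) = d*y + sum_i a_i*tanh(b_i*(y + c_i)) and f'(y) = d + sum_i a_i*b_i*(1 - tanh(b_i*(y + c_i))**2) — the `D = 1 - (R ** 2)` term above, since tanh' = 1 - tanh**2. A quick finite-difference check (parameter values hypothetical):

    import numpy as np

    psi = np.array([[0.5, 1.2, -0.3],   # rows of (a, b, c)
                    [0.8, 0.6,  0.1]])
    d = 1.0

    def f(y):
        return d * y + sum(a * np.tanh(b * (y + c)) for a, b, c in psi)

    def fgrad(y):
        return d + sum(a * b * (1 - np.tanh(b * (y + c)) ** 2) for a, b, c in psi)

    y, eps = 0.7, 1e-6
    numeric = (f(y + eps) - f(y - eps)) / (2 * eps)
    assert np.allclose(numeric, fgrad(y))
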
@@ -244,45 +142,86 @@
         gradients = np.zeros((y.shape[0], y.shape[1], len(mpsi), 4))
         for i in range(len(mpsi)):
             a,b,c = mpsi[i]
-            gradients[:,:,i,0] = (b*(1.0/np.cosh(s[i]))**2).T
-            gradients[:,:,i,1] = a*(d[i] - 2.0*s[i]*r[i]*(1.0/np.cosh(s[i]))**2).T
-            gradients[:,:,i,2] = (-2.0*a*(b**2)*r[i]*((1.0/np.cosh(s[i]))**2)).T
-            gradients[:,:,0,3] = 1.0
+            gradients[:, :, i, 0] = (b * (1.0/np.cosh(s[i])) ** 2).T
+            gradients[:, :, i, 1] = a * (d[i] - 2.0 * s[i] * r[i] * (1.0/np.cosh(s[i])) ** 2).T
+            gradients[:, :, i, 2] = (-2.0 * a * (b ** 2) * r[i] * ((1.0 / np.cosh(s[i])) ** 2)).T
+            gradients[:, :, 0, 3] = 1.0
 
         if return_covar_chain:
             covar_grad_chain = np.zeros((y.shape[0], y.shape[1], len(mpsi), 4))
-
             for i in range(len(mpsi)):
                 a,b,c = mpsi[i]
                 covar_grad_chain[:, :, i, 0] = (r[i]).T
-                covar_grad_chain[:, :, i, 1] = (a*(y + c) * ((1.0/np.cosh(s[i]))**2).T)
-                covar_grad_chain[:, :, i, 2] = a*b*((1.0/np.cosh(s[i]))**2).T
+                covar_grad_chain[:, :, i, 1] = (a * (y + c) * ((1.0 / np.cosh(s[i])) ** 2).T)
+                covar_grad_chain[:, :, i, 2] = a * b * ((1.0 / np.cosh(s[i])) ** 2).T
             covar_grad_chain[:, :, 0, 3] = y
-
             return gradients, covar_grad_chain
         return gradients
 
     def _get_param_names(self):
         variables = ['a', 'b', 'c', 'd']
-        names = sum([['warp_tanh_%s_t%i' % (variables[n],q) for n in range(3)] for q in range(self.n_terms)],[])
-        names.append('warp_tanh_d')
+        names = sum([['warp_tanh_%s_t%i' % (variables[n],q) for n in range(3)]
+                     for q in range(self.n_terms)],[])
+        names.append('warp_tanh_d')
         return names
 
+    def update_grads(self, Y_untransformed, Kiy):
+        grad_y = self.fgrad_y(Y_untransformed)
+        grad_y_psi, grad_psi = self.fgrad_y_psi(Y_untransformed,
+                                                return_covar_chain=True)
+        djac_dpsi = ((1.0 / grad_y[:, :, None, None]) * grad_y_psi).sum(axis=0).sum(axis=0)
+        dquad_dpsi = (Kiy[:, None, None, None] * grad_psi).sum(axis=0).sum(axis=0)
+
+        warping_grads = -dquad_dpsi + djac_dpsi
+
+        self.psi.gradient[:] = warping_grads[:, :-1]
+        self.d.gradient[:] = warping_grads[0, -1]
+
+
+class LogFunction(WarpingFunction):
+    """
+    Easy wrapper for applying a fixed log warping function to
+    positive-only values.
+    The closed_inverse flag should only be set to False for
+    debugging and testing purposes.
+    """
+    def __init__(self, closed_inverse=True):
+        self.num_parameters = 0
+        super(LogFunction, self).__init__(name='log')
+        if closed_inverse:
+            self.f_inv = self._f_inv
+
+    def f(self, y):
+        return np.log(y)
+
+    def fgrad_y(self, y):
+        return 1. / y
+
+    def update_grads(self, Y_untransformed, Kiy):
+        pass
+
+    def fgrad_y_psi(self, y, return_covar_chain=False):
+        if return_covar_chain:
+            return 0, 0
+        return 0
+
+    def _f_inv(self, z, y=None):
+        return np.exp(z)
+
 
 class IdentityFunction(WarpingFunction):
     """
     Identity warping function. This is for testing and sanity
     check purposes and should not be used in practice.
+    The closed_inverse flag should only be set to False for
+    debugging and testing purposes.
     """
-    def __init__(self):
-        self.num_parameters = 4
-        self.psi = Param('psi', np.zeros((1,3)))
-        self.d = Param('%s' % ('d'), 1.0, Logexp())
+    def __init__(self, closed_inverse=True):
+        self.num_parameters = 0
         super(IdentityFunction, self).__init__(name='identity')
-        self.link_parameter(self.psi)
-        self.link_parameter(self.d)
-
+        if closed_inverse:
+            self.f_inv = self._f_inv
 
     def f(self, y):
         return y
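
`update_grads` combines two terms: differentiating the warped log likelihood L(psi) = log N(f(y) | 0, K) + sum(log f'(y)) gives dL/dpsi = -(K^-1 f(y)) . df/dpsi + sum((df'/dpsi) / f'(y)), i.e. `-dquad_dpsi + djac_dpsi`, with `Kiy` playing the role of K^-1 f(y). A numerical check on a one-parameter toy warp (values hypothetical):

    import numpy as np

    K = np.array([[1.0, 0.3], [0.3, 1.0]])
    y = np.array([0.5, -1.2])

    def f(y, a):      # toy warp with a single parameter a
        return y + a * np.tanh(y)

    def fgrad(y, a):  # df/dy
        return 1 + a * (1 - np.tanh(y) ** 2)

    def L(a):         # data-dependent part of the warped log likelihood
        fy = f(y, a)
        return -0.5 * fy.dot(np.linalg.solve(K, fy)) + np.log(fgrad(y, a)).sum()

    a, eps = 0.7, 1e-6
    Kiy = np.linalg.solve(K, f(y, a))
    dquad = Kiy.dot(np.tanh(y))                          # df/da = tanh(y)
    djac = ((1 - np.tanh(y) ** 2) / fgrad(y, a)).sum()   # (d(f')/da) / f'
    numeric = (L(a + eps) - L(a - eps)) / (2 * eps)
    assert np.allclose(-dquad + djac, numeric)
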
@@ -290,11 +229,14 @@
     def fgrad_y(self, y):
         return np.ones(y.shape)
 
+    def update_grads(self, Y_untransformed, Kiy):
+        pass
+
     def fgrad_y_psi(self, y, return_covar_chain=False):
-        gradients = np.zeros((y.shape[0], y.shape[1], len(self.psi), 4))
         if return_covar_chain:
-            return gradients, gradients
-        return gradients
+            return 0, 0
+        return 0
 
-    def f_inv(self, z, y=None):
+    def _f_inv(self, z, y=None):
         return z
+
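
The `closed_inverse` pattern used by LogFunction and IdentityFunction works because an instance attribute shadows a method defined on the class, so `self.f_inv = self._f_inv` swaps in the exact inverse while the Newton-Raphson fallback stays available for testing. A minimal illustration of the mechanism (toy classes, not the GPy API):

    class Base(object):
        def f_inv(self, z):
            return "iterative"            # stands in for the Newton fallback

    class Closed(Base):
        def __init__(self, closed_inverse=True):
            if closed_inverse:
                self.f_inv = self._f_inv  # instance attribute wins on lookup

        def _f_inv(self, z):
            return "exact"

    assert Closed().f_inv(None) == "exact"
    assert Closed(closed_inverse=False).f_inv(None) == "iterative"
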