Merge branch 'wgps_improvements' of https://github.com/beckdaniel/GPy into beckdaniel-wgps_improvements

mzwiessele committed 2016-08-03 13:27:52 +01:00
commit 9be51df4f1
5 changed files with 261 additions and 242 deletions

View file

@@ -550,3 +550,34 @@ def parametric_mean_function(max_iters=100, optimize=True, plot=True):
     return m

+def warped_gp_cubic_sine(max_iters=100):
+    """
+    A test replicating the cubic sine regression problem from
+    Snelson's paper.
+    """
+    X = (2 * np.pi) * np.random.random(151) - np.pi
+    Y = np.sin(X) + np.random.normal(0,0.2,151)
+    Y = np.array([np.power(abs(y),float(1)/3) * (1,-1)[y<0] for y in Y])
+    X = X[:, None]
+    Y = Y[:, None]
+
+    warp_k = GPy.kern.RBF(1)
+    warp_f = GPy.util.warping_functions.TanhFunction(n_terms=2)
+    warp_m = GPy.models.WarpedGP(X, Y, kernel=warp_k, warping_function=warp_f)
+    warp_m['.*\.d'].constrain_fixed(1.0)
+    m = GPy.models.GPRegression(X, Y)
+    m.optimize_restarts(parallel=False, robust=True, num_restarts=5, max_iters=max_iters)
+    warp_m.optimize_restarts(parallel=False, robust=True, num_restarts=5, max_iters=max_iters)
+    #m.optimize(max_iters=max_iters)
+    #warp_m.optimize(max_iters=max_iters)
+
+    print(warp_m)
+    print(warp_m['.*warp.*'])
+
+    warp_m.predict_in_warped_space = False
+    warp_m.plot(title="Warped GP - Latent space")
+    warp_m.predict_in_warped_space = True
+    warp_m.plot(title="Warped GP - Warped space")
+    m.plot(title="Standard GP")
+    warp_m.plot_warping()
+    pb.show()
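A note on the label transform in this new example: the list comprehension computes a signed cube root. A minimal sketch (not part of the diff) of a vectorised equivalent using NumPy's `np.cbrt`, which preserves the sign of its argument:

```python
import numpy as np

np.random.seed(0)
Y = np.sin(np.random.uniform(-np.pi, np.pi, 151)) + np.random.normal(0, 0.2, 151)

# signed cube root, as written in the example above
Y_loop = np.array([np.power(abs(y), 1.0 / 3) * (1, -1)[y < 0] for y in Y])

# vectorised equivalent: np.cbrt handles negative inputs directly
Y_vec = np.cbrt(Y)

assert np.allclose(Y_loop, Y_vec)
```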

View file

@@ -2,24 +2,27 @@
 # Licensed under the BSD 3-clause license (see LICENSE.txt)

 import numpy as np
-from ..util.warping_functions import *
+#from ..util.warping_functions import *
 from ..core import GP
 from .. import likelihoods
-from GPy.util.warping_functions import TanhWarpingFunction_d
+from paramz import ObsAr
+#from GPy.util.warping_functions import TanhFunction
+from ..util.warping_functions import TanhFunction
 from GPy import kern

 class WarpedGP(GP):
+    """
+    This defines a GP Regression model that applies a
+    warping function to the output.
+    """
     def __init__(self, X, Y, kernel=None, warping_function=None, warping_terms=3):
         if kernel is None:
             kernel = kern.RBF(X.shape[1])
         if warping_function == None:
-            self.warping_function = TanhWarpingFunction_d(warping_terms)
+            self.warping_function = TanhFunction(warping_terms)
             self.warping_params = (np.random.randn(self.warping_function.n_terms * 3 + 1) * 1)
         else:
             self.warping_function = warping_function

         self.scale_data = False
         if self.scale_data:
             Y = self._scale_data(Y)
@@ -27,10 +30,13 @@ class WarpedGP(GP):
         self.Y_untransformed = Y.copy()
         self.predict_in_warped_space = True
         likelihood = likelihoods.Gaussian()
         GP.__init__(self, X, self.transform_data(), likelihood=likelihood, kernel=kernel)
         self.link_parameter(self.warping_function)

+    def set_XY(self, X=None, Y=None):
+        self.Y_untransformed = Y.copy()
+        GP.set_XY(self, X, self.transform_data())
+
     def _scale_data(self, Y):
         self._Ymax = Y.max()
         self._Ymin = Y.min()
@@ -40,27 +46,22 @@ class WarpedGP(GP):
         return (Y + 0.5) * (self._Ymax - self._Ymin) + self._Ymin

     def parameters_changed(self):
+        """
+        Notice that we update the warping function gradients here.
+        """
         self.Y[:] = self.transform_data()
         super(WarpedGP, self).parameters_changed()
         Kiy = self.posterior.woodbury_vector.flatten()
-        grad_y = self.warping_function.fgrad_y(self.Y_untransformed)
-        grad_y_psi, grad_psi = self.warping_function.fgrad_y_psi(self.Y_untransformed,
-                                                                 return_covar_chain=True)
-        djac_dpsi = ((1.0 / grad_y[:, :, None, None]) * grad_y_psi).sum(axis=0).sum(axis=0)
-        dquad_dpsi = (Kiy[:, None, None, None] * grad_psi).sum(axis=0).sum(axis=0)
-        warping_grads = -dquad_dpsi + djac_dpsi
-        self.warping_function.psi.gradient[:] = warping_grads[:, :-1]
-        self.warping_function.d.gradient[:] = warping_grads[0, -1]
+        self.warping_function.update_grads(self.Y_untransformed, Kiy)

     def transform_data(self):
         Y = self.warping_function.f(self.Y_untransformed.copy()).copy()
         return Y

     def log_likelihood(self):
+        """
+        Notice we add the jacobian of the warping function here.
+        """
         ll = GP.log_likelihood(self)
         jacobian = self.warping_function.fgrad_y(self.Y_untransformed)
         return ll + np.log(jacobian).sum()
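For reference, the objective that `log_likelihood` above implements: the model is a standard GP on the warped targets z = f(y), and the change of variables contributes the log Jacobian of the warping function. Assuming a zero mean function and Gaussian noise variance σ², this reads:

```latex
\log p(\mathbf{y} \mid X) = \log \mathcal{N}\!\big(f(\mathbf{y}) \,\big|\, \mathbf{0},\; K + \sigma^{2} I\big) + \sum_{i=1}^{N} \log f'(y_i)
```

The second term is exactly the `np.log(jacobian).sum()` above, with `jacobian = fgrad_y(Y_untransformed)`.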
@@ -73,22 +74,22 @@ class WarpedGP(GP):
         arg2 = np.ones(shape=gh_samples.shape).dot(mean.T)
         return self.warping_function.f_inv(arg1 + arg2, y=pred_init)

-    def _get_warped_mean(self, mean, std, pred_init=None, deg_gauss_hermite=100):
+    def _get_warped_mean(self, mean, std, pred_init=None, deg_gauss_hermite=20):
         """
         Calculate the warped mean by using Gauss-Hermite quadrature.
         """
         gh_samples, gh_weights = np.polynomial.hermite.hermgauss(deg_gauss_hermite)
-        gh_samples = gh_samples[:,None]
-        gh_weights = gh_weights[None,:]
+        gh_samples = gh_samples[:, None]
+        gh_weights = gh_weights[None, :]
         return gh_weights.dot(self._get_warped_term(mean, std, gh_samples)) / np.sqrt(np.pi)

-    def _get_warped_variance(self, mean, std, pred_init=None, deg_gauss_hermite=100):
+    def _get_warped_variance(self, mean, std, pred_init=None, deg_gauss_hermite=20):
         """
         Calculate the warped variance by using Gauss-Hermite quadrature.
         """
         gh_samples, gh_weights = np.polynomial.hermite.hermgauss(deg_gauss_hermite)
-        gh_samples = gh_samples[:,None]
-        gh_weights = gh_weights[None,:]
+        gh_samples = gh_samples[:, None]
+        gh_weights = gh_weights[None, :]
         arg1 = gh_weights.dot(self._get_warped_term(mean, std, gh_samples,
                                                     pred_init=pred_init) ** 2) / np.sqrt(np.pi)
         arg2 = self._get_warped_mean(mean, std, pred_init=pred_init,
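Both quadrature helpers above follow the same Gauss-Hermite pattern: for z ~ N(mu, sigma²), E[g(z)] ≈ (1/√π) Σᵢ wᵢ g(√2·sigma·xᵢ + mu), with nodes xᵢ and weights wᵢ from `hermgauss`. A minimal standalone sketch, using the log warp (so the inverse warping is `np.exp`) as a stand-in for the model's warping function; `warped_mean` is a local name, not GPy API:

```python
import numpy as np

def warped_mean(mu, sigma, f_inv, deg=20):
    # Gauss-Hermite: E[g(Z)] ~= (1/sqrt(pi)) * sum_i w_i * g(sqrt(2)*sigma*x_i + mu)
    x, w = np.polynomial.hermite.hermgauss(deg)
    return w.dot(f_inv(np.sqrt(2.) * sigma * x + mu)) / np.sqrt(np.pi)

# check against the closed form of the log-normal mean: E[exp(Z)] = exp(mu + sigma^2/2)
mu, sigma = 0.3, 0.5
approx = warped_mean(mu, sigma, np.exp)
exact = np.exp(mu + sigma ** 2 / 2.)
assert abs(approx - exact) < 1e-8
```

This also illustrates why 20 quadrature points (the new default above, down from 100) are plenty for smooth warping functions.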
@@ -96,11 +97,14 @@ class WarpedGP(GP):
         return arg1 - (arg2 ** 2)

     def predict(self, Xnew, which_parts='all', pred_init=None, full_cov=False, Y_metadata=None,
-                median=False, deg_gauss_hermite=100):
-        # normalize X values
-        # Xnew = (Xnew.copy() - self._Xoffset) / self._Xscale
+                median=False, deg_gauss_hermite=20, likelihood=None):
+        """
+        Prediction results depend on:
+        - The value of the self.predict_in_warped_space flag
+        - The median flag passed as argument
+        The likelihood keyword is never used, it is just to follow the plotting API.
+        """
         mu, var = GP._raw_predict(self, Xnew)
         # now push through likelihood
         mean, var = self.likelihood.predictive_values(mu, var)
@@ -116,13 +120,11 @@ class WarpedGP(GP):
         else:
             wmean = mean
             wvar = var

         if self.scale_data:
             pred = self._unscale_data(pred)
         return wmean, wvar

-    def predict_quantiles(self, X, quantiles=(2.5, 97.5), Y_metadata=None):
+    def predict_quantiles(self, X, quantiles=(2.5, 97.5), Y_metadata=None, likelihood=None, median=False):
         """
         Get the predictive quantiles around the prediction at X
@@ -137,15 +139,30 @@ class WarpedGP(GP):
         if self.normalizer is not None:
             m, v = self.normalizer.inverse_mean(m), self.normalizer.inverse_variance(v)

         a, b = self.likelihood.predictive_quantiles(m, v, quantiles, Y_metadata)
-        #return [a, b]
         if not self.predict_in_warped_space:
             return [a, b]
-        #print a.shape
         new_a = self.warping_function.f_inv(a)
         new_b = self.warping_function.f_inv(b)
         return [new_a, new_b]
-        #return self.likelihood.predictive_quantiles(m, v, quantiles, Y_metadata)
+
+    def log_predictive_density(self, x_test, y_test, Y_metadata=None):
+        """
+        Calculation of the log predictive density. Notice we add
+        the jacobian of the warping function here.
+
+        .. math:
+            p(y_{*}|D) = p(y_{*}|f_{*})p(f_{*}|\mu_{*}, \sigma^{2}_{*})
+
+        :param x_test: test locations (x_{*})
+        :type x_test: (Nx1) array
+        :param y_test: test observations (y_{*})
+        :type y_test: (Nx1) array
+        :param Y_metadata: metadata associated with the test points
+        """
+        mu_star, var_star = self._raw_predict(x_test)
+        fy = self.warping_function.f(y_test)
+        ll_lpd = self.likelihood.log_predictive_density(fy, mu_star, var_star, Y_metadata=Y_metadata)
+        return ll_lpd + np.log(self.warping_function.fgrad_y(y_test))

 if __name__ == '__main__':
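One consequence of monotonicity that the new `median` flag and `predict_quantiles` above both exploit: quantiles commute with a monotone inverse warping, so the warped-space median needs no quadrature at all. A small sketch (a generic monotone `f_inv` stands in for the warping function; these are local names, not GPy API):

```python
import numpy as np

np.random.seed(0)
mu, sigma = 0.3, 0.5   # latent predictive moments at one test point
f_inv = np.exp         # stand-in for a monotonically increasing inverse warp

# For monotonically increasing f: median(f_inv(Z)) = f_inv(median(Z)) = f_inv(mu)
median_warped = f_inv(mu)

# Monte Carlo check
z = np.random.normal(mu, sigma, 200000)
assert abs(np.median(f_inv(z)) - median_warped) < 1e-2
```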

View file

@@ -31,6 +31,7 @@
 import numpy as np
 from scipy import sparse
 import itertools
+from ...models import WarpedGP

 def in_ipynb():
     try:
@@ -73,6 +74,9 @@ def helper_predict_with_model(self, Xgrid, plot_raw, apply_link, percentiles, wh
     if 'output_index' not in predict_kw['Y_metadata']:
         predict_kw['Y_metadata']['output_index'] = Xgrid[:,-1:].astype(np.int)
+    if isinstance(self, WarpedGP) and self.predict_in_warped_space:
+        predict_kw['median'] = True
     mu, _ = self.predict(Xgrid, **predict_kw)

     if percentiles is not None:
@@ -295,6 +299,8 @@ def get_x_y_var(model):
         Y = model.Y.values
     except AttributeError:
         Y = model.Y
+    if isinstance(model, WarpedGP) and model.predict_in_warped_space:
+        Y = model.Y_untransformed
     if sparse.issparse(Y): Y = Y.todense().view(np.ndarray)
     return X, X_variance, Y
@@ -381,4 +387,4 @@ def x_frame2D(X,plot_limits=None,resolution=None):
     resolution = resolution or 50
     xx, yy = np.mgrid[xmin[0]:xmax[0]:1j*resolution,xmin[1]:xmax[1]:1j*resolution]
     Xnew = np.c_[xx.flat, yy.flat]
     return Xnew, xx, yy, xmin, xmax
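For context on the `x_frame2D` hunk above: the complex step `1j*resolution` is NumPy's way of asking `np.mgrid` for an exact number of points per axis (endpoints included) instead of a step size. A minimal sketch of the same grid construction, with illustrative bounds:

```python
import numpy as np

xmin, xmax = np.array([0., 0.]), np.array([1., 2.])
resolution = 50

# 1j*resolution makes mgrid interpret the slice as "resolution points,
# endpoints included" rather than as a step size
xx, yy = np.mgrid[xmin[0]:xmax[0]:1j * resolution,
                  xmin[1]:xmax[1]:1j * resolution]
Xnew = np.c_[xx.flat, yy.flat]   # flattened (resolution**2, 2) list of grid points
assert Xnew.shape == (resolution ** 2, 2)
```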

View file

@@ -361,51 +361,74 @@ class MiscTests(unittest.TestCase):
         preds = m.predict(self.X)
         warp_k = GPy.kern.RBF(1)
-        warp_f = GPy.util.warping_functions.IdentityFunction()
-        warp_m = GPy.models.WarpedGP(self.X, self.Y, kernel=warp_k, warping_function=warp_f)
+        warp_f = GPy.util.warping_functions.IdentityFunction(closed_inverse=False)
+        warp_m = GPy.models.WarpedGP(self.X, self.Y, kernel=warp_k,
+                                     warping_function=warp_f)
         warp_m.optimize()
         warp_preds = warp_m.predict(self.X)
-        np.testing.assert_almost_equal(preds, warp_preds)
+        warp_k_exact = GPy.kern.RBF(1)
+        warp_f_exact = GPy.util.warping_functions.IdentityFunction()
+        warp_m_exact = GPy.models.WarpedGP(self.X, self.Y, kernel=warp_k_exact,
+                                           warping_function=warp_f_exact)
+        warp_m_exact.optimize()
+        warp_preds_exact = warp_m_exact.predict(self.X)
+        np.testing.assert_almost_equal(preds, warp_preds, decimal=4)
+        np.testing.assert_almost_equal(preds, warp_preds_exact, decimal=4)

-    @unittest.skip('Comment this to plot the modified sine function')
-    def test_warped_gp_sine(self):
-        """
-        A test replicating the sine regression problem from
-        Snelson's paper.
-        """
-        X = (2 * np.pi) * np.random.random(151) - np.pi
-        Y = np.sin(X) + np.random.normal(0,0.1,151)
-        Y = np.exp(Y) - 5
-        #Y = np.array([np.power(abs(y),float(1)/3) * (1,-1)[y<0] for y in Y]) + 0
-        #np.seterr(over='raise')
-        import matplotlib.pyplot as plt
-        warp_k = GPy.kern.RBF(1)
-        warp_f = GPy.util.warping_functions.TanhWarpingFunction_d(n_terms=2)
-        warp_m = GPy.models.WarpedGP(X[:, None], Y[:, None], kernel=warp_k, warping_function=warp_f)
-        #warp_m['.*variance.*'].constrain_fixed(0.25)
-        #warp_m['.*lengthscale.*'].constrain_fixed(1)
-        #warp_m['warp_tanh.d'].constrain_fixed(1)
-        #warp_m.randomize()
-        #warp_m['.*warp_tanh.psi*'][:,0:2].constrain_bounded(0,100)
-        #warp_m['.*warp_tanh.psi*'][:,0:1].constrain_fixed(1)
-        #print(warp_m.checkgrad())
-        #warp_m.plot()
-        #plt.show()
-        warp_m.optimize_restarts(parallel=True, robust=True)
-        #print(warp_m.checkgrad())
-        print(warp_m)
-        print(warp_m['.*warp.*'])
-        warp_m.predict_in_warped_space = False
-        warp_m.plot()
-        warp_m.predict_in_warped_space = True
-        warp_m.plot()
-        warp_f.plot(X.min()-10, X.max()+10)
-        plt.show()
+    def test_warped_gp_log(self):
+        """
+        A WarpedGP with the log warping function should be
+        equal to a standard GP with log labels.
+        Note that we predict the median here.
+        """
+        k = GPy.kern.RBF(1)
+        Y = np.abs(self.Y)
+        logY = np.log(Y)
+        m = GPy.models.GPRegression(self.X, logY, kernel=k)
+        m.optimize()
+        preds = m.predict(self.X)[0]
+        warp_k = GPy.kern.RBF(1)
+        warp_f = GPy.util.warping_functions.LogFunction(closed_inverse=False)
+        warp_m = GPy.models.WarpedGP(self.X, Y, kernel=warp_k,
+                                     warping_function=warp_f)
+        warp_m.optimize()
+        warp_preds = warp_m.predict(self.X, median=True)[0]
+        warp_k_exact = GPy.kern.RBF(1)
+        warp_f_exact = GPy.util.warping_functions.LogFunction()
+        warp_m_exact = GPy.models.WarpedGP(self.X, Y, kernel=warp_k_exact,
+                                           warping_function=warp_f_exact)
+        warp_m_exact.optimize(messages=True)
+        warp_preds_exact = warp_m_exact.predict(self.X, median=True)[0]
+        np.testing.assert_almost_equal(np.exp(preds), warp_preds, decimal=4)
+        np.testing.assert_almost_equal(np.exp(preds), warp_preds_exact, decimal=4)
+
+    def test_warped_gp_cubic_sine(self, max_iters=100):
+        """
+        A test replicating the cubic sine regression problem from
+        Snelson's paper. This test doesn't have any assertions, it's
+        just to ensure coverage of the tanh warping function code.
+        """
+        X = (2 * np.pi) * np.random.random(151) - np.pi
+        Y = np.sin(X) + np.random.normal(0,0.2,151)
+        Y = np.array([np.power(abs(y),float(1)/3) * (1,-1)[y<0] for y in Y])
+        X = X[:, None]
+        Y = Y[:, None]
+        warp_m = GPy.models.WarpedGP(X, Y)#, kernel=warp_k)#, warping_function=warp_f)
+        warp_m['.*\.d'].constrain_fixed(1.0)
+        warp_m.optimize_restarts(parallel=False, robust=False, num_restarts=5,
+                                 max_iters=max_iters)
+        warp_m.predict(X)
+        warp_m.predict_quantiles(X)
+        warp_m.log_predictive_density(X, Y)
+        warp_m.predict_in_warped_space = False
+        warp_m.predict(X)
+        warp_m.predict_quantiles(X)

 class GradientTests(np.testing.TestCase):
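The new tests exercise predictions but leave gradient checking to GradientTests. For the tanh warp they rely on, the underlying identity is easy to verify in isolation: compare the analytic derivative of f(y) = d·y + Σᵢ aᵢ·tanh(bᵢ·(y + cᵢ)) against central finite differences. A self-contained sketch; all names here are local stand-ins, not GPy API:

```python
import numpy as np

def tanh_warp(y, psi, d):
    # f(y) = d*y + sum_i a_i * tanh(b_i * (y + c_i))
    z = d * y
    for a, b, c in psi:
        z = z + a * np.tanh(b * (y + c))
    return z

def tanh_warp_grad(y, psi, d):
    # analytic df/dy = d + sum_i a_i * b_i * (1 - tanh(b_i*(y + c_i))**2)
    g = d * np.ones_like(y)
    for a, b, c in psi:
        g = g + a * b * (1. - np.tanh(b * (y + c)) ** 2)
    return g

psi = np.array([[1.0, 0.5, 0.2], [0.3, 2.0, -0.1]])   # two tanh terms [a, b, c]
d, y, eps = 1.0, np.linspace(-2, 2, 7), 1e-6
numeric = (tanh_warp(y + eps, psi, d) - tanh_warp(y - eps, psi, d)) / (2 * eps)
assert np.allclose(numeric, tanh_warp_grad(y, psi, d), atol=1e-6)
```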

View file

@@ -4,6 +4,7 @@
 import numpy as np
 from ..core.parameterization import Parameterized, Param
 from paramz.transformations import Logexp
+import sys

 class WarpingFunction(Parameterized):
@@ -14,6 +15,7 @@ class WarpingFunction(Parameterized):
     def __init__(self, name):
         super(WarpingFunction, self).__init__(name=name)
+        self.rate = 0.1

     def f(self, y, psi):
         """function transformation
@@ -29,15 +31,35 @@ class WarpingFunction(Parameterized):
         """gradient of f w.r.t to y"""
         raise NotImplementedError

-    def f_inv(self, z, psi):
-        """inverse function transformation"""
-        raise NotImplementedError
+    def f_inv(self, z, max_iterations=100, y=None):
+        """
+        Calculate the numerical inverse of f. This should be
+        overwritten for specific warping functions where the
+        inverse can be found in closed form.
+
+        :param max_iterations: maximum number of N.R. iterations
+        """
+        z = z.copy()
+        y = np.ones_like(z)
+        it = 0
+        update = np.inf
+        while np.abs(update).sum() > 1e-10 and it < max_iterations:
+            fy = self.f(y)
+            fgrady = self.fgrad_y(y)
+            update = (fy - z) / fgrady
+            y -= self.rate * update
+            it += 1
+        if it == max_iterations:
+            print("WARNING!!! Maximum number of iterations reached in f_inv ")
+            print("Sum of roots: %.4f" % np.sum(fy - z))
+        return y

     def _get_param_names(self):
         raise NotImplementedError

     def plot(self, xmin, xmax):
-        psi = self.psi
         y = np.arange(xmin, xmax, 0.01)
         f_y = self.f(y)
         from matplotlib import pyplot as plt
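The generic `f_inv` above is damped Newton-Raphson: the `self.rate` factor (0.1, set in `__init__`) trades convergence speed for robustness when f is steep. A standalone sketch of the same iteration on a monotonic toy warp; function names are local, not the GPy classes:

```python
import numpy as np

def f(y):
    return y + np.tanh(y)                 # monotonic toy warp, f'(y) > 0 everywhere

def fgrad(y):
    return 1. + (1. - np.tanh(y) ** 2)    # its derivative

def f_inv(z, rate=0.1, max_iterations=1000, tol=1e-10):
    # damped Newton-Raphson on f(y) - z = 0, mirroring WarpingFunction.f_inv
    y = np.ones_like(z)
    for _ in range(max_iterations):
        update = (f(y) - z) / fgrad(y)
        y -= rate * update
        if np.abs(update).sum() <= tol:
            break
    return y

z = np.linspace(-3., 3., 11)
y = f_inv(z)
assert np.allclose(f(y), z, atol=1e-8)
```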
@@ -49,183 +71,59 @@ class WarpingFunction(Parameterized):
         plt.show()

-class TanhWarpingFunction(WarpingFunction):
-
-    def __init__(self, n_terms=3):
-        """n_terms specifies the number of tanh terms to be used"""
-        self.n_terms = n_terms
-        self.num_parameters = 3 * self.n_terms
-        super(TanhWarpingFunction, self).__init__(name='warp_tanh')
-
-    def f(self, y, psi):
-        """
-        transform y with f using parameter vector psi
-        psi = [[a,b,c]]
-        ::math::`f = \\sum_{terms} a * tanh(b*(y+c))`
-        """
-        #1. check that number of params is consistent
-        assert psi.shape[0] == self.n_terms, 'inconsistent parameter dimensions'
-        assert psi.shape[1] == 3, 'inconsistent parameter dimensions'
-        #2. exponentiate the a and b (positive!)
-        mpsi = psi.copy()
-        #3. transform data
-        z = y.copy()
-        for i in range(len(mpsi)):
-            a,b,c = mpsi[i]
-            z += a*np.tanh(b*(y+c))
-        return z
-
-    def f_inv(self, y, psi, iterations=10):
-        """
-        calculate the numerical inverse of f
-        :param iterations: number of N.R. iterations
-        """
-        y = y.copy()
-        z = np.ones_like(y)
-        for i in range(iterations):
-            z -= (self.f(z, psi) - y)/self.fgrad_y(z,psi)
-        return z
-
-    def fgrad_y(self, y, psi, return_precalc=False):
-        """
-        gradient of f w.r.t to y ([N x 1])
-        returns: Nx1 vector of derivatives, unless return_precalc is true,
-        then it also returns the precomputed stuff
-        """
-        mpsi = psi.copy()
-        # vectorized version
-        # S = (mpsi[:,1]*(y + mpsi[:,2])).T
-        S = (mpsi[:,1]*(y[:,:,None] + mpsi[:,2])).T
-        R = np.tanh(S)
-        D = 1-R**2
-        # GRAD = (1+(mpsi[:,0:1]*mpsi[:,1:2]*D).sum(axis=0))[:,np.newaxis]
-        GRAD = (1+(mpsi[:,0:1][:,:,None]*mpsi[:,1:2][:,:,None]*D).sum(axis=0)).T
-        if return_precalc:
-            # return GRAD,S.sum(axis=1),R.sum(axis=1),D.sum(axis=1)
-            return GRAD, S, R, D
-        return GRAD
-
-    def fgrad_y_psi(self, y, psi, return_covar_chain=False):
-        """
-        gradient of f w.r.t to y and psi
-        returns: NxIx3 tensor of partial derivatives
-        """
-        # 1. exponentiate the a and b (positive!)
-        mpsi = psi.copy()
-        w, s, r, d = self.fgrad_y(y, psi, return_precalc = True)
-        gradients = np.zeros((y.shape[0], y.shape[1], len(mpsi), 3))
-        for i in range(len(mpsi)):
-            a,b,c = mpsi[i]
-            gradients[:,:,i,0] = (b*(1.0/np.cosh(s[i]))**2).T
-            gradients[:,:,i,1] = a*(d[i] - 2.0*s[i]*r[i]*(1.0/np.cosh(s[i]))**2).T
-            gradients[:,:,i,2] = (-2.0*a*(b**2)*r[i]*((1.0/np.cosh(s[i]))**2)).T
-        if return_covar_chain:
-            covar_grad_chain = np.zeros((y.shape[0], y.shape[1], len(mpsi), 3))
-            for i in range(len(mpsi)):
-                a,b,c = mpsi[i]
-                covar_grad_chain[:, :, i, 0] = (r[i]).T
-                covar_grad_chain[:, :, i, 1] = (a*(y + c) * ((1.0/np.cosh(s[i]))**2).T)
-                covar_grad_chain[:, :, i, 2] = a*b*((1.0/np.cosh(s[i]))**2).T
-            return gradients, covar_grad_chain
-        return gradients
-
-    def _get_param_names(self):
-        variables = ['a', 'b', 'c']
-        names = sum([['warp_tanh_%s_t%i' % (variables[n],q) for n in range(3)] for q in range(self.n_terms)],[])
-        return names
-
-class TanhWarpingFunction_d(WarpingFunction):
-
-    def __init__(self, n_terms=3):
-        """n_terms specifies the number of tanh terms to be used"""
+class TanhFunction(WarpingFunction):
+    """
+    This is the function proposed in Snelson et al.:
+    A sum of tanh functions with linear trends outside
+    the range. Notice the term 'd', which scales the
+    linear trend.
+    """
+    def __init__(self, n_terms=3, initial_y=None):
+        """
+        n_terms specifies the number of tanh terms to be used
+        """
         self.n_terms = n_terms
         self.num_parameters = 3 * self.n_terms + 1
         self.psi = np.ones((self.n_terms, 3))
-
-        super(TanhWarpingFunction_d, self).__init__(name='warp_tanh')
+        super(TanhFunction, self).__init__(name='warp_tanh')
         self.psi = Param('psi', self.psi)
         self.psi[:, :2].constrain_positive()
         self.d = Param('%s' % ('d'), 1.0, Logexp())
         self.link_parameter(self.psi)
         self.link_parameter(self.d)
+        self.initial_y = initial_y

     def f(self, y):
         """
         Transform y with f using parameter vector psi
         psi = [[a,b,c]]

-        :math:`f = \\sum_{terms} a * tanh(b*(y+c))`
+        :math:`f = (y * d) + \\sum_{terms} a * tanh(b *(y + c))`
         """
-        #1. check that number of params is consistent
-        # assert psi.shape[0] == self.n_terms, 'inconsistent parameter dimensions'
-        # assert psi.shape[1] == 4, 'inconsistent parameter dimensions'
         d = self.d
         mpsi = self.psi
-
-        #3. transform data
-        z = d*y.copy()
+        z = d * y.copy()
         for i in range(len(mpsi)):
-            a,b,c = mpsi[i]
-            z += a*np.tanh(b*(y+c))
+            a, b, c = mpsi[i]
+            z += a * np.tanh(b * (y + c))
         return z

-    def f_inv(self, z, max_iterations=1000, y=None):
-        """
-        calculate the numerical inverse of f
-        :param max_iterations: maximum number of N.R. iterations
-        """
-        z = z.copy()
-        if y is None:
-            y = np.ones_like(z)
-        it = 0
-        update = np.inf
-        while it == 0 or (np.abs(update).sum() > 1e-10 and it < max_iterations):
-            fy = self.f(y)
-            fgrady = self.fgrad_y(y)
-            update = (fy - z)/fgrady
-            y -= update
-            it += 1
-        if it == max_iterations:
-            print("WARNING!!! Maximum number of iterations reached in f_inv ")
-        return y
-
     def fgrad_y(self, y, return_precalc=False):
         """
         gradient of f w.r.t to y ([N x 1])
-        :returns: Nx1 vector of derivatives, unless return_precalc is true, then it also returns the precomputed stuff
+        :returns: Nx1 vector of derivatives, unless return_precalc is true,
+        then it also returns the precomputed stuff
         """
         d = self.d
         mpsi = self.psi
         # vectorized version
-        S = (mpsi[:,1]*(y[:,:,None] + mpsi[:,2])).T
+        S = (mpsi[:,1] * (y[:,:,None] + mpsi[:,2])).T
         R = np.tanh(S)
-        D = 1-R**2
-        GRAD = (d + (mpsi[:,0:1][:,:,None]*mpsi[:,1:2][:,:,None]*D).sum(axis=0)).T
+        D = 1 - (R ** 2)
+        GRAD = (d + (mpsi[:,0:1][:,:,None] * mpsi[:,1:2][:,:,None] * D).sum(axis=0)).T
         if return_precalc:
             return GRAD, S, R, D
@@ -244,45 +142,86 @@ class TanhWarpingFunction_d(WarpingFunction):
         gradients = np.zeros((y.shape[0], y.shape[1], len(mpsi), 4))
         for i in range(len(mpsi)):
             a,b,c = mpsi[i]
-            gradients[:,:,i,0] = (b*(1.0/np.cosh(s[i]))**2).T
-            gradients[:,:,i,1] = a*(d[i] - 2.0*s[i]*r[i]*(1.0/np.cosh(s[i]))**2).T
-            gradients[:,:,i,2] = (-2.0*a*(b**2)*r[i]*((1.0/np.cosh(s[i]))**2)).T
-        gradients[:,:,0,3] = 1.0
+            gradients[:, :, i, 0] = (b * (1.0/np.cosh(s[i])) ** 2).T
+            gradients[:, :, i, 1] = a * (d[i] - 2.0 * s[i] * r[i] * (1.0/np.cosh(s[i])) ** 2).T
+            gradients[:, :, i, 2] = (-2.0 * a * (b ** 2) * r[i] * ((1.0 / np.cosh(s[i])) ** 2)).T
+        gradients[:, :, 0, 3] = 1.0

         if return_covar_chain:
             covar_grad_chain = np.zeros((y.shape[0], y.shape[1], len(mpsi), 4))
             for i in range(len(mpsi)):
                 a,b,c = mpsi[i]
                 covar_grad_chain[:, :, i, 0] = (r[i]).T
-                covar_grad_chain[:, :, i, 1] = (a*(y + c) * ((1.0/np.cosh(s[i]))**2).T)
-                covar_grad_chain[:, :, i, 2] = a*b*((1.0/np.cosh(s[i]))**2).T
+                covar_grad_chain[:, :, i, 1] = (a * (y + c) * ((1.0 / np.cosh(s[i])) ** 2).T)
+                covar_grad_chain[:, :, i, 2] = a * b * ((1.0 / np.cosh(s[i])) ** 2).T
                 covar_grad_chain[:, :, 0, 3] = y
             return gradients, covar_grad_chain
         return gradients

     def _get_param_names(self):
         variables = ['a', 'b', 'c', 'd']
-        names = sum([['warp_tanh_%s_t%i' % (variables[n],q) for n in range(3)] for q in range(self.n_terms)],[])
-        names.append('warp_tanh_d')
+        names = sum([['warp_tanh_%s_t%i' % (variables[n],q) for n in range(3)]
+                     for q in range(self.n_terms)],[])
+        names.append('warp_tanh')
         return names

+    def update_grads(self, Y_untransformed, Kiy):
+        grad_y = self.fgrad_y(Y_untransformed)
+        grad_y_psi, grad_psi = self.fgrad_y_psi(Y_untransformed,
+                                                return_covar_chain=True)
+        djac_dpsi = ((1.0 / grad_y[:, :, None, None]) * grad_y_psi).sum(axis=0).sum(axis=0)
+        dquad_dpsi = (Kiy[:, None, None, None] * grad_psi).sum(axis=0).sum(axis=0)
+        warping_grads = -dquad_dpsi + djac_dpsi
+        self.psi.gradient[:] = warping_grads[:, :-1]
+        self.d.gradient[:] = warping_grads[0, -1]
+
+class LogFunction(WarpingFunction):
+    """
+    Easy wrapper for applying a fixed log warping function to
+    positive-only values.
+    The closed_inverse flag should only be set to False for
+    debugging and testing purposes.
+    """
+    def __init__(self, closed_inverse=True):
+        self.num_parameters = 0
+        super(LogFunction, self).__init__(name='log')
+        if closed_inverse:
+            self.f_inv = self._f_inv
+
+    def f(self, y):
+        return np.log(y)
+
+    def fgrad_y(self, y):
+        return 1. / y
+
+    def update_grads(self, Y_untransformed, Kiy):
+        pass
+
+    def fgrad_y_psi(self, y, return_covar_chain=False):
+        if return_covar_chain:
+            return 0, 0
+        return 0
+
+    def _f_inv(self, z, y=None):
+        return np.exp(z)
+
 class IdentityFunction(WarpingFunction):
     """
     Identity warping function. This is for testing and sanity check purposes
     and should not be used in practice.
+    The closed_inverse flag should only be set to False for
+    debugging and testing purposes.
     """
-    def __init__(self):
-        self.num_parameters = 4
-        self.psi = Param('psi', np.zeros((1,3)))
-        self.d = Param('%s' % ('d'), 1.0, Logexp())
+    def __init__(self, closed_inverse=True):
+        self.num_parameters = 0
         super(IdentityFunction, self).__init__(name='identity')
-        self.link_parameter(self.psi)
-        self.link_parameter(self.d)
+        if closed_inverse:
+            self.f_inv = self._f_inv

     def f(self, y):
         return y
@@ -290,11 +229,14 @@ class IdentityFunction(WarpingFunction):
     def fgrad_y(self, y):
         return np.ones(y.shape)

+    def update_grads(self, Y_untransformed, Kiy):
+        pass
+
     def fgrad_y_psi(self, y, return_covar_chain=False):
-        gradients = np.zeros((y.shape[0], y.shape[1], len(self.psi), 4))
         if return_covar_chain:
-            return gradients, gradients
-        return gradients
+            return 0, 0
+        return 0

-    def f_inv(self, z, y=None):
+    def _f_inv(self, z, y=None):
         return z
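Both the closed-form inverses above and the generic iterative `f_inv` rely on the warp being strictly monotone. For the tanh warp this is exactly what the positivity constraints in the diff (`psi[:, :2].constrain_positive()` and the `Logexp` transform on `d`) guarantee: with a, b ≥ 0 and d > 0, f'(y) = d + Σᵢ aᵢ·bᵢ·(1 − tanh²) ≥ d > 0. A small numerical illustration with local arrays (not the GPy classes):

```python
import numpy as np

# With a, b >= 0 and d > 0, f(y) = d*y + sum_i a_i*tanh(b_i*(y + c_i))
# has f'(y) >= d > 0, so it is strictly increasing and invertible.
psi = np.array([[0.7, 1.3, -0.4], [0.2, 0.9, 0.6]])   # rows are [a, b, c]
d = 0.8
y = np.linspace(-5, 5, 1001)
f = d * y + sum(a * np.tanh(b * (y + c)) for a, b, c in psi)
assert np.all(np.diff(f) > 0)   # monotone on the grid
```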