Merge branch 'devel' of github.com:SheffieldML/GPy into devel

mzwiessele 2016-08-16 13:47:24 +01:00
commit 611cfa3f8b
5 changed files with 259 additions and 239 deletions

View file

@@ -550,3 +550,34 @@ def parametric_mean_function(max_iters=100, optimize=True, plot=True):
return m
def warped_gp_cubic_sine(max_iters=100):
"""
A test replicating the cubic sine regression problem from
Snelson's paper.
"""
X = (2 * np.pi) * np.random.random(151) - np.pi
Y = np.sin(X) + np.random.normal(0,0.2,151)
Y = np.array([np.power(abs(y),float(1)/3) * (1,-1)[y<0] for y in Y])
X = X[:, None]
Y = Y[:, None]
warp_k = GPy.kern.RBF(1)
warp_f = GPy.util.warping_functions.TanhFunction(n_terms=2)
warp_m = GPy.models.WarpedGP(X, Y, kernel=warp_k, warping_function=warp_f)
warp_m['.*\.d'].constrain_fixed(1.0)
m = GPy.models.GPRegression(X, Y)
m.optimize_restarts(parallel=False, robust=True, num_restarts=5, max_iters=max_iters)
warp_m.optimize_restarts(parallel=False, robust=True, num_restarts=5, max_iters=max_iters)
#m.optimize(max_iters=max_iters)
#warp_m.optimize(max_iters=max_iters)
print(warp_m)
print(warp_m['.*warp.*'])
warp_m.predict_in_warped_space = False
warp_m.plot(title="Warped GP - Latent space")
warp_m.predict_in_warped_space = True
warp_m.plot(title="Warped GP - Warped space")
m.plot(title="Standard GP")
warp_m.plot_warping()
pb.show()
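For reference, a minimal sketch of invoking the new demo, assuming it lives in GPy.examples.regression alongside parametric_mean_function (pb above is that module's matplotlib alias):

import GPy
# fits both the warped GP and the standard GP above and plots the comparison
GPy.examples.regression.warped_gp_cubic_sine(max_iters=100)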

View file

@@ -2,24 +2,27 @@
# Licensed under the BSD 3-clause license (see LICENSE.txt)
import numpy as np
from ..util.warping_functions import *
#from ..util.warping_functions import *
from ..core import GP
from .. import likelihoods
from GPy.util.warping_functions import TanhWarpingFunction_d
from paramz import ObsAr
#from GPy.util.warping_functions import TanhFunction
from ..util.warping_functions import TanhFunction
from GPy import kern
class WarpedGP(GP):
"""
This defines a GP Regression model that applies a
warping function to the output.
"""
def __init__(self, X, Y, kernel=None, warping_function=None, warping_terms=3):
if kernel is None:
kernel = kern.RBF(X.shape[1])
if warping_function == None:
self.warping_function = TanhWarpingFunction_d(warping_terms)
self.warping_function = TanhFunction(warping_terms)
self.warping_params = (np.random.randn(self.warping_function.n_terms * 3 + 1) * 1)
else:
self.warping_function = warping_function
self.scale_data = False
if self.scale_data:
Y = self._scale_data(Y)
@@ -27,10 +30,13 @@ class WarpedGP(GP):
self.Y_untransformed = Y.copy()
self.predict_in_warped_space = True
likelihood = likelihoods.Gaussian()
GP.__init__(self, X, self.transform_data(), likelihood=likelihood, kernel=kernel)
self.link_parameter(self.warping_function)
def set_XY(self, X=None, Y=None):
self.Y_untransformed = Y.copy()
GP.set_XY(self, X, self.transform_data())
def _scale_data(self, Y):
self._Ymax = Y.max()
self._Ymin = Y.min()
@@ -40,27 +46,22 @@ class WarpedGP(GP):
return (Y + 0.5) * (self._Ymax - self._Ymin) + self._Ymin
def parameters_changed(self):
"""
Notice that we update the warping function gradients here.
"""
self.Y[:] = self.transform_data()
super(WarpedGP, self).parameters_changed()
Kiy = self.posterior.woodbury_vector.flatten()
grad_y = self.warping_function.fgrad_y(self.Y_untransformed)
grad_y_psi, grad_psi = self.warping_function.fgrad_y_psi(self.Y_untransformed,
return_covar_chain=True)
djac_dpsi = ((1.0 / grad_y[:, :, None, None]) * grad_y_psi).sum(axis=0).sum(axis=0)
dquad_dpsi = (Kiy[:, None, None, None] * grad_psi).sum(axis=0).sum(axis=0)
warping_grads = -dquad_dpsi + djac_dpsi
self.warping_function.psi.gradient[:] = warping_grads[:, :-1]
self.warping_function.d.gradient[:] = warping_grads[0, -1]
self.warping_function.update_grads(self.Y_untransformed, Kiy)
def transform_data(self):
Y = self.warping_function.f(self.Y_untransformed.copy()).copy()
return Y
def log_likelihood(self):
"""
Notice that we add the Jacobian of the warping function here.
"""
ll = GP.log_likelihood(self)
jacobian = self.warping_function.fgrad_y(self.Y_untransformed)
return ll + np.log(jacobian).sum()
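The added term is the standard change-of-variables correction. In the notation of this class (a sketch, not part of the commit), with z = f(y) the transformed targets:

    \log p(\mathbf{y} \mid X) = \log \mathcal{N}\big(f(\mathbf{y}) \mid \mathbf{0},\, K + \sigma^2 I\big) + \sum_{i=1}^{N} \log f'(y_i)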
@@ -73,22 +74,22 @@ class WarpedGP(GP):
arg2 = np.ones(shape=gh_samples.shape).dot(mean.T)
return self.warping_function.f_inv(arg1 + arg2, y=pred_init)
def _get_warped_mean(self, mean, std, pred_init=None, deg_gauss_hermite=100):
def _get_warped_mean(self, mean, std, pred_init=None, deg_gauss_hermite=20):
"""
Calculate the warped mean by using Gauss-Hermite quadrature.
"""
gh_samples, gh_weights = np.polynomial.hermite.hermgauss(deg_gauss_hermite)
gh_samples = gh_samples[:,None]
gh_weights = gh_weights[None,:]
gh_samples = gh_samples[:, None]
gh_weights = gh_weights[None, :]
return gh_weights.dot(self._get_warped_term(mean, std, gh_samples)) / np.sqrt(np.pi)
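The quadrature rule above comes from substituting f = sqrt(2)*sigma*x + mu into the Gaussian expectation. A standalone sketch of the same computation, with g standing in for warping_function.f_inv (illustrative names, not GPy API):

import numpy as np

def gauss_hermite_mean(mu, sigma, g, deg=20):
    # E[g(F)] for F ~ N(mu, sigma^2), via Gauss-Hermite quadrature
    x, w = np.polynomial.hermite.hermgauss(deg)
    return w.dot(g(np.sqrt(2.0) * sigma * x + mu)) / np.sqrt(np.pi)

# sanity check: with g = np.exp this approximates the log-normal mean
# exp(mu + sigma**2 / 2)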
def _get_warped_variance(self, mean, std, pred_init=None, deg_gauss_hermite=100):
def _get_warped_variance(self, mean, std, pred_init=None, deg_gauss_hermite=20):
"""
Calculate the warped variance by using Gauss-Hermite quadrature.
"""
gh_samples, gh_weights = np.polynomial.hermite.hermgauss(deg_gauss_hermite)
gh_samples = gh_samples[:,None]
gh_weights = gh_weights[None,:]
gh_samples = gh_samples[:, None]
gh_weights = gh_weights[None, :]
arg1 = gh_weights.dot(self._get_warped_term(mean, std, gh_samples,
pred_init=pred_init) ** 2) / np.sqrt(np.pi)
arg2 = self._get_warped_mean(mean, std, pred_init=pred_init,
@@ -96,11 +97,14 @@ class WarpedGP(GP):
return arg1 - (arg2 ** 2)
def predict(self, Xnew, which_parts='all', pred_init=None, full_cov=False, Y_metadata=None,
median=False, deg_gauss_hermite=100):
# normalize X values
# Xnew = (Xnew.copy() - self._Xoffset) / self._Xscale
median=False, deg_gauss_hermite=20, likelihood=None):
"""
Prediction results depend on:
- The value of the self.predict_in_warped_space flag
- The median flag passed as argument
The likelihood keyword is never used; it is only present to conform to the plotting API.
"""
mu, var = GP._raw_predict(self, Xnew)
# now push through likelihood
mean, var = self.likelihood.predictive_values(mu, var)
@@ -116,13 +120,11 @@ class WarpedGP(GP):
else:
wmean = mean
wvar = var
if self.scale_data:
pred = self._unscale_data(pred)
return wmean, wvar
def predict_quantiles(self, X, quantiles=(2.5, 97.5), Y_metadata=None):
def predict_quantiles(self, X, quantiles=(2.5, 97.5), Y_metadata=None, likelihood=None, median=False):
"""
Get the predictive quantiles around the prediction at X
@@ -137,15 +139,30 @@ class WarpedGP(GP):
if self.normalizer is not None:
m, v = self.normalizer.inverse_mean(m), self.normalizer.inverse_variance(v)
a, b = self.likelihood.predictive_quantiles(m, v, quantiles, Y_metadata)
#return [a, b]
if not self.predict_in_warped_space:
return [a, b]
#print a.shape
new_a = self.warping_function.f_inv(a)
new_b = self.warping_function.f_inv(b)
return [new_a, new_b]
#return self.likelihood.predictive_quantiles(m, v, quantiles, Y_metadata)
def log_predictive_density(self, x_test, y_test, Y_metadata=None):
"""
Calculation of the log predictive density. Notice that we add
the Jacobian of the warping function here.

.. math::
    p(y_{*}|D) = \\int p(y_{*}|f_{*}) p(f_{*}|\\mu_{*}, \\sigma^{2}_{*}) df_{*}
:param x_test: test locations (x_{*})
:type x_test: (Nx1) array
:param y_test: test observations (y_{*})
:type y_test: (Nx1) array
:param Y_metadata: metadata associated with the test points
"""
mu_star, var_star = self._raw_predict(x_test)
fy = self.warping_function.f(y_test)
ll_lpd = self.likelihood.log_predictive_density(fy, mu_star, var_star, Y_metadata=Y_metadata)
return ll_lpd + np.log(self.warping_function.fgrad_y(y_test))
if __name__ == '__main__':
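Pushing the latent Gaussian quantiles through f_inv, as predict_quantiles does above, is valid because the warp is monotonic: P(y <= f_inv(q)) = P(f(y) <= q). A self-contained sketch, with g_inv standing in for warping_function.f_inv (illustrative names, not GPy API):

import numpy as np
from scipy.stats import norm

def warped_quantiles(mu, var, g_inv, quantiles=(2.5, 97.5)):
    # Gaussian quantiles in the latent space, mapped through the inverse warp
    return [g_inv(mu + norm.ppf(q / 100.0) * np.sqrt(var)) for q in quantiles]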

View file

@@ -31,6 +31,7 @@
import numpy as np
from scipy import sparse
import itertools
from ...models import WarpedGP
def in_ipynb():
try:
@@ -73,6 +74,9 @@ def helper_predict_with_model(self, Xgrid, plot_raw, apply_link, percentiles, wh
if 'output_index' not in predict_kw['Y_metadata']:
predict_kw['Y_metadata']['output_index'] = Xgrid[:,-1:].astype(np.int)
if isinstance(self, WarpedGP) and self.predict_in_warped_space:
predict_kw['median'] = True
mu, _ = self.predict(Xgrid, **predict_kw)
if percentiles is not None:
@@ -295,6 +299,8 @@ def get_x_y_var(model):
Y = model.Y.values
except AttributeError:
Y = model.Y
if isinstance(model, WarpedGP) and model.predict_in_warped_space:
Y = model.Y_untransformed
if sparse.issparse(Y): Y = Y.todense().view(np.ndarray)
return X, X_variance, Y
@@ -381,4 +387,4 @@ def x_frame2D(X,plot_limits=None,resolution=None):
resolution = resolution or 50
xx, yy = np.mgrid[xmin[0]:xmax[0]:1j*resolution,xmin[1]:xmax[1]:1j*resolution]
Xnew = np.c_[xx.flat, yy.flat]
return Xnew, xx, yy, xmin, xmax

View file

@@ -361,50 +361,75 @@ class MiscTests(unittest.TestCase):
preds = m.predict(self.X)
warp_k = GPy.kern.RBF(1)
warp_f = GPy.util.warping_functions.IdentityFunction()
warp_m = GPy.models.WarpedGP(self.X, self.Y, kernel=warp_k, warping_function=warp_f)
warp_f = GPy.util.warping_functions.IdentityFunction(closed_inverse=False)
warp_m = GPy.models.WarpedGP(self.X, self.Y, kernel=warp_k,
warping_function=warp_f)
warp_m.optimize()
warp_preds = warp_m.predict(self.X)
np.testing.assert_almost_equal(preds, warp_preds)
warp_k_exact = GPy.kern.RBF(1)
warp_f_exact = GPy.util.warping_functions.IdentityFunction()
warp_m_exact = GPy.models.WarpedGP(self.X, self.Y, kernel=warp_k_exact,
warping_function=warp_f_exact)
warp_m_exact.optimize()
warp_preds_exact = warp_m_exact.predict(self.X)
np.testing.assert_almost_equal(preds, warp_preds, decimal=4)
np.testing.assert_almost_equal(preds, warp_preds_exact, decimal=4)
@unittest.skip('Comment this to plot the modified sine function')
def test_warped_gp_sine(self):
def test_warped_gp_log(self):
"""
A test replicating the sine regression problem from
Snelson's paper.
A WarpedGP with the log warping function should be
equivalent to a standard GP with log labels.
Note that we predict the median here.
"""
k = GPy.kern.RBF(1)
Y = np.abs(self.Y)
logY = np.log(Y)
m = GPy.models.GPRegression(self.X, logY, kernel=k)
m.optimize()
preds = m.predict(self.X)[0]
warp_k = GPy.kern.RBF(1)
warp_f = GPy.util.warping_functions.LogFunction(closed_inverse=False)
warp_m = GPy.models.WarpedGP(self.X, Y, kernel=warp_k,
warping_function=warp_f)
warp_m.optimize()
warp_preds = warp_m.predict(self.X, median=True)[0]
warp_k_exact = GPy.kern.RBF(1)
warp_f_exact = GPy.util.warping_functions.LogFunction()
warp_m_exact = GPy.models.WarpedGP(self.X, Y, kernel=warp_k_exact,
warping_function=warp_f_exact)
warp_m_exact.optimize(messages=True)
warp_preds_exact = warp_m_exact.predict(self.X, median=True)[0]
np.testing.assert_almost_equal(np.exp(preds), warp_preds, decimal=4)
np.testing.assert_almost_equal(np.exp(preds), warp_preds_exact, decimal=4)
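The comparison against np.exp(preds) relies on the median being equivariant under a monotone warp (a sketch of the identity, not part of the commit):

    \mathrm{median}(y_*) = f^{-1}(\mu_*), \quad\text{since}\quad P\big(y_* \le f^{-1}(\mu_*)\big) = P\big(f(y_*) \le \mu_*\big) = \tfrac{1}{2}

so with f = log the warped median is exactly the exponential of the latent GP mean.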
def test_warped_gp_cubic_sine(self, max_iters=100):
"""
A test replicating the cubic sine regression problem from
Snelson's paper. This test doesn't have any assertions;
it's just there to exercise the tanh warping function code.
"""
X = (2 * np.pi) * np.random.random(151) - np.pi
Y = np.sin(X) + np.random.normal(0,0.1,151)
Y = np.exp(Y) - 5
#Y = np.array([np.power(abs(y),float(1)/3) * (1,-1)[y<0] for y in Y]) + 0
Y = np.sin(X) + np.random.normal(0,0.2,151)
Y = np.array([np.power(abs(y),float(1)/3) * (1,-1)[y<0] for y in Y])
X = X[:, None]
Y = Y[:, None]
#np.seterr(over='raise')
import matplotlib.pyplot as plt
warp_k = GPy.kern.RBF(1)
warp_f = GPy.util.warping_functions.TanhWarpingFunction_d(n_terms=2)
warp_m = GPy.models.WarpedGP(X[:, None], Y[:, None], kernel=warp_k, warping_function=warp_f)
#warp_m['.*variance.*'].constrain_fixed(0.25)
#warp_m['.*lengthscale.*'].constrain_fixed(1)
#warp_m['warp_tanh.d'].constrain_fixed(1)
#warp_m.randomize()
#warp_m['.*warp_tanh.psi*'][:,0:2].constrain_bounded(0,100)
#warp_m['.*warp_tanh.psi*'][:,0:1].constrain_fixed(1)
#print(warp_m.checkgrad())
#warp_m.plot()
#plt.show()
warp_m.optimize_restarts(parallel=True, robust=True)
#print(warp_m.checkgrad())
print(warp_m)
print(warp_m['.*warp.*'])
warp_m = GPy.models.WarpedGP(X, Y)#, kernel=warp_k)#, warping_function=warp_f)
warp_m['.*\.d'].constrain_fixed(1.0)
warp_m.optimize_restarts(parallel=False, robust=False, num_restarts=5,
max_iters=max_iters)
warp_m.predict(X)
warp_m.predict_quantiles(X)
warp_m.log_predictive_density(X, Y)
warp_m.predict_in_warped_space = False
warp_m.plot()
warp_m.predict_in_warped_space = True
warp_m.plot()
warp_f.plot(X.min()-10, X.max()+10)
plt.show()
def test_offset_regression(self):
#Tests GPy.models.GPOffsetRegression. Using two small time series
@@ -427,7 +452,6 @@ class MiscTests(unittest.TestCase):
assert np.abs(m.offset[0]-offset)<0.1, ("GPOffsetRegression model failing to estimate correct offset (value estimated = %0.2f instead of %0.2f)" % (m.offset[0], offset))
class GradientTests(np.testing.TestCase):
def setUp(self):
######################################

View file

@@ -4,6 +4,7 @@
import numpy as np
from ..core.parameterization import Parameterized, Param
from paramz.transformations import Logexp
import sys
class WarpingFunction(Parameterized):
@@ -14,6 +15,7 @@ class WarpingFunction(Parameterized):
def __init__(self, name):
super(WarpingFunction, self).__init__(name=name)
self.rate = 0.1
def f(self, y, psi):
"""function transformation
@@ -29,15 +31,35 @@ class WarpingFunction(Parameterized):
"""gradient of f w.r.t to y"""
raise NotImplementedError
def f_inv(self, z, psi):
"""inverse function transformation"""
raise NotImplementedError
def f_inv(self, z, max_iterations=100, y=None):
"""
Calculate the numerical inverse of f. This should be
overridden for specific warping functions where the
inverse can be found in closed form.
:param max_iterations: maximum number of Newton-Raphson iterations
"""
z = z.copy()
y = np.ones_like(z)
it = 0
update = np.inf
while np.abs(update).sum() > 1e-10 and it < max_iterations:
fy = self.f(y)
fgrady = self.fgrad_y(y)
update = (fy - z) / fgrady
y -= self.rate * update
it += 1
if it == max_iterations:
print("WARNING!!! Maximum number of iterations reached in f_inv ")
print("Sum of roots: %.4f" % np.sum(fy - z))
return y
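The loop above is a damped Newton-Raphson solve of f(y) = z. The same scheme in isolation (rate, tolerance, and starting point mirror the code; the function name is illustrative, not GPy API):

import numpy as np

def newton_inverse(z, f, fprime, rate=0.1, tol=1e-10, max_iterations=100):
    y = np.ones_like(z)                  # same fixed initialisation as above
    for _ in range(max_iterations):
        update = (f(y) - z) / fprime(y)  # Newton step on f(y) - z = 0
        y -= rate * update               # damped by the rate attribute
        if np.abs(update).sum() <= tol:
            break
    return y

# e.g. newton_inverse(np.array([8.0]), lambda y: y**3, lambda y: 3 * y**2)
# converges towards 2.0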
def _get_param_names(self):
raise NotImplementedError
def plot(self, xmin, xmax):
psi = self.psi
y = np.arange(xmin, xmax, 0.01)
f_y = self.f(y)
from matplotlib import pyplot as plt
@@ -49,183 +71,59 @@ class WarpingFunction(Parameterized):
plt.show()
class TanhWarpingFunction(WarpingFunction):
def __init__(self, n_terms=3):
"""n_terms specifies the number of tanh terms to be used"""
self.n_terms = n_terms
self.num_parameters = 3 * self.n_terms
super(TanhWarpingFunction, self).__init__(name='warp_tanh')
def f(self, y, psi):
class TanhFunction(WarpingFunction):
"""
This is the function proposed in Snelson et al.:
A sum of tanh functions with linear trends outside
the range. Notice the term 'd', which scales the
linear trend.
"""
def __init__(self, n_terms=3, initial_y=None):
"""
transform y with f using parameter vector psi
psi = [[a,b,c]]
:math:`f = \\sum_{terms} a * tanh(b*(y+c))`
n_terms specifies the number of tanh terms to be used
"""
#1. check that number of params is consistent
assert psi.shape[0] == self.n_terms, 'inconsistent parameter dimensions'
assert psi.shape[1] == 3, 'inconsistent parameter dimensions'
#2. exponentiate the a and b (positive!)
mpsi = psi.copy()
#3. transform data
z = y.copy()
for i in range(len(mpsi)):
a,b,c = mpsi[i]
z += a*np.tanh(b*(y+c))
return z
def f_inv(self, y, psi, iterations=10):
"""
calculate the numerical inverse of f
:param iterations: number of Newton-Raphson iterations
"""
y = y.copy()
z = np.ones_like(y)
for i in range(iterations):
z -= (self.f(z, psi) - y)/self.fgrad_y(z,psi)
return z
def fgrad_y(self, y, psi, return_precalc=False):
"""
gradient of f w.r.t. y ([N x 1])
returns: Nx1 vector of derivatives, unless return_precalc is true,
then it also returns the precomputed intermediate terms
"""
mpsi = psi.copy()
# vectorized version
# S = (mpsi[:,1]*(y + mpsi[:,2])).T
S = (mpsi[:,1]*(y[:,:,None] + mpsi[:,2])).T
R = np.tanh(S)
D = 1-R**2
# GRAD = (1+(mpsi[:,0:1]*mpsi[:,1:2]*D).sum(axis=0))[:,np.newaxis]
GRAD = (1+(mpsi[:,0:1][:,:,None]*mpsi[:,1:2][:,:,None]*D).sum(axis=0)).T
if return_precalc:
# return GRAD,S.sum(axis=1),R.sum(axis=1),D.sum(axis=1)
return GRAD, S, R, D
return GRAD
def fgrad_y_psi(self, y, psi, return_covar_chain=False):
"""
gradient of f w.r.t. y and psi
returns: NxIx3 tensor of partial derivatives
"""
# 1. exponentiate the a and b (positive!)
mpsi = psi.copy()
w, s, r, d = self.fgrad_y(y, psi, return_precalc = True)
gradients = np.zeros((y.shape[0], y.shape[1], len(mpsi), 3))
for i in range(len(mpsi)):
a,b,c = mpsi[i]
gradients[:,:,i,0] = (b*(1.0/np.cosh(s[i]))**2).T
gradients[:,:,i,1] = a*(d[i] - 2.0*s[i]*r[i]*(1.0/np.cosh(s[i]))**2).T
gradients[:,:,i,2] = (-2.0*a*(b**2)*r[i]*((1.0/np.cosh(s[i]))**2)).T
if return_covar_chain:
covar_grad_chain = np.zeros((y.shape[0], y.shape[1], len(mpsi), 3))
for i in range(len(mpsi)):
a,b,c = mpsi[i]
covar_grad_chain[:, :, i, 0] = (r[i]).T
covar_grad_chain[:, :, i, 1] = (a*(y + c) * ((1.0/np.cosh(s[i]))**2).T)
covar_grad_chain[:, :, i, 2] = a*b*((1.0/np.cosh(s[i]))**2).T
return gradients, covar_grad_chain
return gradients
def _get_param_names(self):
variables = ['a', 'b', 'c']
names = sum([['warp_tanh_%s_t%i' % (variables[n],q) for n in range(3)] for q in range(self.n_terms)],[])
return names
class TanhWarpingFunction_d(WarpingFunction):
def __init__(self, n_terms=3):
"""n_terms specifies the number of tanh terms to be used"""
self.n_terms = n_terms
self.num_parameters = 3 * self.n_terms + 1
self.psi = np.ones((self.n_terms, 3))
super(TanhWarpingFunction_d, self).__init__(name='warp_tanh')
super(TanhFunction, self).__init__(name='warp_tanh')
self.psi = Param('psi', self.psi)
self.psi[:, :2].constrain_positive()
self.d = Param('%s' % ('d'), 1.0, Logexp())
self.link_parameter(self.psi)
self.link_parameter(self.d)
self.initial_y = initial_y
def f(self, y):
"""
Transform y with f using parameter vector psi
psi = [[a,b,c]]
:math:`f = \\sum_{terms} a * tanh(b*(y+c))`
:math:`f = (y * d) + \\sum_{terms} a * tanh(b *(y + c))`
"""
#1. check that number of params is consistent
# assert psi.shape[0] == self.n_terms, 'inconsistent parameter dimensions'
# assert psi.shape[1] == 4, 'inconsistent parameter dimensions'
d = self.d
mpsi = self.psi
#3. transform data
z = d*y.copy()
z = d * y.copy()
for i in range(len(mpsi)):
a,b,c = mpsi[i]
z += a*np.tanh(b*(y+c))
a, b, c = mpsi[i]
z += a * np.tanh(b * (y + c))
return z
def f_inv(self, z, max_iterations=1000, y=None):
"""
calculate the numerical inverse of f
:param max_iterations: maximum number of Newton-Raphson iterations
"""
z = z.copy()
if y is None:
y = np.ones_like(z)
it = 0
update = np.inf
while it == 0 or (np.abs(update).sum() > 1e-10 and it < max_iterations):
fy = self.f(y)
fgrady = self.fgrad_y(y)
update = (fy - z)/fgrady
y -= update
it += 1
if it == max_iterations:
print("WARNING!!! Maximum number of iterations reached in f_inv ")
return y
def fgrad_y(self, y, return_precalc=False):
"""
gradient of f w.r.t. y ([N x 1])
:returns: Nx1 vector of derivatives, unless return_precalc is true, then it also returns the precomputed intermediate terms
:returns: Nx1 vector of derivatives, unless return_precalc is true,
then it also returns the precomputed intermediate terms
"""
d = self.d
mpsi = self.psi
# vectorized version
S = (mpsi[:,1]*(y[:,:,None] + mpsi[:,2])).T
S = (mpsi[:,1] * (y[:,:,None] + mpsi[:,2])).T
R = np.tanh(S)
D = 1-R**2
D = 1 - (R ** 2)
GRAD = (d + (mpsi[:,0:1][:,:,None]*mpsi[:,1:2][:,:,None]*D).sum(axis=0)).T
GRAD = (d + (mpsi[:,0:1][:,:,None] * mpsi[:,1:2][:,:,None] * D).sum(axis=0)).T
if return_precalc:
return GRAD, S, R, D
@@ -244,45 +142,86 @@ class TanhWarpingFunction_d(WarpingFunction):
gradients = np.zeros((y.shape[0], y.shape[1], len(mpsi), 4))
for i in range(len(mpsi)):
a,b,c = mpsi[i]
gradients[:,:,i,0] = (b*(1.0/np.cosh(s[i]))**2).T
gradients[:,:,i,1] = a*(d[i] - 2.0*s[i]*r[i]*(1.0/np.cosh(s[i]))**2).T
gradients[:,:,i,2] = (-2.0*a*(b**2)*r[i]*((1.0/np.cosh(s[i]))**2)).T
gradients[:,:,0,3] = 1.0
gradients[:, :, i, 0] = (b * (1.0/np.cosh(s[i])) ** 2).T
gradients[:, :, i, 1] = a * (d[i] - 2.0 * s[i] * r[i] * (1.0/np.cosh(s[i])) ** 2).T
gradients[:, :, i, 2] = (-2.0 * a * (b ** 2) * r[i] * ((1.0 / np.cosh(s[i])) ** 2)).T
gradients[:, :, 0, 3] = 1.0
if return_covar_chain:
covar_grad_chain = np.zeros((y.shape[0], y.shape[1], len(mpsi), 4))
for i in range(len(mpsi)):
a,b,c = mpsi[i]
covar_grad_chain[:, :, i, 0] = (r[i]).T
covar_grad_chain[:, :, i, 1] = (a*(y + c) * ((1.0/np.cosh(s[i]))**2).T)
covar_grad_chain[:, :, i, 2] = a*b*((1.0/np.cosh(s[i]))**2).T
covar_grad_chain[:, :, i, 1] = (a * (y + c) * ((1.0 / np.cosh(s[i])) ** 2).T)
covar_grad_chain[:, :, i, 2] = a * b * ((1.0 / np.cosh(s[i])) ** 2).T
covar_grad_chain[:, :, 0, 3] = y
return gradients, covar_grad_chain
return gradients
def _get_param_names(self):
variables = ['a', 'b', 'c', 'd']
names = sum([['warp_tanh_%s_t%i' % (variables[n],q) for n in range(3)] for q in range(self.n_terms)],[])
names.append('warp_tanh_d')
names = sum([['warp_tanh_%s_t%i' % (variables[n],q) for n in range(3)]
for q in range(self.n_terms)],[])
names.append('warp_tanh')
return names
def update_grads(self, Y_untransformed, Kiy):
grad_y = self.fgrad_y(Y_untransformed)
grad_y_psi, grad_psi = self.fgrad_y_psi(Y_untransformed,
return_covar_chain=True)
djac_dpsi = ((1.0 / grad_y[:, :, None, None]) * grad_y_psi).sum(axis=0).sum(axis=0)
dquad_dpsi = (Kiy[:, None, None, None] * grad_psi).sum(axis=0).sum(axis=0)
warping_grads = -dquad_dpsi + djac_dpsi
self.psi.gradient[:] = warping_grads[:, :-1]
self.d.gradient[:] = warping_grads[0, -1]
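These gradients differentiate the warped-GP objective directly; a sketch of the identity being implemented, with z = f(y) and Kiy = (K + \sigma^2 I)^{-1} z as computed in parameters_changed:

    \frac{\partial \mathcal{L}}{\partial \psi} = -\,\mathrm{Kiy}^{\top} \frac{\partial f(\mathbf{y})}{\partial \psi} + \sum_i \frac{1}{f'(y_i)} \frac{\partial f'(y_i)}{\partial \psi}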
class LogFunction(WarpingFunction):
"""
Easy wrapper for applying a fixed log warping function to
positive-only values.
The closed_inverse flag should only be set to False for
debugging and testing purposes.
"""
def __init__(self, closed_inverse=True):
self.num_parameters = 0
super(LogFunction, self).__init__(name='log')
if closed_inverse:
self.f_inv = self._f_inv
def f(self, y):
return np.log(y)
def fgrad_y(self, y):
return 1. / y
def update_grads(self, Y_untransformed, Kiy):
pass
def fgrad_y_psi(self, y, return_covar_chain=False):
if return_covar_chain:
return 0, 0
return 0
def _f_inv(self, z, y=None):
return np.exp(z)
class IdentityFunction(WarpingFunction):
"""
Identity warping function. This is for testing and sanity check purposes
and should not be used in practice.
The closed_inverse flag should only be set to False for
debugging and testing purposes.
"""
def __init__(self):
self.num_parameters = 4
self.psi = Param('psi', np.zeros((1,3)))
self.d = Param('%s' % ('d'), 1.0, Logexp())
def __init__(self, closed_inverse=True):
self.num_parameters = 0
super(IdentityFunction, self).__init__(name='identity')
self.link_parameter(self.psi)
self.link_parameter(self.d)
if closed_inverse:
self.f_inv = self._f_inv
def f(self, y):
return y
@@ -290,11 +229,14 @@ class IdentityFunction(WarpingFunction):
def fgrad_y(self, y):
return np.ones(y.shape)
def update_grads(self, Y_untransformed, Kiy):
pass
def fgrad_y_psi(self, y, return_covar_chain=False):
gradients = np.zeros((y.shape[0], y.shape[1], len(self.psi), 4))
if return_covar_chain:
return gradients, gradients
return gradients
return 0, 0
return 0
def f_inv(self, z, y=None):
def _f_inv(self, z, y=None):
return z
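Finally, a quick round-trip sanity check of the new closed-form inverses (a sketch; assumes LogFunction and IdentityFunction are exported from GPy.util.warping_functions, as used elsewhere in this diff):

import numpy as np
import GPy

y = np.abs(np.random.randn(10, 1)) + 0.1  # positive targets for the log warp
log_warp = GPy.util.warping_functions.LogFunction()
np.testing.assert_allclose(log_warp.f_inv(log_warp.f(y)), y)  # exp(log y) == y

ident = GPy.util.warping_functions.IdentityFunction()
np.testing.assert_allclose(ident.f_inv(ident.f(y)), y)        # identity warp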