[beckdaniel] merge in warped gp changes

This commit is contained in:
mzwiessele 2015-10-13 08:52:26 +01:00
commit 87539b1c1b
4 changed files with 183 additions and 57 deletions

View file

@ -23,7 +23,7 @@ class Model(Parameterized):
super(Model, self).__init__(name) # Parameterized.__init__(self)
self.optimization_runs = []
self.sampling_runs = []
self.preferred_optimizer = 'bfgs'
self.preferred_optimizer = 'lbfgsb'
from .parameterization.ties_and_remappings import Tie
self.tie = Tie()
self.link_parameter(self.tie, -1)
@ -104,7 +104,7 @@ class Model(Parameterized):
raise e
if len(self.optimization_runs):
i = np.argmin([o.f_opt for o in self.optimization_runs])
i = np.nanargmin([o.f_opt for o in self.optimization_runs])
self.optimizer_array = self.optimization_runs[i].x_opt
else:
self.optimizer_array = initial_parameters

View file

@ -16,16 +16,16 @@ class WarpedGP(GP):
if warping_function == None:
self.warping_function = TanhWarpingFunction_d(warping_terms)
self.warping_params = (np.random.randn(self.warping_function.n_terms * 3 + 1,) * 1)
self.warping_params = (np.random.randn(self.warping_function.n_terms * 3 + 1) * 1)
else:
self.warping_function = warping_function
self.scale_data = False
if self.scale_data:
Y = self._scale_data(Y)
self.has_uncertain_inputs = False
#self.has_uncertain_inputs = False
self.Y_untransformed = Y.copy()
self.predict_in_warped_space = False
self.predict_in_warped_space = True
likelihood = likelihoods.Gaussian()
GP.__init__(self, X, self.transform_data(), likelihood=likelihood, kernel=kernel)
@ -56,7 +56,6 @@ class WarpedGP(GP):
self.warping_function.psi.gradient[:] = warping_grads[:, :-1]
self.warping_function.d.gradient[:] = warping_grads[0, -1]
def transform_data(self):
Y = self.warping_function.f(self.Y_untransformed.copy()).copy()
return Y
@ -69,7 +68,35 @@ class WarpedGP(GP):
def plot_warping(self):
self.warping_function.plot(self.Y_untransformed.min(), self.Y_untransformed.max())
def predict(self, Xnew, which_parts='all', pred_init=None):
def _get_warped_term(self, mean, std, gh_samples, pred_init=None):
arg1 = gh_samples.dot(std.T) * np.sqrt(2)
arg2 = np.ones(shape=gh_samples.shape).dot(mean.T)
return self.warping_function.f_inv(arg1 + arg2, y=pred_init)
def _get_warped_mean(self, mean, std, pred_init=None, deg_gauss_hermite=100):
"""
Calculate the warped mean by using Gauss-Hermite quadrature.
"""
gh_samples, gh_weights = np.polynomial.hermite.hermgauss(deg_gauss_hermite)
gh_samples = gh_samples[:,None]
gh_weights = gh_weights[None,:]
return gh_weights.dot(self._get_warped_term(mean, std, gh_samples)) / np.sqrt(np.pi)
def _get_warped_variance(self, mean, std, pred_init=None, deg_gauss_hermite=100):
"""
Calculate the warped variance by using Gauss-Hermite quadrature.
"""
gh_samples, gh_weights = np.polynomial.hermite.hermgauss(deg_gauss_hermite)
gh_samples = gh_samples[:,None]
gh_weights = gh_weights[None,:]
arg1 = gh_weights.dot(self._get_warped_term(mean, std, gh_samples,
pred_init=pred_init) ** 2) / np.sqrt(np.pi)
arg2 = self._get_warped_mean(mean, std, pred_init=pred_init,
deg_gauss_hermite=deg_gauss_hermite)
return arg1 - (arg2 ** 2)
def predict(self, Xnew, which_parts='all', pred_init=None, full_cov=False, Y_metadata=None,
median=False, deg_gauss_hermite=100):
# normalize X values
# Xnew = (Xnew.copy() - self._Xoffset) / self._Xscale
mu, var = GP._raw_predict(self, Xnew)
@ -78,13 +105,48 @@ class WarpedGP(GP):
mean, var = self.likelihood.predictive_values(mu, var)
if self.predict_in_warped_space:
mean = self.warping_function.f_inv(mean, y=pred_init)
var = self.warping_function.f_inv(var)
std = np.sqrt(var)
if median:
wmean = self.warping_function.f_inv(mean, y=pred_init)
else:
wmean = self._get_warped_mean(mean, std, pred_init=pred_init,
deg_gauss_hermite=deg_gauss_hermite).T
wvar = self._get_warped_variance(mean, std, pred_init=pred_init,
deg_gauss_hermite=deg_gauss_hermite).T
else:
wmean = mean
wvar = var
if self.scale_data:
mean = self._unscale_data(mean)
pred = self._unscale_data(pred)
return wmean, wvar
def predict_quantiles(self, X, quantiles=(2.5, 97.5), Y_metadata=None):
"""
Get the predictive quantiles around the prediction at X
:param X: The points at which to make a prediction
:type X: np.ndarray (Xnew x self.input_dim)
:param quantiles: tuple of quantiles, default is (2.5, 97.5) which is the 95% interval
:type quantiles: tuple
:returns: list of quantiles for each X and predictive quantiles for interval combination
:rtype: [np.ndarray (Xnew x self.input_dim), np.ndarray (Xnew x self.input_dim)]
"""
m, v = self._raw_predict(X, full_cov=False)
if self.normalizer is not None:
m, v = self.normalizer.inverse_mean(m), self.normalizer.inverse_variance(v)
a, b = self.likelihood.predictive_quantiles(m, v, quantiles, Y_metadata)
#return [a, b]
if not self.predict_in_warped_space:
return [a, b]
#print a.shape
new_a = self.warping_function.f_inv(a)
new_b = self.warping_function.f_inv(b)
return [new_a, new_b]
#return self.likelihood.predictive_quantiles(m, v, quantiles, Y_metadata)
return mean, var
if __name__ == '__main__':
X = np.random.randn(100, 1)

View file

@ -289,6 +289,64 @@ class MiscTests(unittest.TestCase):
m.optimize()
print(m)
def test_warped_gp_identity(self):
"""
A WarpedGP with the identity warping function should be
equal to a standard GP.
"""
k = GPy.kern.RBF(1)
m = GPy.models.GPRegression(self.X, self.Y, kernel=k)
m.optimize()
preds = m.predict(self.X)
warp_k = GPy.kern.RBF(1)
warp_f = GPy.util.warping_functions.IdentityFunction()
warp_m = GPy.models.WarpedGP(self.X, self.Y, kernel=warp_k, warping_function=warp_f)
warp_m.optimize()
warp_preds = warp_m.predict(self.X)
np.testing.assert_almost_equal(preds, warp_preds)
@unittest.skip('Comment this to plot the modified sine function')
def test_warped_gp_sine(self):
"""
A test replicating the sine regression problem from
Snelson's paper.
"""
X = (2 * np.pi) * np.random.random(151) - np.pi
Y = np.sin(X) + np.random.normal(0,0.1,151)
Y = np.exp(Y) - 5
#Y = np.array([np.power(abs(y),float(1)/3) * (1,-1)[y<0] for y in Y]) + 0
#np.seterr(over='raise')
import matplotlib.pyplot as plt
warp_k = GPy.kern.RBF(1)
warp_f = GPy.util.warping_functions.TanhWarpingFunction_d(n_terms=2)
warp_m = GPy.models.WarpedGP(X[:, None], Y[:, None], kernel=warp_k, warping_function=warp_f)
#warp_m['.*variance.*'].constrain_fixed(0.25)
#warp_m['.*lengthscale.*'].constrain_fixed(1)
#warp_m['warp_tanh.d'].constrain_fixed(1)
#warp_m.randomize()
#warp_m['.*warp_tanh.psi*'][:,0:2].constrain_bounded(0,100)
#warp_m['.*warp_tanh.psi*'][:,0:1].constrain_fixed(1)
#print(warp_m.checkgrad())
#warp_m.plot()
#plt.show()
warp_m.optimize_restarts(parallel=True, robust=True)
#print(warp_m.checkgrad())
print(warp_m)
print(warp_m['.*warp.*'])
warp_m.predict_in_warped_space = False
warp_m.plot()
warp_m.predict_in_warped_space = True
warp_m.plot()
warp_f.plot(X.min()-10, X.max()+10)
plt.show()
class GradientTests(np.testing.TestCase):
def setUp(self):
######################################

View file

@ -5,6 +5,7 @@ import numpy as np
from ..core.parameterization import Parameterized, Param
from ..core.parameterization.transformations import Logexp
class WarpingFunction(Parameterized):
"""
abstract function for warping
@ -14,28 +15,28 @@ class WarpingFunction(Parameterized):
def __init__(self, name):
super(WarpingFunction, self).__init__(name=name)
def f(self,y,psi):
def f(self, y, psi):
"""function transformation
y is a list of values (GP training data) of shpape [N,1]
y is a list of values (GP training data) of shape [N, 1]
"""
raise NotImplementedError
def fgrad_y(self,y,psi):
def fgrad_y(self, y, psi):
"""gradient of f w.r.t to y"""
raise NotImplementedError
def fgrad_y_psi(self,y,psi):
def fgrad_y_psi(self, y, psi):
"""gradient of f w.r.t to y"""
raise NotImplementedError
def f_inv(self,z,psi):
def f_inv(self, z, psi):
"""inverse function transformation"""
raise NotImplementedError
def _get_param_names(self):
raise NotImplementedError
def plot(self, xmin, xmax):
def plot(self, xmin, xmax):
psi = self.psi
y = np.arange(xmin, xmax, 0.01)
f_y = self.f(y)
@ -45,23 +46,23 @@ class WarpingFunction(Parameterized):
plt.xlabel('y')
plt.ylabel('f(y)')
plt.title('warping function')
plt.show()
class TanhWarpingFunction(WarpingFunction):
def __init__(self,n_terms=3):
def __init__(self, n_terms=3):
"""n_terms specifies the number of tanh terms to be used"""
self.n_terms = n_terms
self.num_parameters = 3 * self.n_terms
super(TanhWarpingFunction, self).__init__(name='warp_tanh')
def f(self,y,psi):
def f(self, y, psi):
"""
transform y with f using parameter vector psi
psi = [[a,b,c]]
::math::`f = \\sum_{terms} a * tanh(b*(y+c))`
"""
#1. check that number of params is consistent
assert psi.shape[0] == self.n_terms, 'inconsistent parameter dimensions'
assert psi.shape[1] == 3, 'inconsistent parameter dimensions'
@ -76,25 +77,19 @@ class TanhWarpingFunction(WarpingFunction):
z += a*np.tanh(b*(y+c))
return z
def f_inv(self, y, psi, iterations = 10):
def f_inv(self, y, psi, iterations=10):
"""
calculate the numerical inverse of f
:param iterations: number of N.R. iterations
"""
y = y.copy()
z = np.ones_like(y)
for i in range(iterations):
z -= (self.f(z, psi) - y)/self.fgrad_y(z,psi)
return z
def fgrad_y(self, y, psi, return_precalc = False):
def fgrad_y(self, y, psi, return_precalc=False):
"""
gradient of f w.r.t to y ([N x 1])
returns: Nx1 vector of derivatives, unless return_precalc is true,
@ -117,16 +112,12 @@ class TanhWarpingFunction(WarpingFunction):
# return GRAD,S.sum(axis=1),R.sum(axis=1),D.sum(axis=1)
return GRAD, S, R, D
return GRAD
def fgrad_y_psi(self, y, psi, return_covar_chain = False):
def fgrad_y_psi(self, y, psi, return_covar_chain=False):
"""
gradient of f w.r.t to y and psi
returns: NxIx3 tensor of partial derivatives
"""
# 1. exponentiate the a and b (positive!)
@ -140,7 +131,6 @@ class TanhWarpingFunction(WarpingFunction):
gradients[:,:,i,1] = a*(d[i] - 2.0*s[i]*r[i]*(1.0/np.cosh(s[i]))**2).T
gradients[:,:,i,2] = (-2.0*a*(b**2)*r[i]*((1.0/np.cosh(s[i]))**2)).T
if return_covar_chain:
covar_grad_chain = np.zeros((y.shape[0], y.shape[1], len(mpsi), 3))
@ -162,7 +152,7 @@ class TanhWarpingFunction(WarpingFunction):
class TanhWarpingFunction_d(WarpingFunction):
def __init__(self,n_terms=3):
def __init__(self, n_terms=3):
"""n_terms specifies the number of tanh terms to be used"""
self.n_terms = n_terms
self.num_parameters = 3 * self.n_terms + 1
@ -176,15 +166,13 @@ class TanhWarpingFunction_d(WarpingFunction):
self.link_parameter(self.psi)
self.link_parameter(self.d)
def f(self,y):
def f(self, y):
"""
Transform y with f using parameter vector psi
psi = [[a,b,c]]
:math:`f = \\sum_{terms} a * tanh(b*(y+c))`
"""
#1. check that number of params is consistent
# assert psi.shape[0] == self.n_terms, 'inconsistent parameter dimensions'
# assert psi.shape[1] == 4, 'inconsistent parameter dimensions'
@ -199,13 +187,11 @@ class TanhWarpingFunction_d(WarpingFunction):
z += a*np.tanh(b*(y+c))
return z
def f_inv(self, z, max_iterations=1000, y=None):
"""
calculate the numerical inverse of f
:param max_iterations: maximum number of N.R. iterations
"""
z = z.copy()
@ -216,29 +202,25 @@ class TanhWarpingFunction_d(WarpingFunction):
update = np.inf
while it == 0 or (np.abs(update).sum() > 1e-10 and it < max_iterations):
update = (self.f(y) - z)/self.fgrad_y(y)
fy = self.f(y)
fgrady = self.fgrad_y(y)
update = (fy - z)/fgrady
y -= update
it += 1
if it == max_iterations:
print("WARNING!!! Maximum number of iterations reached in f_inv ")
return y
def fgrad_y(self, y,return_precalc = False):
def fgrad_y(self, y, return_precalc=False):
"""
gradient of f w.r.t to y ([N x 1])
:returns: Nx1 vector of derivatives, unless return_precalc is true, then it also returns the precomputed stuff
"""
d = self.d
mpsi = self.psi
# vectorized version
S = (mpsi[:,1]*(y[:,:,None] + mpsi[:,2])).T
R = np.tanh(S)
D = 1-R**2
@ -248,23 +230,17 @@ class TanhWarpingFunction_d(WarpingFunction):
if return_precalc:
return GRAD, S, R, D
return GRAD
def fgrad_y_psi(self, y, return_covar_chain = False):
def fgrad_y_psi(self, y, return_covar_chain=False):
"""
gradient of f w.r.t to y and psi
:returns: NxIx4 tensor of partial derivatives
"""
mpsi = self.psi
w, s, r, d = self.fgrad_y(y, return_precalc = True)
w, s, r, d = self.fgrad_y(y, return_precalc=True)
gradients = np.zeros((y.shape[0], y.shape[1], len(mpsi), 4))
for i in range(len(mpsi)):
a,b,c = mpsi[i]
@ -292,3 +268,33 @@ class TanhWarpingFunction_d(WarpingFunction):
names = sum([['warp_tanh_%s_t%i' % (variables[n],q) for n in range(3)] for q in range(self.n_terms)],[])
names.append('warp_tanh_d')
return names
class IdentityFunction(WarpingFunction):
"""
Identity warping function. This is for testing and sanity check purposes
and should not be used in practice.
"""
def __init__(self):
self.num_parameters = 4
self.psi = Param('psi', np.zeros((1,3)))
self.d = Param('%s' % ('d'), 1.0, Logexp())
super(IdentityFunction, self).__init__(name='identity')
self.link_parameter(self.psi)
self.link_parameter(self.d)
def f(self, y):
return y
def fgrad_y(self, y):
return np.ones(y.shape)
def fgrad_y_psi(self, y, return_covar_chain=False):
gradients = np.zeros((y.shape[0], y.shape[1], len(self.psi), 4))
if return_covar_chain:
return gradients, gradients
return gradients
def f_inv(self, z, y=None):
return z