added a new test which tries to replicate Snelson's toy 1D but NR seems to diverge...

This commit is contained in:
beckdaniel 2015-08-05 19:05:48 +01:00
parent 76bc0bec25
commit 995de0f399
4 changed files with 88 additions and 25 deletions

View file

@ -69,27 +69,27 @@ class WarpedGP(GP):
def plot_warping(self):
self.warping_function.plot(self.Y_untransformed.min(), self.Y_untransformed.max())
def _get_warped_term(self, mean, var, gh_samples, pred_init=None):
arg1 = gh_samples.dot(var.T) * np.sqrt(2)
def _get_warped_term(self, mean, std, gh_samples, pred_init=None):
arg1 = gh_samples.dot(std.T) * np.sqrt(2)
arg2 = np.ones(shape=gh_samples.shape).dot(mean.T)
return self.warping_function.f_inv(arg1 + arg2, y=pred_init)
def _get_warped_mean(self, mean, var, pred_init=None, deg_gauss_hermite=100):
def _get_warped_mean(self, mean, std, pred_init=None, deg_gauss_hermite=100):
"""
Calculate the warped mean by using Gauss-Hermite quadrature.
"""
gh_samples, gh_weights = np.polynomial.hermite.hermgauss(deg_gauss_hermite)
gh_samples = gh_samples[:,None]
gh_weights = gh_weights[None,:]
return gh_weights.dot(self._get_warped_term(mean, var, gh_samples)) / np.sqrt(np.pi)
return gh_weights.dot(self._get_warped_term(mean, std, gh_samples)) / np.sqrt(np.pi)
def _get_warped_variance(self, mean, var, pred_init=None, deg_gauss_hermite=100):
def _get_warped_variance(self, mean, std, pred_init=None, deg_gauss_hermite=100):
gh_samples, gh_weights = np.polynomial.hermite.hermgauss(deg_gauss_hermite)
gh_samples = gh_samples[:,None]
gh_weights = gh_weights[None,:]
arg1 = gh_weights.dot(self._get_warped_term(mean, var, gh_samples,
arg1 = gh_weights.dot(self._get_warped_term(mean, std, gh_samples,
pred_init=pred_init) ** 2) / np.sqrt(np.pi)
arg2 = self._get_warped_mean(mean, var, pred_init=pred_init,
arg2 = self._get_warped_mean(mean, std, pred_init=pred_init,
deg_gauss_hermite=deg_gauss_hermite)
return arg1 - (arg2 ** 2)
@ -103,20 +103,20 @@ class WarpedGP(GP):
mean, var = self.likelihood.predictive_values(mu, var)
if self.predict_in_warped_space:
std = np.sqrt(var)
if median:
#print 'MEDIAN!'
wmean = self.warping_function.f_inv(mean, y=pred_init)
wmean = self.warping_function.f_inv(mean, y=pred_init)
else:
#print 'MEAN!'
wmean = self._get_warped_mean(mean, var, pred_init=pred_init,
wmean = self._get_warped_mean(mean, std, pred_init=pred_init,
deg_gauss_hermite=deg_gauss_hermite).T
#var = self.warping_function.f_inv(var)
wvar = self._get_warped_variance(mean, var, pred_init=pred_init,
wvar = self._get_warped_variance(mean, std, pred_init=pred_init,
deg_gauss_hermite=deg_gauss_hermite).T
else:
wmean = mean
#wvar = var
wvar = self.warping_function.f_inv(var)
wvar = var
if self.scale_data:
pred = self._unscale_data(pred)
@ -138,9 +138,12 @@ class WarpedGP(GP):
if self.normalizer is not None:
m, v = self.normalizer.inverse_mean(m), self.normalizer.inverse_variance(v)
a, b = self.likelihood.predictive_quantiles(m, v, quantiles, Y_metadata)
if not self.predict_in_warped_space:
return [a, b]
#print a.shape
new_a = self.warping_function.f_inv(a)
new_b = self.warping_function.f_inv(b)
return [new_a, new_b]
#return self.likelihood.predictive_quantiles(m, v, quantiles, Y_metadata)

View file

@ -75,7 +75,7 @@ def plot_fit(model, plot_limits=None, which_data_rows='all',
X = model.X
Y = model.Y
if isinstance(model, WarpedGP):
if isinstance(model, WarpedGP) and model.predict_in_warped_space:
Y = model.Y_untransformed
if sparse.issparse(Y): Y = Y.todense().view(np.ndarray)
@ -117,7 +117,11 @@ def plot_fit(model, plot_limits=None, which_data_rows='all',
Y_metadata = {'output_index': extra_data}
else:
Y_metadata['output_index'] = extra_data
m, v = model.predict(Xgrid, full_cov=False, Y_metadata=Y_metadata, **predict_kw)
if isinstance(model, WarpedGP):
m, v = model.predict(Xgrid, full_cov=False, median=True, Y_metadata=Y_metadata, **predict_kw)
#print np.concatenate((Xgrid, m), axis=1)
else:
m, v = model.predict(Xgrid, full_cov=False, Y_metadata=Y_metadata, **predict_kw)
lower, upper = model.predict_quantiles(Xgrid, Y_metadata=Y_metadata)

View file

@ -203,13 +203,63 @@ class MiscTests(unittest.TestCase):
m.optimize()
print(m)
def test_warped_gp(self):
def test_warped_gp_identity(self):
"""
A WarpedGP with the identity warping function should be
equal to a standard GP.
"""
k = GPy.kern.RBF(1)
warp = GPy.util.warping_functions.IdentityFunction()
m = GPy.models.WarpedGP(self.X, self.Y, kernel=k, warping_function=warp)
m.randomize()
m = GPy.models.GPRegression(self.X, self.Y, kernel=k)
m.optimize()
print(m)
preds = m.predict(self.X)
warp_k = GPy.kern.RBF(1)
warp_f = GPy.util.warping_functions.IdentityFunction()
warp_m = GPy.models.WarpedGP(self.X, self.Y, kernel=warp_k, warping_function=warp_f)
warp_m.optimize()
warp_preds = warp_m.predict(self.X)
np.testing.assert_almost_equal(preds, warp_preds)
@unittest.skip('Comment this to plot the modified sine function')
def test_warped_gp_sine(self):
"""
A test replicating the sine regression problem from
Snelson's paper.
"""
X = (2 * np.pi) * np.random.random(151) - np.pi
Y = np.sin(X) + np.random.normal(0,0.1,151)
Y = np.exp(Y) - 5
#Y = np.array([np.power(abs(y),float(1)/3) * (1,-1)[y<0] for y in Y]) + 0
#np.seterr(over='raise')
import matplotlib.pyplot as plt
warp_k = GPy.kern.RBF(1)
warp_f = GPy.util.warping_functions.TanhWarpingFunction_d(n_terms=2)
warp_m = GPy.models.WarpedGP(X[:, None], Y[:, None], kernel=warp_k, warping_function=warp_f)
#warp_m['.*variance.*'].constrain_fixed(0.25)
#warp_m['.*lengthscale.*'].constrain_fixed(1)
#warp_m['warp_tanh.d'].constrain_fixed(1)
#warp_m.randomize()
#warp_m['.*warp_tanh.psi*'][:,0:2].constrain_bounded(0,100)
#warp_m['.*warp_tanh.psi*'][:,0:1].constrain_fixed(1)
#print(warp_m.checkgrad())
#warp_m.plot()
#plt.show()
warp_m.optimize_restarts(parallel=True, robust=True)
#print(warp_m.checkgrad())
print(warp_m)
print(warp_m['.*warp.*'])
warp_m.predict_in_warped_space = False
warp_m.plot()
warp_m.predict_in_warped_space = True
warp_m.plot()
warp_f.plot(X.min()-10, X.max()+10)
plt.show()
class GradientTests(np.testing.TestCase):
def setUp(self):

View file

@ -211,17 +211,24 @@ class TanhWarpingFunction_d(WarpingFunction):
z = z.copy()
if y is None:
y = np.ones_like(z)
y = np.ones_like(z) * 0.1
#y = np.zeros_like(z)
it = 0
update = np.inf
#import ipdb; ipdb.set_trace()
while it == 0 or (np.abs(update).sum() > 1e-10 and it < max_iterations):
update = (self.f(y) - z)/self.fgrad_y(y)
fy = self.f(y)
fgrady = self.fgrad_y(y)
update = (fy - z)/fgrady
y -= update
it += 1
#print it
#print y
if it == max_iterations:
print("WARNING!!! Maximum number of iterations reached in f_inv ")
#print np.abs(update)
return y
@ -265,7 +272,7 @@ class TanhWarpingFunction_d(WarpingFunction):
mpsi = self.psi
w, s, r, d = self.fgrad_y(y, return_precalc = True)
#print s
gradients = np.zeros((y.shape[0], y.shape[1], len(mpsi), 4))
for i in range(len(mpsi)):
a,b,c = mpsi[i]
@ -316,11 +323,10 @@ class IdentityFunction(WarpingFunction):
return np.ones(y.shape)
def fgrad_y_psi(self, y, return_covar_chain=False):
gradients = np.ones((y.shape[0], y.shape[1], len(self.psi), 4))
gradients = np.zeros((y.shape[0], y.shape[1], len(self.psi), 4))
if return_covar_chain:
return gradients, gradients
return gradients
def f_inv(self,z):
def f_inv(self, z, y=None):
return z