Copied over LOO code which is needed for clustering, and hasn't yet been merged into 'devel'. Be careful when merging this branch later.

This commit is contained in:
Michael T Smith 2016-11-18 11:12:49 +00:00
parent 269566befc
commit f2f12787a7
4 changed files with 140 additions and 213 deletions

View file

@ -2,17 +2,17 @@
# Licensed under the BSD 3-clause license (see LICENSE.txt)
import numpy as np
from .model import Model
from .parameterization.variational import VariationalPosterior
from .. import kern
from GPy.core.model import Model
from paramz import ObsAr
from .mapping import Mapping
from .. import likelihoods
from .. import kern
from ..inference.latent_function_inference import exact_gaussian_inference, expectation_propagation
from ..util.normalizer import Standardize
from paramz import ObsAr
from GPy.core.parameterization.variational import VariationalPosterior
import logging
import warnings
from GPy.util.normalizer import MeanNorm
logger = logging.getLogger("GP")
class GP(Model):
@ -28,7 +28,7 @@ class GP(Model):
:param Norm normalizer:
normalize the outputs Y.
Prediction will be un-normalized using this normalizer.
If normalizer is None, we will normalize using Standardize.
If normalizer is None, we will normalize using MeanNorm.
If normalizer is False, no normalization will be done.
.. Note:: Multiple independent outputs are allowed using columns of Y
@ -49,7 +49,7 @@ class GP(Model):
logger.info("initializing Y")
if normalizer is True:
self.normalizer = Standardize()
self.normalizer = MeanNorm()
elif normalizer is False:
self.normalizer = None
else:
@ -64,7 +64,6 @@ class GP(Model):
self.Y_normalized = self.Y
else:
self.Y = Y
self.Y_normalized = self.Y
if Y.shape[0] != self.num_data:
#There can be cases where we want inputs than outputs, for example if we have multiple latent
@ -249,16 +248,14 @@ class GP(Model):
"""
#predict the latent function values
mu, var = self._raw_predict(Xnew, full_cov=full_cov, kern=kern)
if self.normalizer is not None:
mu, var = self.normalizer.inverse_mean(mu), self.normalizer.inverse_variance(var)
if include_likelihood:
# now push through likelihood
if likelihood is None:
likelihood = self.likelihood
mu, var = likelihood.predictive_values(mu, var, full_cov, Y_metadata=Y_metadata)
if self.normalizer is not None:
mu, var = self.normalizer.inverse_mean(mu), self.normalizer.inverse_variance(var)
return mu, var
def predict_noiseless(self, Xnew, full_cov=False, Y_metadata=None, kern=None):
@ -303,14 +300,11 @@ class GP(Model):
:rtype: [np.ndarray (Xnew x self.output_dim), np.ndarray (Xnew x self.output_dim)]
"""
m, v = self._raw_predict(X, full_cov=False, kern=kern)
if self.normalizer is not None:
m, v = self.normalizer.inverse_mean(m), self.normalizer.inverse_variance(v)
if likelihood is None:
likelihood = self.likelihood
quantiles = likelihood.predictive_quantiles(m, v, quantiles, Y_metadata=Y_metadata)
if self.normalizer is not None:
quantiles = [self.normalizer.inverse_mean(q) for q in quantiles]
return quantiles
return likelihood.predictive_quantiles(m, v, quantiles, Y_metadata=Y_metadata)
def predictive_gradients(self, Xnew, kern=None):
"""
@ -343,7 +337,8 @@ class GP(Model):
dv_dX += kern.gradients_X(alpha, Xnew, self._predictive_variable)
return mean_jac, dv_dX
def predict_jacobian(self, Xnew, kern=None, full_cov=False):
def predict_jacobian(self, Xnew, kern=None, full_cov=True):
"""
Compute the derivatives of the posterior of the GP.
@ -361,11 +356,15 @@ class GP(Model):
:param X: The points at which to get the predictive gradients.
:type X: np.ndarray (Xnew x self.input_dim)
:param kern: The kernel to compute the jacobian for.
:param boolean full_cov: whether to return the cross-covariance terms between
the N* Jacobian vectors
:param boolean full_cov: whether to return the full covariance of the jacobian.
:returns: dmu_dX, dv_dX
:rtype: [np.ndarray (N*, Q ,D), np.ndarray (N*,Q,(D)) ]
Note: We always return sum in input_dim gradients, as the off-diagonals
in the input_dim are not needed for further calculations.
This is a compromise for increase in speed. Mathematically the jacobian would
have another dimension in Q.
"""
if kern is None:
kern = self.kern
@ -384,26 +383,24 @@ class GP(Model):
dK2_dXdX = kern.gradients_XX(one, Xnew)
else:
dK2_dXdX = kern.gradients_XX_diag(one, Xnew)
#dK2_dXdX = np.zeros((Xnew.shape[0], Xnew.shape[1], Xnew.shape[1]))
#for i in range(Xnew.shape[0]):
# dK2_dXdX[i:i+1,:,:] = kern.gradients_XX(one, Xnew[i:i+1,:])
def compute_cov_inner(wi):
if full_cov:
var_jac = dK2_dXdX - np.einsum('qnm,msr->nsqr', dK_dXnew_full.T.dot(wi), dK_dXnew_full) # n,s = Xnew.shape[0], m = pred_var.shape[0]
# full covariance gradients:
var_jac = dK2_dXdX - np.einsum('qnm,miq->niq', dK_dXnew_full.T.dot(wi), dK_dXnew_full)
else:
var_jac = dK2_dXdX - np.einsum('qnm,mnr->nqr', dK_dXnew_full.T.dot(wi), dK_dXnew_full)
var_jac = dK2_dXdX - np.einsum('qim,miq->iq', dK_dXnew_full.T.dot(wi), dK_dXnew_full)
return var_jac
if self.posterior.woodbury_inv.ndim == 3: # Missing data:
if full_cov:
var_jac = np.empty((Xnew.shape[0],Xnew.shape[0],Xnew.shape[1],Xnew.shape[1],self.output_dim))
for d in range(self.posterior.woodbury_inv.shape[2]):
var_jac[:, :, :, :, d] = compute_cov_inner(self.posterior.woodbury_inv[:, :, d])
else:
var_jac = np.empty((Xnew.shape[0],Xnew.shape[1],Xnew.shape[1],self.output_dim))
var_jac = np.empty((Xnew.shape[0],Xnew.shape[0],Xnew.shape[1],self.output_dim))
for d in range(self.posterior.woodbury_inv.shape[2]):
var_jac[:, :, :, d] = compute_cov_inner(self.posterior.woodbury_inv[:, :, d])
else:
var_jac = np.empty((Xnew.shape[0],Xnew.shape[1],self.output_dim))
for d in range(self.posterior.woodbury_inv.shape[2]):
var_jac[:, :, d] = compute_cov_inner(self.posterior.woodbury_inv[:, :, d])
else:
var_jac = compute_cov_inner(self.posterior.woodbury_inv)
return mean_jac, var_jac
@ -427,11 +424,10 @@ class GP(Model):
mu_jac, var_jac = self.predict_jacobian(Xnew, kern, full_cov=False)
mumuT = np.einsum('iqd,ipd->iqp', mu_jac, mu_jac)
Sigma = np.zeros(mumuT.shape)
if var_jac.ndim == 4: # Missing data
Sigma = var_jac.sum(-1)
if var_jac.ndim == 3:
Sigma[(slice(None), )+np.diag_indices(Xnew.shape[1], 2)] = var_jac.sum(-1)
else:
Sigma = self.output_dim*var_jac
Sigma[(slice(None), )+np.diag_indices(Xnew.shape[1], 2)] = self.output_dim*var_jac
G = 0.
if mean:
G += mumuT
@ -455,7 +451,7 @@ class GP(Model):
:param bool covariance: whether to include the covariance of the wishart embedding.
:param array-like dimensions: which dimensions of the input space to use [defaults to self.get_most_significant_input_dimensions()[:2]]
"""
G = self.predict_wishart_embedding(Xnew, kern, mean, covariance)
G = self.predict_wishard_embedding(Xnew, kern, mean, covariance)
if dimensions is None:
dimensions = self.get_most_significant_input_dimensions()[:2]
G = G[:, dimensions][:,:,dimensions]
@ -608,3 +604,10 @@ class GP(Model):
mu_star, var_star = self._raw_predict(x_test)
return self.likelihood.log_predictive_density_sampling(y_test, mu_star, var_star, Y_metadata=Y_metadata, num_samples=num_samples)
def LOO(self):
"""
Evaluate the approximate leave one out error using the current state of the inference method
"""
return self.inference_method.LOO(self.kern, self.X, self.Y, self.likelihood, self.posterior, Y_metadata=self.Y_metadata)

View file

@ -41,6 +41,13 @@ class LatentFunctionInference(object):
"""
pass
def LOO(self, kern, X, Y, likelihood, posterior, *args, **kwargs):
"""
Leave one out error for the inference algorithm
"""
raise NotImplementedError
class InferenceMethodList(LatentFunctionInference, list):
def on_optimization_start(self):

48
GPy/testing/loo_tests.py Normal file
View file

@ -0,0 +1,48 @@
import numpy as np
from scipy.stats import norm
import unittest
import GPy
class LOOTest(unittest.TestCase):
def test_LOO(self):
"""
Tests if the log likelihoods for each element of the LOO x-validation
from the LOO method match the log likelihoods.
"""
#simple model
X = np.arange(0,10,1)[:,None]
Y = np.sin(X/5.0)+np.random.randn(X.shape[0],X.shape[1])*0.1
k = GPy.kern.RBF(1)
m = GPy.models.GPRegression(X,Y,k)
m.optimize()
#variables to store the predictions of the GP model.
preds = []
variances = []
for it in range(len(X)):
#we create a new dataset with all but one element left out
Xloo = np.delete(X,it,0)
Yloo = np.delete(Y,it,0)
#the location we have left out (and that we want to predict)
predX = X[it].copy()
#configure the model using the left-one-out data
k2 = GPy.kern.RBF(1)
m2 = GPy.models.GPRegression(Xloo,Yloo,k2)
#copy the hyperparameters over (these don't change between LOOs)
m2.param_array[:]=m.param_array[:].copy()
m2.update_model(True)
#make a prediction for the left-out data
pred, var = m2.predict(np.array(predX[:,None]))
preds.append(pred[0,0])
variances.append(var[0,0])
preds = np.array(preds)[:,None]
variances = np.array(variances)[:,None]
expected = np.log(norm.pdf(preds,Y,np.sqrt(variances)))
actual = m.LOO()
for e,a in zip(expected,actual):
assert np.isclose(e,a,atol=0.0001,rtol=0.0001), "LOO likelihoods don't match expected values"

View file

@ -91,33 +91,19 @@ class MiscTests(unittest.TestCase):
k = GPy.kern.RBF(1)
m2 = GPy.models.GPRegression(self.X, (Y-mu)/std, kernel=k, normalizer=False)
m2[:] = m[:]
mu1, var1 = m.predict(m.X, full_cov=True)
mu2, var2 = m2.predict(m2.X, full_cov=True)
np.testing.assert_allclose(mu1, (mu2*std)+mu)
np.testing.assert_allclose(var1, var2*std**2)
np.testing.assert_allclose(var1, var2)
mu1, var1 = m.predict(m.X, full_cov=False)
mu2, var2 = m2.predict(m2.X, full_cov=False)
np.testing.assert_allclose(mu1, (mu2*std)+mu)
np.testing.assert_allclose(var1, var2*std**2)
np.testing.assert_allclose(var1, var2)
q50n = m.predict_quantiles(m.X, (50,))
q50 = m2.predict_quantiles(m2.X, (50,))
np.testing.assert_allclose(q50n[0], (q50[0]*std)+mu)
# Test variance component:
qs = np.array([2.5, 97.5])
# The quantiles get computed before unormalization
# And transformed using the mean transformation:
c = np.random.choice(self.X.shape[0])
q95 = m2.predict_quantiles(self.X[[c]], qs)
mu, var = m2.predict(self.X[[c]])
from scipy.stats import norm
np.testing.assert_allclose((mu+(norm.ppf(qs/100.)*np.sqrt(var))).flatten(), np.array(q95).flatten())
def check_jacobian(self):
try:
import autograd.numpy as np, autograd as ag, GPy, matplotlib.pyplot as plt
@ -181,8 +167,8 @@ class MiscTests(unittest.TestCase):
Y_mu_true = 2*X_pred_mu
Y_var_true = 4*X_pred_var
Y_mu_pred, Y_var_pred = m.predict_noiseless(X_pred)
np.testing.assert_allclose(Y_mu_true, Y_mu_pred, rtol=1e-3)
np.testing.assert_allclose(Y_var_true, Y_var_pred, rtol=1e-3)
np.testing.assert_allclose(Y_mu_true, Y_mu_pred, rtol=1e-4)
np.testing.assert_allclose(Y_var_true, Y_var_pred, rtol=1e-4)
def test_sparse_raw_predict(self):
k = GPy.kern.RBF(1)
@ -328,41 +314,6 @@ class MiscTests(unittest.TestCase):
m.checkgrad()
print(m)
def test_mrd(self):
from GPy.inference.latent_function_inference import InferenceMethodList, VarDTC
from GPy.likelihoods import Gaussian
Y1 = np.random.normal(0, 1, (40, 13))
Y2 = np.random.normal(0, 1, (40, 6))
Y3 = np.random.normal(0, 1, (40, 8))
Q = 5
m = GPy.models.MRD(dict(data1=Y1, data2=Y2, data3=Y3), Q,
)
m.randomize()
self.assertTrue(m.checkgrad())
m = GPy.models.MRD(dict(data1=Y1, data2=Y2, data3=Y3), Q, initx='PCA_single',
initz='random',
kernel=[GPy.kern.RBF(Q, ARD=1) for _ in range(3)],
inference_method=InferenceMethodList([VarDTC() for _ in range(3)]),
likelihoods = [Gaussian(name='Gaussian_noise'.format(i)) for i in range(3)])
m.randomize()
self.assertTrue(m.checkgrad())
m = GPy.models.MRD(dict(data1=Y1, data2=Y2, data3=Y3), Q, initx='random',
initz='random',
kernel=GPy.kern.RBF(Q, ARD=1),
)
m.randomize()
self.assertTrue(m.checkgrad())
m = GPy.models.MRD(dict(data1=Y1, data2=Y2, data3=Y3), Q, X=np.random.normal(0,1,size=(40,Q)),
X_variance=False,
kernel=GPy.kern.RBF(Q, ARD=1),
likelihoods = [Gaussian(name='Gaussian_noise'.format(i)) for i in range(3)])
m.randomize()
self.assertTrue(m.checkgrad())
def test_model_set_params(self):
m = GPy.models.GPRegression(self.X, self.Y)
lengthscale = np.random.uniform()
@ -410,143 +361,61 @@ class MiscTests(unittest.TestCase):
preds = m.predict(self.X)
warp_k = GPy.kern.RBF(1)
warp_f = GPy.util.warping_functions.IdentityFunction(closed_inverse=False)
warp_m = GPy.models.WarpedGP(self.X, self.Y, kernel=warp_k,
warping_function=warp_f)
warp_f = GPy.util.warping_functions.IdentityFunction()
warp_m = GPy.models.WarpedGP(self.X, self.Y, kernel=warp_k, warping_function=warp_f)
warp_m.optimize()
warp_preds = warp_m.predict(self.X)
warp_k_exact = GPy.kern.RBF(1)
warp_f_exact = GPy.util.warping_functions.IdentityFunction()
warp_m_exact = GPy.models.WarpedGP(self.X, self.Y, kernel=warp_k_exact,
warping_function=warp_f_exact)
warp_m_exact.optimize()
warp_preds_exact = warp_m_exact.predict(self.X)
np.testing.assert_almost_equal(preds, warp_preds)
np.testing.assert_almost_equal(preds, warp_preds, decimal=4)
np.testing.assert_almost_equal(preds, warp_preds_exact, decimal=4)
def test_warped_gp_log(self):
@unittest.skip('Comment this to plot the modified sine function')
def test_warped_gp_sine(self):
"""
A WarpedGP with the log warping function should be
equal to a standard GP with log labels.
Note that we predict the median here.
"""
k = GPy.kern.RBF(1)
Y = np.abs(self.Y)
logY = np.log(Y)
m = GPy.models.GPRegression(self.X, logY, kernel=k)
m.optimize()
preds = m.predict(self.X)[0]
warp_k = GPy.kern.RBF(1)
warp_f = GPy.util.warping_functions.LogFunction(closed_inverse=False)
warp_m = GPy.models.WarpedGP(self.X, Y, kernel=warp_k,
warping_function=warp_f)
warp_m.optimize()
warp_preds = warp_m.predict(self.X, median=True)[0]
warp_k_exact = GPy.kern.RBF(1)
warp_f_exact = GPy.util.warping_functions.LogFunction()
warp_m_exact = GPy.models.WarpedGP(self.X, Y, kernel=warp_k_exact,
warping_function=warp_f_exact)
warp_m_exact.optimize(messages=True)
warp_preds_exact = warp_m_exact.predict(self.X, median=True)[0]
np.testing.assert_almost_equal(np.exp(preds), warp_preds, decimal=4)
np.testing.assert_almost_equal(np.exp(preds), warp_preds_exact, decimal=4)
def test_warped_gp_cubic_sine(self, max_iters=100):
"""
A test replicating the cubic sine regression problem from
Snelson's paper. This test doesn't have any assertions, it's
just to ensure coverage of the tanh warping function code.
A test replicating the sine regression problem from
Snelson's paper.
"""
X = (2 * np.pi) * np.random.random(151) - np.pi
Y = np.sin(X) + np.random.normal(0,0.2,151)
Y = np.array([np.power(abs(y),float(1)/3) * (1,-1)[y<0] for y in Y])
X = X[:, None]
Y = Y[:, None]
Y = np.sin(X) + np.random.normal(0,0.1,151)
Y = np.exp(Y) - 5
#Y = np.array([np.power(abs(y),float(1)/3) * (1,-1)[y<0] for y in Y]) + 0
warp_m = GPy.models.WarpedGP(X, Y)#, kernel=warp_k)#, warping_function=warp_f)
warp_m['.*\.d'].constrain_fixed(1.0)
warp_m.optimize_restarts(parallel=False, robust=False, num_restarts=5,
max_iters=max_iters)
warp_m.predict(X)
warp_m.predict_quantiles(X)
warp_m.log_predictive_density(X, Y)
#np.seterr(over='raise')
import matplotlib.pyplot as plt
warp_k = GPy.kern.RBF(1)
warp_f = GPy.util.warping_functions.TanhWarpingFunction_d(n_terms=2)
warp_m = GPy.models.WarpedGP(X[:, None], Y[:, None], kernel=warp_k, warping_function=warp_f)
#warp_m['.*variance.*'].constrain_fixed(0.25)
#warp_m['.*lengthscale.*'].constrain_fixed(1)
#warp_m['warp_tanh.d'].constrain_fixed(1)
#warp_m.randomize()
#warp_m['.*warp_tanh.psi*'][:,0:2].constrain_bounded(0,100)
#warp_m['.*warp_tanh.psi*'][:,0:1].constrain_fixed(1)
#print(warp_m.checkgrad())
#warp_m.plot()
#plt.show()
warp_m.optimize_restarts(parallel=True, robust=True)
#print(warp_m.checkgrad())
print(warp_m)
print(warp_m['.*warp.*'])
warp_m.predict_in_warped_space = False
warp_m.plot()
warp_m.predict_in_warped_space = True
warp_m.plot()
warp_f.plot(X.min()-10, X.max()+10)
plt.show()
def test_offset_regression(self):
#Tests GPy.models.GPOffsetRegression. Using two small time series
#from a sine wave, we confirm the algorithm determines that the
#likelihood is maximised when the offset hyperparameter is approximately
#equal to the actual offset in X between the two time series.
offset = 3
X1 = np.arange(0,50,5.0)[:,None]
X2 = np.arange(0+offset,50+offset,5.0)[:,None]
X = np.vstack([X1,X2])
ind = np.vstack([np.zeros([10,1]),np.ones([10,1])])
X = np.hstack([X,ind])
Y = np.sin((X[0:10,0])/30.0)[:,None]
Y = np.vstack([Y,Y])
m = GPy.models.GPOffsetRegression(X,Y)
m.rbf.lengthscale=5.0 #make it something other than one to check our gradients properly!
assert m.checkgrad(), "Gradients of offset parameters don't match numerical approximations."
m.optimize()
assert np.abs(m.offset[0]-offset)<0.1, ("GPOffsetRegression model failing to estimate correct offset (value estimated = %0.2f instead of %0.2f)" % (m.offset[0], offset))
def test_logistic_basis_func_gradients(self):
X = np.random.uniform(-4, 4, (20, 5))
points = np.random.uniform(X.min(0), X.max(0), X.shape[1])
ks = []
for i in range(points.shape[0]):
if (i%2==0) and (i%3!=0):
self.assertRaises(AssertionError, GPy.kern.LogisticBasisFuncKernel, 1, points, ARD=i%2==0, ARD_slope=i%3==0, active_dims=[i])
else:
ks.append(GPy.kern.LogisticBasisFuncKernel(1, points, ARD=i%2==0, ARD_slope=i%3==0, active_dims=[i]))
k = GPy.kern.Add(ks)
k.randomize()
Y = np.random.normal(0, 1, (X.shape[0], 1))
m = GPy.models.GPRegression(X, Y, kernel=k.copy())
assert m.checkgrad()
def test_posterior_inf_basis_funcs(self):
X = np.random.uniform(-4, 1, (50, 1))
# Logistic:
k = GPy.kern.LogisticBasisFuncKernel(1, [0, -2])
true_w = [1, 2]
true_slope = [5, -2]
Y = 0
for w, s, c in zip(true_w, true_slope, k.centers[0]):
Y += w/(1+np.exp(-s*(X-c)))
Y += np.random.normal(0, .000001)
m = GPy.models.GPRegression(X,Y,kernel=k.copy())
#m.likelihood.fix(1e-6)
m.optimize()
wu, wv = m.kern.posterior_inf()
#_sort = np.argsort(wu.flat)
#from scipy.stats import norm
#confidence_intervals = np.array(norm.interval(.95, loc=wu.flat[_sort], scale=np.sqrt(np.diag(wv))[_sort])).T
#for i in range(wu.size):
# s,t = confidence_intervals[i]
# v = true_w[i]
# assert ((s<v)&(v<t)), "didnt find true w within the 95% confidence interval of the predicted values"
np.testing.assert_allclose(np.sort(wu.flat), np.sort(true_w), rtol=1e-4)
np.testing.assert_allclose(np.diag(wv), 0, atol=1e-4)
np.testing.assert_allclose(np.sort(m.kern.slope.flat), np.sort(true_slope), rtol=1e-4)
def test_LOO_laplace(self):
import GPy
gauss = GPy.likelihoods.Gaussian()
kern = GPy.kern.RBF(self.X.shape[1]) + GPy.kern.White(self.X.shape[1])
exact = GPy.inference.latent_function_inference.ExactGaussianInference()
m_exact = GPy.core.GP(X=self.X, Y=self.Y, kernel=kern.copy(), likelihood=gauss.copy(), inference_method=exact)
laplace = GPy.inference.latent_function_inference.Laplace()
m_laplace = GPy.core.GP(X=self.X, Y=self.Y, kernel=kern.copy(), likelihood=gauss.copy(), inference_method=laplace)
m_laplace[:] = m_exact[:]
np.testing.assert_almost_equal(m_exact.LOO(), m_laplace.LOO())
class GradientTests(np.testing.TestCase):
def setUp(self):