[basisfunc] kernel tests and model tests

This commit is contained in:
Max Zwiessele 2016-09-01 13:09:02 +01:00
parent c3c98652d1
commit ba48c00e9d
5 changed files with 119 additions and 19 deletions

View file

@ -15,6 +15,7 @@ class BasisFuncKernel(Kern):
This class does NOT automatically add an offset to the design matrix phi!
"""
super(BasisFuncKernel, self).__init__(input_dim, active_dims, name)
assert self.input_dim==1, "Basis Function Kernel only implemented for one dimension. Use one kernel per dimension (and add them together) for more dimensions"
self.ARD = ARD
if self.ARD:
phi_test = self._phi(np.random.normal(0, 1, (1, self.input_dim)))
@ -60,6 +61,11 @@ class BasisFuncKernel(Kern):
self.variance.gradient = np.einsum('i,i', dL_dKdiag, self.Kdiag(X)) * self.beta
def concatenate_offset(self, X):
"""
Convenience function to add an offset column to phi.
You can use this function to add an offset (bias on y axis)
to phi in your custom self._phi(X).
"""
return np.c_[np.ones((X.shape[0], 1)), X]
def posterior_inf(self, X=None, posterior=None):
@ -120,6 +126,12 @@ class LinearSlopeBasisFuncKernel(BasisFuncKernel):
return ((phi-(self.stop+self.start)/2.))#/(.5*(self.stop-self.start)))-1.
class ChangePointBasisFuncKernel(BasisFuncKernel):
"""
The basis function has a changepoint. That is, it is constant, jumps at a
single point (given as changepoint) and is constant again. You can
give multiple changepoints. The changepoints are calculated using
np.where(self.X < self.changepoint), -1, 1)
"""
def __init__(self, input_dim, changepoint, variance=1., active_dims=None, ARD=False, name='changepoint'):
self.changepoint = np.array(changepoint)
super(ChangePointBasisFuncKernel, self).__init__(input_dim, variance, active_dims, ARD, name)
@ -129,6 +141,11 @@ class ChangePointBasisFuncKernel(BasisFuncKernel):
return np.where((X < self.changepoint), -1, 1)
class DomainKernel(LinearSlopeBasisFuncKernel):
"""
Create a constant plateou of correlation between start and stop and zero
elsewhere. This is a constant shift of the outputs along the yaxis
in the range from start to stop.
"""
def __init__(self, input_dim, start, stop, variance=1., active_dims=None, ARD=False, name='constant_domain'):
super(DomainKernel, self).__init__(input_dim, start, stop, variance, active_dims, ARD, name)
@ -138,8 +155,15 @@ class DomainKernel(LinearSlopeBasisFuncKernel):
return phi#((phi-self.start)/(self.stop-self.start))-.5
class LogisticBasisFuncKernel(BasisFuncKernel):
"""
Create a series of logistic basis functions with centers given. The
slope gets computed by datafit. The number of centers determines the
number of logistic functions.
"""
def __init__(self, input_dim, centers, variance=1., slope=1., active_dims=None, ARD=False, ARD_slope=True, name='logistic'):
self.centers = np.atleast_2d(centers)
if ARD:
assert ARD_slope, "If we have one variance per center, we want also one slope per center."
self.ARD_slope = ARD_slope
if self.ARD_slope:
self.slope = Param('slope', slope * np.ones(self.centers.size))
@ -166,7 +190,7 @@ class LogisticBasisFuncKernel(BasisFuncKernel):
if self.ARD_slope:
self.slope.gradient = self.variance * 2 * np.einsum('ij,iq,jq->q', dL_dK, phi1, dphi1_dl)
else:
self.slope.gradient = self.variance * 2 * (dL_dK * phi1.dot(dphi1_dl.T)).sum()
self.slope.gradient = np.sum(self.variance * 2 * (dL_dK * phi1.dot(dphi1_dl.T)).sum())
else:
phi1 = self.phi(X)
phi2 = self.phi(X2)
@ -178,5 +202,5 @@ class LogisticBasisFuncKernel(BasisFuncKernel):
if self.ARD_slope:
self.slope.gradient = (self.variance * np.einsum('ij,iq,jq->q', dL_dK, phi1, dphi2_dl) + np.einsum('ij,iq,jq->q', dL_dK, phi2, dphi1_dl))
else:
self.slope.gradient = self.variance * (dL_dK * phi1.dot(dphi2_dl.T)).sum() + (dL_dK * phi2.dot(dphi1_dl.T)).sum()
self.slope.gradient = np.sum(self.variance * (dL_dK * phi1.dot(dphi2_dl.T)).sum() + (dL_dK * phi2.dot(dphi1_dl.T)).sum())
self.slope.gradient = np.where(np.isnan(self.slope.gradient), 0, self.slope.gradient)

View file

@ -20,6 +20,7 @@ class KernCallsViaSlicerMeta(ParametersChangedMeta):
def __new__(cls, name, bases, dct):
put_clean(dct, 'K', _slice_K)
put_clean(dct, 'Kdiag', _slice_Kdiag)
put_clean(dct, 'phi', _slice_Kdiag)
put_clean(dct, 'update_gradients_full', _slice_update_gradients_full)
put_clean(dct, 'update_gradients_diag', _slice_update_gradients_diag)
put_clean(dct, 'gradients_X', _slice_gradients_X)

View file

@ -52,8 +52,8 @@ class Stationary(Kern):
The lengthscale(s) and variance parameters are added to the structure automatically.
Thanks to @strongh:
In Stationary, a covariance function is defined in GPy as stationary when it depends only on the l2-norm |x_1 - x_2 |.
However this is the typical definition of isotropy, while stationarity is usually a bit more relaxed.
In Stationary, a covariance function is defined in GPy as stationary when it depends only on the l2-norm |x_1 - x_2 |.
However this is the typical definition of isotropy, while stationarity is usually a bit more relaxed.
The more common version of stationarity is that the covariance is a function of x_1 - x_2 (See e.g. R&W first paragraph of section 4.1).
"""

View file

@ -477,6 +477,34 @@ class KernelGradientTestsContinuous(unittest.TestCase):
k.randomize()
self.assertTrue(check_kernel_gradient_functions(k, X=X, X2=X2, verbose=verbose, fixed_X_dims=[0]))
def test_basis_func_linear_slope(self):
start_stop = np.random.uniform(self.X.min(0), self.X.max(0), (4, self.X.shape[1])).T
start_stop.sort(axis=1)
ks = []
for i in range(start_stop.shape[0]):
start, stop = np.split(start_stop[i], 2)
ks.append(GPy.kern.LinearSlopeBasisFuncKernel(1, start, stop, ARD=i%2==0, active_dims=[i]))
k = GPy.kern.Add(ks)
self.assertTrue(check_kernel_gradient_functions(k, X=self.X, X2=self.X2, verbose=verbose))
def test_basis_func_changepoint(self):
points = np.random.uniform(self.X.min(0), self.X.max(0), (self.X.shape[1]))
ks = []
for i in range(points.shape[0]):
ks.append(GPy.kern.ChangePointBasisFuncKernel(1, points[i], ARD=i%2==0, active_dims=[i]))
k = GPy.kern.Add(ks)
self.assertTrue(check_kernel_gradient_functions(k, X=self.X, X2=self.X2, verbose=verbose))
def test_basis_func_domain(self):
start_stop = np.random.uniform(self.X.min(0), self.X.max(0), (4, self.X.shape[1])).T
start_stop.sort(axis=1)
ks = []
for i in range(start_stop.shape[0]):
start, stop = np.split(start_stop[i], 2)
ks.append(GPy.kern.DomainKernel(1, start, stop, ARD=i%2==0, active_dims=[i]))
k = GPy.kern.Add(ks)
self.assertTrue(check_kernel_gradient_functions(k, X=self.X, X2=self.X2, verbose=verbose))
class KernelTestsMiscellaneous(unittest.TestCase):
def setUp(self):
N, D = 100, 10

View file

@ -91,23 +91,23 @@ class MiscTests(unittest.TestCase):
k = GPy.kern.RBF(1)
m2 = GPy.models.GPRegression(self.X, (Y-mu)/std, kernel=k, normalizer=False)
m2[:] = m[:]
mu1, var1 = m.predict(m.X, full_cov=True)
mu2, var2 = m2.predict(m2.X, full_cov=True)
np.testing.assert_allclose(mu1, (mu2*std)+mu)
np.testing.assert_allclose(var1, var2*std**2)
mu1, var1 = m.predict(m.X, full_cov=False)
mu2, var2 = m2.predict(m2.X, full_cov=False)
np.testing.assert_allclose(mu1, (mu2*std)+mu)
np.testing.assert_allclose(var1, var2*std**2)
q50n = m.predict_quantiles(m.X, (50,))
q50 = m2.predict_quantiles(m2.X, (50,))
np.testing.assert_allclose(q50n[0], (q50[0]*std)+mu)
# Test variance component:
qs = np.array([2.5, 97.5])
# The quantiles get computed before unormalization
@ -181,8 +181,8 @@ class MiscTests(unittest.TestCase):
Y_mu_true = 2*X_pred_mu
Y_var_true = 4*X_pred_var
Y_mu_pred, Y_var_pred = m.predict_noiseless(X_pred)
np.testing.assert_allclose(Y_mu_true, Y_mu_pred, rtol=1e-4)
np.testing.assert_allclose(Y_var_true, Y_var_pred, rtol=1e-4)
np.testing.assert_allclose(Y_mu_true, Y_mu_pred, rtol=1e-3)
np.testing.assert_allclose(Y_var_true, Y_var_pred, rtol=1e-3)
def test_sparse_raw_predict(self):
k = GPy.kern.RBF(1)
@ -376,18 +376,18 @@ class MiscTests(unittest.TestCase):
warp_k = GPy.kern.RBF(1)
warp_f = GPy.util.warping_functions.IdentityFunction(closed_inverse=False)
warp_m = GPy.models.WarpedGP(self.X, self.Y, kernel=warp_k,
warp_m = GPy.models.WarpedGP(self.X, self.Y, kernel=warp_k,
warping_function=warp_f)
warp_m.optimize()
warp_preds = warp_m.predict(self.X)
warp_k_exact = GPy.kern.RBF(1)
warp_f_exact = GPy.util.warping_functions.IdentityFunction()
warp_m_exact = GPy.models.WarpedGP(self.X, self.Y, kernel=warp_k_exact,
warp_m_exact = GPy.models.WarpedGP(self.X, self.Y, kernel=warp_k_exact,
warping_function=warp_f_exact)
warp_m_exact.optimize()
warp_preds_exact = warp_m_exact.predict(self.X)
np.testing.assert_almost_equal(preds, warp_preds, decimal=4)
np.testing.assert_almost_equal(preds, warp_preds_exact, decimal=4)
@ -406,18 +406,18 @@ class MiscTests(unittest.TestCase):
warp_k = GPy.kern.RBF(1)
warp_f = GPy.util.warping_functions.LogFunction(closed_inverse=False)
warp_m = GPy.models.WarpedGP(self.X, Y, kernel=warp_k,
warp_m = GPy.models.WarpedGP(self.X, Y, kernel=warp_k,
warping_function=warp_f)
warp_m.optimize()
warp_preds = warp_m.predict(self.X, median=True)[0]
warp_k_exact = GPy.kern.RBF(1)
warp_f_exact = GPy.util.warping_functions.LogFunction()
warp_m_exact = GPy.models.WarpedGP(self.X, Y, kernel=warp_k_exact,
warp_m_exact = GPy.models.WarpedGP(self.X, Y, kernel=warp_k_exact,
warping_function=warp_f_exact)
warp_m_exact.optimize(messages=True)
warp_preds_exact = warp_m_exact.predict(self.X, median=True)[0]
np.testing.assert_almost_equal(np.exp(preds), warp_preds, decimal=4)
np.testing.assert_almost_equal(np.exp(preds), warp_preds_exact, decimal=4)
@ -435,7 +435,7 @@ class MiscTests(unittest.TestCase):
warp_m = GPy.models.WarpedGP(X, Y)#, kernel=warp_k)#, warping_function=warp_f)
warp_m['.*\.d'].constrain_fixed(1.0)
warp_m.optimize_restarts(parallel=False, robust=False, num_restarts=5,
warp_m.optimize_restarts(parallel=False, robust=False, num_restarts=5,
max_iters=max_iters)
warp_m.predict(X)
warp_m.predict_quantiles(X)
@ -444,7 +444,7 @@ class MiscTests(unittest.TestCase):
warp_m.plot()
warp_m.predict_in_warped_space = True
warp_m.plot()
def test_offset_regression(self):
#Tests GPy.models.GPOffsetRegression. Using two small time series
#from a sine wave, we confirm the algorithm determines that the
@ -465,6 +465,53 @@ class MiscTests(unittest.TestCase):
m.optimize()
assert np.abs(m.offset[0]-offset)<0.1, ("GPOffsetRegression model failing to estimate correct offset (value estimated = %0.2f instead of %0.2f)" % (m.offset[0], offset))
def test_logistic_basis_func_gradients(self):
X = np.random.uniform(-4, 4, (20, 5))
points = np.random.uniform(X.min(0), X.max(0), X.shape[1])
ks = []
for i in range(points.shape[0]):
if (i%2==0) and (i%3!=0):
self.assertRaises(AssertionError, GPy.kern.LogisticBasisFuncKernel, 1, points, ARD=i%2==0, ARD_slope=i%3==0, active_dims=[i])
else:
ks.append(GPy.kern.LogisticBasisFuncKernel(1, points, ARD=i%2==0, ARD_slope=i%3==0, active_dims=[i]))
k = GPy.kern.Add(ks)
k.randomize()
Y = np.random.normal(0, 1, (X.shape[0], 1))
m = GPy.models.GPRegression(X, Y, kernel=k.copy())
assert m.checkgrad()
def test_posterior_inf_basis_funcs(self):
X = np.random.uniform(-4, 1, (50, 1))
# Logistic:
k = GPy.kern.LogisticBasisFuncKernel(1, [0, -2])
true_w = [1, 2]
true_slope = [5, -2]
Y = 0
for w, s, c in zip(true_w, true_slope, k.centers[0]):
Y += w/(1+np.exp(-s*(X-c)))
Y += np.random.normal(0, .000001)
m = GPy.models.GPRegression(X,Y,kernel=k.copy())
#m.likelihood.fix(1e-6)
m.optimize()
wu, wv = m.kern.posterior_inf()
#_sort = np.argsort(wu.flat)
#from scipy.stats import norm
#confidence_intervals = np.array(norm.interval(.95, loc=wu.flat[_sort], scale=np.sqrt(np.diag(wv))[_sort])).T
#for i in range(wu.size):
# s,t = confidence_intervals[i]
# v = true_w[i]
# assert ((s<v)&(v<t)), "didnt find true w within the 95% confidence interval of the predicted values"
np.testing.assert_allclose(np.sort(wu.flat), np.sort(true_w), rtol=1e-4)
np.testing.assert_allclose(np.diag(wv), 0, atol=1e-4)
np.testing.assert_allclose(np.sort(m.kern.slope.flat), np.sort(true_slope), rtol=1e-4)
class GradientTests(np.testing.TestCase):
def setUp(self):