Merge branch 'bwengals-devel' into devel

This commit is contained in:
Max Zwiessele 2016-09-01 14:11:39 +01:00
commit 493624d831
6 changed files with 120 additions and 20 deletions

View file

@ -15,6 +15,7 @@ class BasisFuncKernel(Kern):
This class does NOT automatically add an offset to the design matrix phi!
"""
super(BasisFuncKernel, self).__init__(input_dim, active_dims, name)
assert self.input_dim==1, "Basis Function Kernel only implemented for one dimension. Use one kernel per dimension (and add them together) for more dimensions"
self.ARD = ARD
if self.ARD:
phi_test = self._phi(np.random.normal(0, 1, (1, self.input_dim)))
@ -60,6 +61,11 @@ class BasisFuncKernel(Kern):
self.variance.gradient = np.einsum('i,i', dL_dKdiag, self.Kdiag(X)) * self.beta
def concatenate_offset(self, X):
    """
    Helper that prepends a column of ones (an offset, i.e. a bias on
    the y axis) to the design matrix.

    Call this inside your custom ``self._phi(X)`` whenever the basis
    should include a constant offset term.
    """
    offset_column = np.ones(X.shape[0])
    return np.column_stack((offset_column, X))
def posterior_inf(self, X=None, posterior=None):
@ -120,6 +126,12 @@ class LinearSlopeBasisFuncKernel(BasisFuncKernel):
return ((phi-(self.stop+self.start)/2.))#/(.5*(self.stop-self.start)))-1.
class ChangePointBasisFuncKernel(BasisFuncKernel):
"""
The basis function has a changepoint. That is, it is constant, jumps at a
single point (given as changepoint) and is constant again. You can
give multiple changepoints. The changepoints are calculated using
np.where(self.X < self.changepoint, -1, 1)
"""
def __init__(self, input_dim, changepoint, variance=1., active_dims=None, ARD=False, name='changepoint'):
self.changepoint = np.array(changepoint)
super(ChangePointBasisFuncKernel, self).__init__(input_dim, variance, active_dims, ARD, name)
@ -129,6 +141,11 @@ class ChangePointBasisFuncKernel(BasisFuncKernel):
return np.where((X < self.changepoint), -1, 1)
class DomainKernel(LinearSlopeBasisFuncKernel):
"""
Create a constant plateau of correlation between start and stop and zero
elsewhere. This is a constant shift of the outputs along the y-axis
in the range from start to stop.
"""
def __init__(self, input_dim, start, stop, variance=1., active_dims=None, ARD=False, name='constant_domain'):
super(DomainKernel, self).__init__(input_dim, start, stop, variance, active_dims, ARD, name)
@ -138,8 +155,15 @@ class DomainKernel(LinearSlopeBasisFuncKernel):
return phi#((phi-self.start)/(self.stop-self.start))-.5
class LogisticBasisFuncKernel(BasisFuncKernel):
"""
Create a series of logistic basis functions with centers given. The
slope gets computed by the data fit. The number of centers determines the
number of logistic functions.
"""
def __init__(self, input_dim, centers, variance=1., slope=1., active_dims=None, ARD=False, ARD_slope=True, name='logistic'):
self.centers = np.atleast_2d(centers)
if ARD:
assert ARD_slope, "If we have one variance per center, we want also one slope per center."
self.ARD_slope = ARD_slope
if self.ARD_slope:
self.slope = Param('slope', slope * np.ones(self.centers.size))
@ -150,7 +174,6 @@ class LogisticBasisFuncKernel(BasisFuncKernel):
@Cache_this(limit=3, ignore_args=())
def _phi(self, X):
    """
    Evaluate the logistic basis functions at X.

    Broadcasting ``X`` against ``self.centers`` (and ``self.slope``)
    gives one sigmoid ``1 / (1 + exp(-slope * (X - center)))`` per
    center. Any NaN entries in the result (e.g. from non-finite
    inputs — NOTE(review): confirm the intended source) are mapped
    to 0 before returning.
    """
    # Fix: removed an unused function-local `import scipy as sp` and a
    # stale commented-out normalization expression.
    phi = 1/(1+np.exp(-((X-self.centers)*self.slope)))
    return np.where(np.isnan(phi), 0, phi)
@ -167,7 +190,7 @@ class LogisticBasisFuncKernel(BasisFuncKernel):
if self.ARD_slope:
self.slope.gradient = self.variance * 2 * np.einsum('ij,iq,jq->q', dL_dK, phi1, dphi1_dl)
else:
self.slope.gradient = self.variance * 2 * (dL_dK * phi1.dot(dphi1_dl.T)).sum()
self.slope.gradient = np.sum(self.variance * 2 * (dL_dK * phi1.dot(dphi1_dl.T)).sum())
else:
phi1 = self.phi(X)
phi2 = self.phi(X2)
@ -179,5 +202,5 @@ class LogisticBasisFuncKernel(BasisFuncKernel):
if self.ARD_slope:
self.slope.gradient = (self.variance * np.einsum('ij,iq,jq->q', dL_dK, phi1, dphi2_dl) + np.einsum('ij,iq,jq->q', dL_dK, phi2, dphi1_dl))
else:
self.slope.gradient = self.variance * (dL_dK * phi1.dot(dphi2_dl.T)).sum() + (dL_dK * phi2.dot(dphi1_dl.T)).sum()
self.slope.gradient = np.sum(self.variance * (dL_dK * phi1.dot(dphi2_dl.T)).sum() + (dL_dK * phi2.dot(dphi1_dl.T)).sum())
self.slope.gradient = np.where(np.isnan(self.slope.gradient), 0, self.slope.gradient)

View file

@ -20,6 +20,7 @@ class KernCallsViaSlicerMeta(ParametersChangedMeta):
def __new__(cls, name, bases, dct):
put_clean(dct, 'K', _slice_K)
put_clean(dct, 'Kdiag', _slice_Kdiag)
put_clean(dct, 'phi', _slice_Kdiag)
put_clean(dct, 'update_gradients_full', _slice_update_gradients_full)
put_clean(dct, 'update_gradients_diag', _slice_update_gradients_diag)
put_clean(dct, 'gradients_X', _slice_gradients_X)

View file

@ -30,6 +30,7 @@ class GPKroneckerGaussianRegression(Model):
"""
def __init__(self, X1, X2, Y, kern1, kern2, noise_var=1., name='KGPR'):
Model.__init__(self, name=name)
# accept the construction arguments
self.X1 = ObsAr(X1)
self.X2 = ObsAr(X2)

View file

@ -477,6 +477,34 @@ class KernelGradientTestsContinuous(unittest.TestCase):
k.randomize()
self.assertTrue(check_kernel_gradient_functions(k, X=X, X2=X2, verbose=verbose, fixed_X_dims=[0]))
def test_basis_func_linear_slope(self):
    # Build one LinearSlopeBasisFuncKernel per input dimension, each
    # with a random sorted (start, stop) pair drawn from the data
    # range; every other kernel uses ARD. Gradient-check their sum.
    bounds = np.random.uniform(self.X.min(0), self.X.max(0), (4, self.X.shape[1])).T
    bounds.sort(axis=1)
    kernels = [
        GPy.kern.LinearSlopeBasisFuncKernel(
            1, *np.split(bounds[d], 2), ARD=(d % 2 == 0), active_dims=[d])
        for d in range(bounds.shape[0])
    ]
    combined = GPy.kern.Add(kernels)
    self.assertTrue(check_kernel_gradient_functions(combined, X=self.X, X2=self.X2, verbose=verbose))
def test_basis_func_changepoint(self):
    # Build one ChangePointBasisFuncKernel per input dimension, each
    # with a random changepoint inside the data range; ARD on every
    # other one. Gradient-check their sum.
    changepoints = np.random.uniform(self.X.min(0), self.X.max(0), (self.X.shape[1]))
    kernels = [
        GPy.kern.ChangePointBasisFuncKernel(1, cp, ARD=(d % 2 == 0), active_dims=[d])
        for d, cp in enumerate(changepoints)
    ]
    combined = GPy.kern.Add(kernels)
    self.assertTrue(check_kernel_gradient_functions(combined, X=self.X, X2=self.X2, verbose=verbose))
def test_basis_func_domain(self):
    # Build one DomainKernel per input dimension, each with a random
    # sorted (start, stop) pair from the data range; ARD on every
    # other one. Gradient-check their sum.
    bounds = np.random.uniform(self.X.min(0), self.X.max(0), (4, self.X.shape[1])).T
    bounds.sort(axis=1)
    kernels = [
        GPy.kern.DomainKernel(
            1, *np.split(bounds[d], 2), ARD=(d % 2 == 0), active_dims=[d])
        for d in range(bounds.shape[0])
    ]
    combined = GPy.kern.Add(kernels)
    self.assertTrue(check_kernel_gradient_functions(combined, X=self.X, X2=self.X2, verbose=verbose))
class KernelTestsMiscellaneous(unittest.TestCase):
def setUp(self):
N, D = 100, 10

View file

@ -181,8 +181,8 @@ class MiscTests(unittest.TestCase):
Y_mu_true = 2*X_pred_mu
Y_var_true = 4*X_pred_var
Y_mu_pred, Y_var_pred = m.predict_noiseless(X_pred)
np.testing.assert_allclose(Y_mu_true, Y_mu_pred, rtol=1e-4)
np.testing.assert_allclose(Y_var_true, Y_var_pred, rtol=1e-4)
np.testing.assert_allclose(Y_mu_true, Y_mu_pred, rtol=1e-3)
np.testing.assert_allclose(Y_var_true, Y_var_pred, rtol=1e-3)
def test_sparse_raw_predict(self):
k = GPy.kern.RBF(1)
@ -465,6 +465,53 @@ class MiscTests(unittest.TestCase):
m.optimize()
assert np.abs(m.offset[0]-offset)<0.1, ("GPOffsetRegression model failing to estimate correct offset (value estimated = %0.2f instead of %0.2f)" % (m.offset[0], offset))
def test_logistic_basis_func_gradients(self):
    # LogisticBasisFuncKernel asserts ARD_slope whenever ARD is on:
    # combinations with ARD=True but ARD_slope=False must raise an
    # AssertionError; all valid ones are summed and gradient-checked
    # inside a GPRegression model.
    X = np.random.uniform(-4, 4, (20, 5))
    points = np.random.uniform(X.min(0), X.max(0), X.shape[1])
    kernels = []
    for d in range(points.shape[0]):
        ard, ard_slope = d % 2 == 0, d % 3 == 0
        if ard and not ard_slope:
            self.assertRaises(
                AssertionError, GPy.kern.LogisticBasisFuncKernel,
                1, points, ARD=ard, ARD_slope=ard_slope, active_dims=[d])
        else:
            kernels.append(GPy.kern.LogisticBasisFuncKernel(
                1, points, ARD=ard, ARD_slope=ard_slope, active_dims=[d]))
    k = GPy.kern.Add(kernels)
    k.randomize()
    Y = np.random.normal(0, 1, (X.shape[0], 1))
    m = GPy.models.GPRegression(X, Y, kernel=k.copy())
    assert m.checkgrad()
def test_posterior_inf_basis_funcs(self):
    # Generate data from a known mixture of two logistic functions,
    # fit a GP with a LogisticBasisFuncKernel, and check that
    # posterior inference over the basis-function weights recovers
    # the true weights and slopes.
    X = np.random.uniform(-4, 1, (50, 1))
    kern = GPy.kern.LogisticBasisFuncKernel(1, [0, -2])
    true_w = [1, 2]
    true_slope = [5, -2]
    Y = sum(w / (1 + np.exp(-s * (X - c)))
            for w, s, c in zip(true_w, true_slope, kern.centers[0]))
    Y = Y + np.random.normal(0, .000001)
    m = GPy.models.GPRegression(X, Y, kernel=kern.copy())
    m.optimize()
    wu, wv = m.kern.posterior_inf()
    # Weight means match the true weights (order-agnostic), their
    # posterior variances are ~0, and the fitted slopes match.
    np.testing.assert_allclose(np.sort(wu.flat), np.sort(true_w), rtol=1e-4)
    np.testing.assert_allclose(np.diag(wv), 0, atol=1e-4)
    np.testing.assert_allclose(np.sort(m.kern.slope.flat), np.sort(true_slope), rtol=1e-4)
class GradientTests(np.testing.TestCase):
def setUp(self):