Merge branch 'devel' into binomial_laplace

This commit is contained in:
Max Zwiessele 2016-09-01 14:21:27 +01:00
commit 589c5d9eac
206 changed files with 41346 additions and 4247 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 28 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 2.2 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 2.9 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 46 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 8.5 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 44 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 8.3 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 2.6 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.2 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.9 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 6.9 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 3.8 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 4.3 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 52 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 6.8 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 5.8 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 26 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 15 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 22 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 22 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 11 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 3.8 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 30 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 3 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 7.7 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 2.2 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 10 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 29 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 44 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 8.5 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 43 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 4.9 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 5.7 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 23 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 27 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 17 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 16 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 26 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 27 KiB

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 4 KiB

View file

@ -24,9 +24,9 @@ class Test(unittest.TestCase):
k = GPy.kern.RBF(1)
m = GPy.models.BayesianGPLVM(self.Y, 1, kernel=k)
mu, var = m.predict(m.X)
X = m.X.copy()
X = m.X
Xnew = NormalPosterior(m.X.mean[:10].copy(), m.X.variance[:10].copy())
m.set_XY(Xnew, m.Y[:10])
m.set_XY(Xnew, m.Y[:10].copy())
assert(m.checkgrad())
m.set_XY(X, self.Y)
mu2, var2 = m.predict(m.X)
@ -40,7 +40,7 @@ class Test(unittest.TestCase):
mu, var = m.predict(m.X)
X = m.X.copy()
Xnew = X[:10].copy()
m.set_XY(Xnew, m.Y[:10])
m.set_XY(Xnew, m.Y[:10].copy())
assert(m.checkgrad())
m.set_XY(X, self.Y)
mu2, var2 = m.predict(m.X)

View file

@ -0,0 +1,391 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2015, Alex Grigorevskiy
# Licensed under the BSD 3-clause license (see LICENSE.txt)
"""
Testing state space related functions.
"""
import unittest
import numpy as np
import GPy
import GPy.models.state_space_model as SS_model
from .state_space_main_tests import generate_x_points, generate_sine_data, \
generate_linear_data, generate_brownian_data, generate_linear_plus_sin
from nose import SkipTest
#from state_space_main_tests import generate_x_points, generate_sine_data, \
# generate_linear_data, generate_brownian_data, generate_linear_plus_sin
class StateSpaceKernelsTests(np.testing.TestCase):
def setUp(self):
pass
def run_for_model(self, X, Y, ss_kernel, kalman_filter_type = 'regular',
use_cython=False, check_gradients=True,
optimize=True, optimize_max_iters=250, predict_X=None,
compare_with_GP=True, gp_kernel=None,
mean_compare_decimal=10, var_compare_decimal=7):
m1 = SS_model.StateSpace(X,Y, ss_kernel,
kalman_filter_type=kalman_filter_type,
use_cython=use_cython)
m1.likelihood[:] = Y.var()/100.
if check_gradients:
self.assertTrue(m1.checkgrad())
if 1:#optimize:
m1.optimize(optimizer='lbfgsb', max_iters=1)
if compare_with_GP and (predict_X is None):
predict_X = X
self.assertTrue(compare_with_GP)
if compare_with_GP:
m2 = GPy.models.GPRegression(X,Y, gp_kernel)
m2[:] = m1[:]
if (predict_X is not None):
x_pred_reg_1 = m1.predict(predict_X)
x_quant_reg_1 = m1.predict_quantiles(predict_X)
x_pred_reg_2 = m2.predict(predict_X)
x_quant_reg_2 = m2.predict_quantiles(predict_X)
np.testing.assert_array_almost_equal(x_pred_reg_1[0], x_pred_reg_2[0], mean_compare_decimal)
np.testing.assert_array_almost_equal(x_pred_reg_1[1], x_pred_reg_2[1], var_compare_decimal)
np.testing.assert_array_almost_equal(x_quant_reg_1[0], x_quant_reg_2[0], mean_compare_decimal)
np.testing.assert_array_almost_equal(x_quant_reg_1[1], x_quant_reg_2[1], mean_compare_decimal)
np.testing.assert_array_almost_equal(m1.gradient, m2.gradient, var_compare_decimal)
np.testing.assert_almost_equal(m1.log_likelihood(), m2.log_likelihood(), var_compare_decimal)
def test_Matern32_kernel(self,):
np.random.seed(234) # seed the random number generator
(X,Y) = generate_sine_data(x_points=None, sin_period=5.0, sin_ampl=10.0, noise_var=2.0,
plot = False, points_num=50, x_interval = (0, 20), random=True)
X.shape = (X.shape[0],1); Y.shape = (Y.shape[0],1)
ss_kernel = GPy.kern.sde_Matern32(1,active_dims=[0,])
gp_kernel = GPy.kern.Matern32(1,active_dims=[0,])
self.run_for_model(X, Y, ss_kernel, check_gradients=True,
predict_X=X,
compare_with_GP=True,
gp_kernel=gp_kernel,
mean_compare_decimal=5, var_compare_decimal=5)
def test_Matern52_kernel(self,):
np.random.seed(234) # seed the random number generator
(X,Y) = generate_sine_data(x_points=None, sin_period=5.0, sin_ampl=10.0, noise_var=2.0,
plot = False, points_num=50, x_interval = (0, 20), random=True)
X.shape = (X.shape[0],1); Y.shape = (Y.shape[0],1)
ss_kernel = GPy.kern.sde_Matern52(1,active_dims=[0,])
gp_kernel = GPy.kern.Matern52(1,active_dims=[0,])
self.run_for_model(X, Y, ss_kernel, check_gradients=True,
optimize = True, predict_X=X,
compare_with_GP=True, gp_kernel=gp_kernel,
mean_compare_decimal=5, var_compare_decimal=5)
def test_RBF_kernel(self,):
np.random.seed(234) # seed the random number generator
(X,Y) = generate_sine_data(x_points=None, sin_period=5.0, sin_ampl=10.0, noise_var=2.0,
plot = False, points_num=50, x_interval = (0, 20), random=True)
X.shape = (X.shape[0],1); Y.shape = (Y.shape[0],1)
ss_kernel = GPy.kern.sde_RBF(1, 110., 1.5, active_dims=[0,])
gp_kernel = GPy.kern.RBF(1, 110., 1.5, active_dims=[0,])
self.run_for_model(X, Y, ss_kernel, check_gradients=True,
predict_X=X,
gp_kernel=gp_kernel,
optimize_max_iters=1000,
mean_compare_decimal=2, var_compare_decimal=1)
def test_periodic_kernel(self,):
np.random.seed(322) # seed the random number generator
(X,Y) = generate_sine_data(x_points=None, sin_period=5.0, sin_ampl=10.0, noise_var=2.0,
plot = False, points_num=50, x_interval = (0, 20), random=True)
X.shape = (X.shape[0],1); Y.shape = (Y.shape[0],1)
ss_kernel = GPy.kern.sde_StdPeriodic(1,active_dims=[0,])
ss_kernel.lengthscale.constrain_bounded(0.27, 1000)
ss_kernel.period.constrain_bounded(0.17, 100)
gp_kernel = GPy.kern.StdPeriodic(1,active_dims=[0,])
gp_kernel.lengthscale.constrain_bounded(0.27, 1000)
gp_kernel.period.constrain_bounded(0.17, 100)
self.run_for_model(X, Y, ss_kernel, check_gradients=True,
predict_X=X,
gp_kernel=gp_kernel,
mean_compare_decimal=3, var_compare_decimal=3)
def test_quasi_periodic_kernel(self,):
np.random.seed(329) # seed the random number generator
(X,Y) = generate_sine_data(x_points=None, sin_period=5.0, sin_ampl=10.0, noise_var=2.0,
plot = False, points_num=50, x_interval = (0, 20), random=True)
X.shape = (X.shape[0],1); Y.shape = (Y.shape[0],1)
ss_kernel = GPy.kern.sde_Matern32(1)*GPy.kern.sde_StdPeriodic(1,active_dims=[0,])
ss_kernel.std_periodic.lengthscale.constrain_bounded(0.25, 1000)
ss_kernel.std_periodic.period.constrain_bounded(0.15, 100)
gp_kernel = GPy.kern.Matern32(1)*GPy.kern.StdPeriodic(1,active_dims=[0,])
gp_kernel.std_periodic.lengthscale.constrain_bounded(0.25, 1000)
gp_kernel.std_periodic.period.constrain_bounded(0.15, 100)
self.run_for_model(X, Y, ss_kernel, check_gradients=True,
predict_X=X,
gp_kernel=gp_kernel,
mean_compare_decimal=1, var_compare_decimal=2)
def test_linear_kernel(self,):
np.random.seed(234) # seed the random number generator
(X,Y) = generate_linear_data(x_points=None, tangent=2.0, add_term=20.0, noise_var=2.0,
plot = False, points_num=50, x_interval = (0, 20), random=True)
X.shape = (X.shape[0],1); Y.shape = (Y.shape[0],1)
ss_kernel = GPy.kern.sde_Linear(1,X,active_dims=[0,]) + GPy.kern.sde_Bias(1, active_dims=[0,])
gp_kernel = GPy.kern.Linear(1, active_dims=[0,]) + GPy.kern.Bias(1, active_dims=[0,])
self.run_for_model(X, Y, ss_kernel, check_gradients= False,
predict_X=X,
gp_kernel=gp_kernel,
mean_compare_decimal=5, var_compare_decimal=5)
def test_brownian_kernel(self,):
np.random.seed(234) # seed the random number generator
(X,Y) = generate_brownian_data(x_points=None, kernel_var=2.0, noise_var = 0.1,
plot = False, points_num=50, x_interval = (0, 20), random=True)
X.shape = (X.shape[0],1); Y.shape = (Y.shape[0],1)
ss_kernel = GPy.kern.sde_Brownian()
gp_kernel = GPy.kern.Brownian()
self.run_for_model(X, Y, ss_kernel, check_gradients=True,
predict_X=X,
gp_kernel=gp_kernel,
mean_compare_decimal=4, var_compare_decimal=4)
def test_exponential_kernel(self,):
np.random.seed(12345) # seed the random number generator
(X,Y) = generate_linear_data(x_points=None, tangent=1.0, add_term=20.0, noise_var=2.0,
plot = False, points_num=10, x_interval = (0, 20), random=True)
X.shape = (X.shape[0],1); Y.shape = (Y.shape[0],1)
ss_kernel = GPy.kern.sde_Exponential(1, Y.var(), X.ptp()/2., active_dims=[0,])
gp_kernel = GPy.kern.Exponential(1, Y.var(), X.ptp()/2., active_dims=[0,])
Y -= Y.mean()
self.run_for_model(X, Y, ss_kernel, check_gradients=True,
predict_X=X,
gp_kernel=gp_kernel,
optimize_max_iters=1000,
mean_compare_decimal=2, var_compare_decimal=2)
def test_kernel_addition_svd(self,):
#np.random.seed(329) # seed the random number generator
np.random.seed(42)
(X,Y) = generate_sine_data(x_points=None, sin_period=5.0, sin_ampl=5.0, noise_var=2.0,
plot = False, points_num=100, x_interval = (0, 40), random=True)
(X1,Y1) = generate_linear_data(x_points=X, tangent=1.0, add_term=20.0, noise_var=0.0,
plot = False, points_num=100, x_interval = (0, 40), random=True)
# Sine data <-
Y = Y + Y1
Y -= Y.mean()
X.shape = (X.shape[0],1); Y.shape = (Y.shape[0],1)
def get_new_kernels():
ss_kernel = GPy.kern.sde_Linear(1, X, variances=1) + GPy.kern.sde_StdPeriodic(1, period=5.0, variance=300, lengthscale=3, active_dims=[0,])
#ss_kernel.std_periodic.lengthscale.constrain_bounded(0.25, 1000)
#ss_kernel.std_periodic.period.constrain_bounded(3, 8)
gp_kernel = GPy.kern.Linear(1, variances=1) + GPy.kern.StdPeriodic(1, period=5.0, variance=300, lengthscale=3, active_dims=[0,])
#gp_kernel.std_periodic.lengthscale.constrain_bounded(0.25, 1000)
#gp_kernel.std_periodic.period.constrain_bounded(3, 8)
return ss_kernel, gp_kernel
# Cython is available only with svd.
ss_kernel, gp_kernel = get_new_kernels()
self.run_for_model(X, Y, ss_kernel, kalman_filter_type = 'svd',
use_cython=True, optimize_max_iters=10, check_gradients=False,
predict_X=X,
gp_kernel=gp_kernel,
mean_compare_decimal=3, var_compare_decimal=3)
ss_kernel, gp_kernel = get_new_kernels()
self.run_for_model(X, Y, ss_kernel, kalman_filter_type = 'svd',
use_cython=False, optimize_max_iters=10, check_gradients=False,
predict_X=X,
gp_kernel=gp_kernel,
mean_compare_decimal=3, var_compare_decimal=3)
def test_kernel_addition_regular(self,):
#np.random.seed(329) # seed the random number generator
np.random.seed(42)
(X,Y) = generate_sine_data(x_points=None, sin_period=5.0, sin_ampl=5.0, noise_var=2.0,
plot = False, points_num=100, x_interval = (0, 40), random=True)
(X1,Y1) = generate_linear_data(x_points=X, tangent=1.0, add_term=20.0, noise_var=0.0,
plot = False, points_num=100, x_interval = (0, 40), random=True)
# Sine data <-
Y = Y + Y1
Y -= Y.mean()
X.shape = (X.shape[0],1); Y.shape = (Y.shape[0],1)
def get_new_kernels():
ss_kernel = GPy.kern.sde_Linear(1, X, variances=1) + GPy.kern.sde_StdPeriodic(1, period=5.0, variance=300, lengthscale=3, active_dims=[0,])
#ss_kernel.std_periodic.lengthscale.constrain_bounded(0.25, 1000)
#ss_kernel.std_periodic.period.constrain_bounded(3, 8)
gp_kernel = GPy.kern.Linear(1, variances=1) + GPy.kern.StdPeriodic(1, period=5.0, variance=300, lengthscale=3, active_dims=[0,])
#gp_kernel.std_periodic.lengthscale.constrain_bounded(0.25, 1000)
#gp_kernel.std_periodic.period.constrain_bounded(3, 8)
return ss_kernel, gp_kernel
ss_kernel, gp_kernel = get_new_kernels()
try:
self.run_for_model(X, Y, ss_kernel, kalman_filter_type = 'regular',
use_cython=False, optimize_max_iters=10, check_gradients=True,
predict_X=X,
gp_kernel=gp_kernel,
mean_compare_decimal=2, var_compare_decimal=2)
except AssertionError:
raise SkipTest("Skipping Regular kalman filter for kernel addition, as it seems to be bugged for some python versions")
def test_kernel_multiplication(self,):
np.random.seed(329) # seed the random number generator
(X,Y) = generate_sine_data(x_points=None, sin_period=5.0, sin_ampl=10.0, noise_var=2.0,
plot = False, points_num=50, x_interval = (0, 20), random=True)
X.shape = (X.shape[0],1); Y.shape = (Y.shape[0],1)
def get_new_kernels():
ss_kernel = GPy.kern.sde_Matern32(1)*GPy.kern.sde_Matern52(1)
gp_kernel = GPy.kern.Matern32(1)*GPy.kern.sde_Matern52(1)
return ss_kernel, gp_kernel
ss_kernel, gp_kernel = get_new_kernels()
#import ipdb;ipdb.set_trace()
self.run_for_model(X, Y, ss_kernel, kalman_filter_type = 'svd',
use_cython=True, optimize_max_iters=10, check_gradients=True,
predict_X=X,
gp_kernel=gp_kernel,
mean_compare_decimal=2, var_compare_decimal=2)
ss_kernel, gp_kernel = get_new_kernels()
self.run_for_model(X, Y, ss_kernel, kalman_filter_type = 'regular',
use_cython=False, optimize_max_iters=10, check_gradients=True,
predict_X=X,
gp_kernel=gp_kernel,
mean_compare_decimal=2, var_compare_decimal=2)
ss_kernel, gp_kernel = get_new_kernels()
self.run_for_model(X, Y, ss_kernel, kalman_filter_type = 'svd',
use_cython=False, optimize_max_iters=10, check_gradients=True,
predict_X=X,
gp_kernel=gp_kernel,
mean_compare_decimal=2, var_compare_decimal=2)
def test_forecast(self,):
"""
Test time-series forecasting.
"""
# Generate data ->
np.random.seed(339) # seed the random number generator
#import pdb; pdb.set_trace()
(X,Y) = generate_sine_data(x_points=None, sin_period=5.0, sin_ampl=5.0, noise_var=2.0,
plot = False, points_num=100, x_interval = (0, 40), random=True)
(X1,Y1) = generate_linear_data(x_points=X, tangent=1.0, add_term=20.0, noise_var=0.0,
plot = False, points_num=100, x_interval = (0, 40), random=True)
Y = Y + Y1
X_train = X[X <= 20]
Y_train = Y[X <= 20]
X_test = X[X > 20]
Y_test = Y[X > 20]
X.shape = (X.shape[0],1); Y.shape = (Y.shape[0],1)
X_train.shape = (X_train.shape[0],1); Y_train.shape = (Y_train.shape[0],1)
X_test.shape = (X_test.shape[0],1); Y_test.shape = (Y_test.shape[0],1)
# Generate data <-
#import pdb; pdb.set_trace()
def get_new_kernels():
periodic_kernel = GPy.kern.StdPeriodic(1,active_dims=[0,])
gp_kernel = GPy.kern.Linear(1, active_dims=[0,]) + GPy.kern.Bias(1, active_dims=[0,]) + periodic_kernel
gp_kernel.std_periodic.lengthscale.constrain_bounded(0.25, 1000)
gp_kernel.std_periodic.period.constrain_bounded(0.15, 100)
periodic_kernel = GPy.kern.sde_StdPeriodic(1,active_dims=[0,])
ss_kernel = GPy.kern.sde_Linear(1,X,active_dims=[0,]) + \
GPy.kern.sde_Bias(1, active_dims=[0,]) + periodic_kernel
ss_kernel.std_periodic.lengthscale.constrain_bounded(0.25, 1000)
ss_kernel.std_periodic.period.constrain_bounded(0.15, 100)
return ss_kernel, gp_kernel
ss_kernel, gp_kernel = get_new_kernels()
self.run_for_model(X_train, Y_train, ss_kernel, kalman_filter_type = 'regular',
use_cython=False, optimize_max_iters=30, check_gradients=True,
predict_X=X_test,
gp_kernel=gp_kernel,
mean_compare_decimal=2, var_compare_decimal=2)
ss_kernel, gp_kernel = get_new_kernels()
self.run_for_model(X_train, Y_train, ss_kernel, kalman_filter_type = 'svd',
use_cython=False, optimize_max_iters=30, check_gradients=False,
predict_X=X_test,
gp_kernel=gp_kernel,
mean_compare_decimal=2, var_compare_decimal=2)
ss_kernel, gp_kernel = get_new_kernels()
self.run_for_model(X_train, Y_train, ss_kernel, kalman_filter_type = 'svd',
use_cython=True, optimize_max_iters=30, check_gradients=False,
predict_X=X_test,
gp_kernel=gp_kernel,
mean_compare_decimal=2, var_compare_decimal=2)
if __name__ == "__main__":
print("Running state-space inference tests...")
unittest.main()
#tt = StateSpaceKernelsTests('test_periodic_kernel')
#import pdb; pdb.set_trace()
#tt.test_Matern32_kernel()
#tt.test_Matern52_kernel()
#tt.test_RBF_kernel()
#tt.test_periodic_kernel()
#tt.test_quasi_periodic_kernel()
#tt.test_linear_kernel()
#tt.test_brownian_kernel()
#tt.test_exponential_kernel()
#tt.test_kernel_addition()
#tt.test_kernel_multiplication()
#tt.test_forecast()

View file

@ -2,11 +2,15 @@
# Licensed under the BSD 3-clause license (see LICENSE.txt)
import unittest
import numpy as np
from unittest.case import skip
import GPy
from GPy.core.parameterization.param import Param
import numpy as np
from ..util.config import config
verbose = 0
try:
@ -100,7 +104,45 @@ class Kern_check_dKdiag_dX(Kern_check_dK_dX):
def parameters_changed(self):
self.X.gradient[:] = self.kernel.gradients_X_diag(self.dL_dK.diagonal(), self.X)
class Kern_check_d2K_dXdX(Kern_check_model):
"""This class allows gradient checks for the secondderivative of a kernel with respect to X. """
def __init__(self, kernel=None, dL_dK=None, X=None, X2=None):
Kern_check_model.__init__(self,kernel=kernel,dL_dK=dL_dK, X=X, X2=X2)
self.X = Param('X',X.copy())
self.link_parameter(self.X)
self.Xc = X.copy()
def log_likelihood(self):
if self.X2 is None:
return self.kernel.gradients_X(self.dL_dK, self.X, self.Xc).sum()
return self.kernel.gradients_X(self.dL_dK, self.X, self.X2).sum()
def parameters_changed(self):
#if self.kernel.name == 'rbf':
# import ipdb;ipdb.set_trace()
if self.X2 is None:
grads = -self.kernel.gradients_XX(self.dL_dK, self.X).sum(1).sum(1)
else:
grads = -self.kernel.gradients_XX(self.dL_dK.T, self.X2, self.X).sum(0).sum(1)
self.X.gradient[:] = grads
class Kern_check_d2Kdiag_dXdX(Kern_check_model):
"""This class allows gradient checks for the second derivative of a kernel with respect to X. """
def __init__(self, kernel=None, dL_dK=None, X=None):
Kern_check_model.__init__(self,kernel=kernel,dL_dK=dL_dK, X=X)
self.X = Param('X',X)
self.link_parameter(self.X)
self.Xc = X.copy()
def log_likelihood(self):
l = 0.
for i in range(self.X.shape[0]):
l += self.kernel.gradients_X(self.dL_dK[[i],[i]], self.X[[i]], self.Xc[[i]]).sum()
return l
def parameters_changed(self):
grads = -self.kernel.gradients_XX_diag(self.dL_dK.diagonal(), self.X)
self.X.gradient[:] = grads.sum(-1)
def check_kernel_gradient_functions(kern, X=None, X2=None, output_ind=None, verbose=False, fixed_X_dims=None):
"""
@ -151,7 +193,12 @@ def check_kernel_gradient_functions(kern, X=None, X2=None, output_ind=None, verb
if verbose:
print("Checking gradients of K(X, X2) wrt theta.")
result = Kern_check_dK_dtheta(kern, X=X, X2=X2).checkgrad(verbose=verbose)
try:
result = Kern_check_dK_dtheta(kern, X=X, X2=X2).checkgrad(verbose=verbose)
except NotImplementedError:
result=True
if verbose:
print(("update_gradients_full, with differing X and X2, not implemented for " + kern.name))
if result and verbose:
print("Check passed.")
if not result:
@ -238,6 +285,66 @@ def check_kernel_gradient_functions(kern, X=None, X2=None, output_ind=None, verb
assert(result)
return False
if verbose:
print("Checking gradients of dK(X, X2) wrt X2 with full cov in dimensions")
try:
testmodel = Kern_check_d2K_dXdX(kern, X=X, X2=X2)
if fixed_X_dims is not None:
testmodel.X[:,fixed_X_dims].fix()
result = testmodel.checkgrad(verbose=verbose)
except NotImplementedError:
result=True
if verbose:
print(("gradients_X not implemented for " + kern.name))
if result and verbose:
print("Check passed.")
if not result:
print(("Gradient of dK(X, X2) wrt X failed for " + kern.name + " covariance function. Gradient values as follows:"))
testmodel.checkgrad(verbose=True)
assert(result)
pass_checks = False
return False
if verbose:
print("Checking gradients of dK(X, X) wrt X with full cov in dimensions")
try:
testmodel = Kern_check_d2K_dXdX(kern, X=X, X2=None)
if fixed_X_dims is not None:
testmodel.X[:,fixed_X_dims].fix()
result = testmodel.checkgrad(verbose=verbose)
except NotImplementedError:
result=True
if verbose:
print(("gradients_X not implemented for " + kern.name))
if result and verbose:
print("Check passed.")
if not result:
print(("Gradient of dK(X, X) wrt X with full cov in dimensions failed for " + kern.name + " covariance function. Gradient values as follows:"))
testmodel.checkgrad(verbose=True)
assert(result)
pass_checks = False
return False
if verbose:
print("Checking gradients of dKdiag(X, X) wrt X with cov in dimensions")
try:
testmodel = Kern_check_d2Kdiag_dXdX(kern, X=X)
if fixed_X_dims is not None:
testmodel.X[:,fixed_X_dims].fix()
result = testmodel.checkgrad(verbose=verbose)
except NotImplementedError:
result=True
if verbose:
print(("gradients_X not implemented for " + kern.name))
if result and verbose:
print("Check passed.")
if not result:
print(("Gradient of dKdiag(X, X) wrt X with cov in dimensions failed for " + kern.name + " covariance function. Gradient values as follows:"))
testmodel.checkgrad(verbose=True)
assert(result)
pass_checks = False
return False
return pass_checks
@ -245,8 +352,8 @@ def check_kernel_gradient_functions(kern, X=None, X2=None, output_ind=None, verb
class KernelGradientTestsContinuous(unittest.TestCase):
def setUp(self):
self.N, self.D = 10, 5
self.X = np.random.randn(self.N,self.D)
self.X2 = np.random.randn(self.N+10,self.D)
self.X = np.random.randn(self.N,self.D+1)
self.X2 = np.random.randn(self.N+10,self.D+1)
continuous_kerns = ['RBF', 'Linear']
self.kernclasses = [getattr(GPy.kern, s) for s in continuous_kerns]
@ -295,7 +402,7 @@ class KernelGradientTestsContinuous(unittest.TestCase):
def test_Add_dims(self):
k = GPy.kern.Matern32(2, active_dims=[2,self.D]) + GPy.kern.RBF(2, active_dims=[0,4]) + GPy.kern.Linear(self.D)
k.randomize()
self.assertRaises(IndexError, k.K, self.X)
self.assertRaises(IndexError, k.K, self.X[:, :self.D])
k = GPy.kern.Matern32(2, active_dims=[2,self.D-1]) + GPy.kern.RBF(2, active_dims=[0,4]) + GPy.kern.Linear(self.D)
k.randomize()
# assert it runs:
@ -310,7 +417,22 @@ class KernelGradientTestsContinuous(unittest.TestCase):
self.assertTrue(check_kernel_gradient_functions(k, X=self.X, X2=self.X2, verbose=verbose))
def test_RBF(self):
k = GPy.kern.RBF(self.D, ARD=True)
k = GPy.kern.RBF(self.D-1, ARD=True)
k.randomize()
self.assertTrue(check_kernel_gradient_functions(k, X=self.X, X2=self.X2, verbose=verbose))
def test_integral(self):
k = GPy.kern.Integral(1)
k.randomize()
self.assertTrue(check_kernel_gradient_functions(k, X=self.X, X2=self.X2, verbose=verbose))
def test_multidimensional_integral_limits(self):
k = GPy.kern.Multidimensional_Integral_Limits(2)
k.randomize()
self.assertTrue(check_kernel_gradient_functions(k, X=self.X, X2=self.X2, verbose=verbose))
def test_integral_limits(self):
k = GPy.kern.Integral_Limits(2)
k.randomize()
self.assertTrue(check_kernel_gradient_functions(k, X=self.X, X2=self.X2, verbose=verbose))
@ -324,6 +446,13 @@ class KernelGradientTestsContinuous(unittest.TestCase):
k.randomize()
self.assertTrue(check_kernel_gradient_functions(k, X=self.X, X2=self.X2, verbose=verbose))
def test_Fixed(self):
cov = np.dot(self.X, self.X.T)
X = np.arange(self.N).reshape(self.N, 1)
k = GPy.kern.Fixed(1, cov)
k.randomize()
self.assertTrue(check_kernel_gradient_functions(k, X=X, X2=None, verbose=verbose))
def test_Poly(self):
k = GPy.kern.Poly(self.D, order=5)
k.randomize()
@ -339,6 +468,43 @@ class KernelGradientTestsContinuous(unittest.TestCase):
k.randomize()
self.assertTrue(check_kernel_gradient_functions(k, X=self.X, X2=self.X2, verbose=verbose))
def test_Precomputed(self):
Xall = np.concatenate([self.X, self.X2])
cov = np.dot(Xall, Xall.T)
X = np.arange(self.N).reshape(self.N, 1)
X2 = np.arange(self.N,2*self.N+10).reshape(self.N+10, 1)
k = GPy.kern.Precomputed(1, cov)
k.randomize()
self.assertTrue(check_kernel_gradient_functions(k, X=X, X2=X2, verbose=verbose, fixed_X_dims=[0]))
def test_basis_func_linear_slope(self):
start_stop = np.random.uniform(self.X.min(0), self.X.max(0), (4, self.X.shape[1])).T
start_stop.sort(axis=1)
ks = []
for i in range(start_stop.shape[0]):
start, stop = np.split(start_stop[i], 2)
ks.append(GPy.kern.LinearSlopeBasisFuncKernel(1, start, stop, ARD=i%2==0, active_dims=[i]))
k = GPy.kern.Add(ks)
self.assertTrue(check_kernel_gradient_functions(k, X=self.X, X2=self.X2, verbose=verbose))
def test_basis_func_changepoint(self):
points = np.random.uniform(self.X.min(0), self.X.max(0), (self.X.shape[1]))
ks = []
for i in range(points.shape[0]):
ks.append(GPy.kern.ChangePointBasisFuncKernel(1, points[i], ARD=i%2==0, active_dims=[i]))
k = GPy.kern.Add(ks)
self.assertTrue(check_kernel_gradient_functions(k, X=self.X, X2=self.X2, verbose=verbose))
def test_basis_func_domain(self):
start_stop = np.random.uniform(self.X.min(0), self.X.max(0), (4, self.X.shape[1])).T
start_stop.sort(axis=1)
ks = []
for i in range(start_stop.shape[0]):
start, stop = np.split(start_stop[i], 2)
ks.append(GPy.kern.DomainKernel(1, start, stop, ARD=i%2==0, active_dims=[i]))
k = GPy.kern.Add(ks)
self.assertTrue(check_kernel_gradient_functions(k, X=self.X, X2=self.X2, verbose=verbose))
class KernelTestsMiscellaneous(unittest.TestCase):
def setUp(self):
N, D = 100, 10
@ -394,8 +560,13 @@ class KernelTestsNonContinuous(unittest.TestCase):
self.X2[:(N0*2), -1] = 0
self.X2[(N0*2):, -1] = 1
@unittest.expectedFailure
def test_IndependentOutputs(self):
k = [GPy.kern.RBF(1, active_dims=[1], name='rbf1'), GPy.kern.RBF(self.D, active_dims=range(self.D), name='rbf012'), GPy.kern.RBF(2, active_dims=[0,2], name='rbf02')]
kern = GPy.kern.IndependentOutputs(k, -1, name='ind_split')
np.testing.assert_array_equal(kern.active_dims, [-1,0,1,2])
np.testing.assert_array_equal(kern._all_dims_active, [0,1,2,-1])
def testIndependendGradients(self):
k = GPy.kern.RBF(self.D, active_dims=range(self.D))
kern = GPy.kern.IndependentOutputs(k, -1, 'ind_single')
self.assertTrue(check_kernel_gradient_functions(kern, X=self.X, X2=self.X2, verbose=verbose, fixed_X_dims=-1))
@ -403,8 +574,13 @@ class KernelTestsNonContinuous(unittest.TestCase):
kern = GPy.kern.IndependentOutputs(k, -1, name='ind_split')
self.assertTrue(check_kernel_gradient_functions(kern, X=self.X, X2=self.X2, verbose=verbose, fixed_X_dims=-1))
@unittest.expectedFailure
def test_Hierarchical(self):
k = [GPy.kern.RBF(2, active_dims=[0,2], name='rbf1'), GPy.kern.RBF(2, active_dims=[0,2], name='rbf2')]
kern = GPy.kern.IndependentOutputs(k, -1, name='ind_split')
np.testing.assert_array_equal(kern.active_dims, [-1,0,2])
np.testing.assert_array_equal(kern._all_dims_active, [0,1,2,-1])
def test_Hierarchical_gradients(self):
k = [GPy.kern.RBF(2, active_dims=[0,2], name='rbf1'), GPy.kern.RBF(2, active_dims=[0,2], name='rbf2')]
kern = GPy.kern.IndependentOutputs(k, -1, name='ind_split')
self.assertTrue(check_kernel_gradient_functions(kern, X=self.X, X2=self.X2, verbose=verbose, fixed_X_dims=-1))
@ -416,6 +592,10 @@ class KernelTestsNonContinuous(unittest.TestCase):
X2 = self.X2[self.X2[:,-1]!=2]
self.assertTrue(check_kernel_gradient_functions(kern, X=X, X2=X2, verbose=verbose, fixed_X_dims=-1))
def test_Coregionalize(self):
kern = GPy.kern.Coregionalize(1, output_dim=3, active_dims=[-1])
self.assertTrue(check_kernel_gradient_functions(kern, X=self.X, X2=self.X2, verbose=verbose, fixed_X_dims=-1))
@unittest.skipIf(not config.getboolean('cython', 'working'),"Cython modules have not been built on this machine")
class Coregionalize_cython_test(unittest.TestCase):
"""

View file

@ -28,10 +28,49 @@ class MFtests(unittest.TestCase):
A linear mean function with parameters that we'll learn alongside the kernel
"""
X = np.linspace(-1,10,50).reshape(-1,1)
Y = 3-np.abs((X-6))
Y += .5*np.cos(3*X) + 0.3*np.random.randn(*X.shape)
mf = GPy.mappings.PiecewiseLinear(1, 1, [-1,1], [9,2])
k =GPy.kern.RBF(1)
lik = GPy.likelihoods.Gaussian()
m = GPy.core.GP(X, Y, kernel=k, likelihood=lik, mean_function=mf)
self.assertTrue(m.checkgrad())
def test_parametric_mean_function_composition(self):
"""
A linear mean function with parameters that we'll learn alongside the kernel
"""
X = np.linspace(0,10,50).reshape(-1,1)
Y = np.sin(X) + 0.5*np.cos(3*X) + 0.1*np.random.randn(*X.shape) + 3*X
mf = GPy.mappings.Linear(1,1)
mf = GPy.mappings.Compound(GPy.mappings.Linear(1,1),
GPy.mappings.Kernel(1, 1, np.random.normal(0,1,(1,1)),
GPy.kern.RBF(1))
)
k =GPy.kern.RBF(1)
lik = GPy.likelihoods.Gaussian()
m = GPy.core.GP(X, Y, kernel=k, likelihood=lik, mean_function=mf)
self.assertTrue(m.checkgrad())
def test_parametric_mean_function_additive(self):
"""
A linear mean function with parameters that we'll learn alongside the kernel
"""
X = np.linspace(0,10,50).reshape(-1,1)
Y = np.sin(X) + 0.5*np.cos(3*X) + 0.1*np.random.randn(*X.shape) + 3*X
mf = GPy.mappings.Additive(GPy.mappings.Constant(1,1,3),
GPy.mappings.Additive(GPy.mappings.MLP(1,1),
GPy.mappings.Identity(1,1)
)
)
k =GPy.kern.RBF(1)
lik = GPy.likelihoods.Gaussian()

View file

@ -127,28 +127,32 @@ class SparseGPMinibatchTest(unittest.TestCase):
def test_sparsegp_init(self):
# Test if the different implementations give the exact same likelihood as the full model.
# All of the following settings should give the same likelihood and gradients as the full model:
np.random.seed(1234)
Z = self.X[np.random.choice(self.X.shape[0], replace=False, size=10)].copy()
Q = Z.shape[1]
m = GPy.models.sparse_gp_minibatch.SparseGPMiniBatch(self.X, self.Y, Z, GPy.kern.RBF(Q)+GPy.kern.Matern32(Q)+GPy.kern.Bias(Q), GPy.likelihoods.Gaussian(), missing_data=True, stochastic=False)
assert(m.checkgrad())
m.optimize('adadelta', max_iters=10)
assert(m.checkgrad())
m = GPy.models.sparse_gp_minibatch.SparseGPMiniBatch(self.X, self.Y, Z, GPy.kern.RBF(Q)+GPy.kern.Matern32(Q)+GPy.kern.Bias(Q), GPy.likelihoods.Gaussian(), missing_data=True, stochastic=True)
assert(m.checkgrad())
m.optimize('rprop', max_iters=10)
assert(m.checkgrad())
m = GPy.models.sparse_gp_minibatch.SparseGPMiniBatch(self.X, self.Y, Z, GPy.kern.RBF(Q)+GPy.kern.Matern32(Q)+GPy.kern.Bias(Q), GPy.likelihoods.Gaussian(), missing_data=False, stochastic=False)
assert(m.checkgrad())
m.optimize('rprop', max_iters=10)
assert(m.checkgrad())
m = GPy.models.sparse_gp_minibatch.SparseGPMiniBatch(self.X, self.Y, Z, GPy.kern.RBF(Q)+GPy.kern.Matern32(Q)+GPy.kern.Bias(Q), GPy.likelihoods.Gaussian(), missing_data=False, stochastic=True)
assert(m.checkgrad())
m.optimize('adadelta', max_iters=10)
assert(m.checkgrad())
try:
np.random.seed(1234)
Z = self.X[np.random.choice(self.X.shape[0], replace=False, size=10)].copy()
Q = Z.shape[1]
m = GPy.models.sparse_gp_minibatch.SparseGPMiniBatch(self.X, self.Y, Z, GPy.kern.RBF(Q)+GPy.kern.Matern32(Q)+GPy.kern.Bias(Q), GPy.likelihoods.Gaussian(), missing_data=True, stochastic=False)
assert(m.checkgrad())
m.optimize('adadelta', max_iters=10)
assert(m.checkgrad())
m = GPy.models.sparse_gp_minibatch.SparseGPMiniBatch(self.X, self.Y, Z, GPy.kern.RBF(Q)+GPy.kern.Matern32(Q)+GPy.kern.Bias(Q), GPy.likelihoods.Gaussian(), missing_data=True, stochastic=True)
assert(m.checkgrad())
m.optimize('rprop', max_iters=10)
assert(m.checkgrad())
m = GPy.models.sparse_gp_minibatch.SparseGPMiniBatch(self.X, self.Y, Z, GPy.kern.RBF(Q)+GPy.kern.Matern32(Q)+GPy.kern.Bias(Q), GPy.likelihoods.Gaussian(), missing_data=False, stochastic=False)
assert(m.checkgrad())
m.optimize('rprop', max_iters=10)
assert(m.checkgrad())
m = GPy.models.sparse_gp_minibatch.SparseGPMiniBatch(self.X, self.Y, Z, GPy.kern.RBF(Q)+GPy.kern.Matern32(Q)+GPy.kern.Bias(Q), GPy.likelihoods.Gaussian(), missing_data=False, stochastic=True)
assert(m.checkgrad())
m.optimize('adadelta', max_iters=10)
assert(m.checkgrad())
except ImportError:
from nose import SkipTest
raise SkipTest('climin not installed, skipping stochastic gradients')
def test_predict_missing_data(self):
m = GPy.models.bayesian_gplvm_minibatch.BayesianGPLVMMiniBatch(self.Y, self.Q, X_variance=False, missing_data=True, stochastic=True, batchsize=self.Y.shape[1])

View file

@ -27,7 +27,7 @@ class MiscTests(unittest.TestCase):
Test whether the predicted variance of normal GP goes negative under numerical unstable situation.
Thanks simbartonels@github for reporting the bug and providing the following example.
"""
# set seed for reproducability
np.random.seed(3)
# Definition of the Branin test function
@ -69,13 +69,13 @@ class MiscTests(unittest.TestCase):
K_hat = k.K(self.X_new) - k.K(self.X_new, self.X).dot(Kinv).dot(k.K(self.X, self.X_new))
mu_hat = k.K(self.X_new, self.X).dot(Kinv).dot(m.Y_normalized)
mu, covar = m._raw_predict(self.X_new, full_cov=True)
mu, covar = m.predict_noiseless(self.X_new, full_cov=True)
self.assertEquals(mu.shape, (self.N_new, self.D))
self.assertEquals(covar.shape, (self.N_new, self.N_new))
np.testing.assert_almost_equal(K_hat, covar)
np.testing.assert_almost_equal(mu_hat, mu)
mu, var = m._raw_predict(self.X_new)
mu, var = m.predict_noiseless(self.X_new)
self.assertEquals(mu.shape, (self.N_new, self.D))
self.assertEquals(var.shape, (self.N_new, 1))
np.testing.assert_almost_equal(np.diag(K_hat)[:, None], var)
@ -91,19 +91,33 @@ class MiscTests(unittest.TestCase):
k = GPy.kern.RBF(1)
m2 = GPy.models.GPRegression(self.X, (Y-mu)/std, kernel=k, normalizer=False)
m2[:] = m[:]
mu1, var1 = m.predict(m.X, full_cov=True)
mu2, var2 = m2.predict(m2.X, full_cov=True)
np.testing.assert_allclose(mu1, (mu2*std)+mu)
np.testing.assert_allclose(var1, var2)
np.testing.assert_allclose(var1, var2*std**2)
mu1, var1 = m.predict(m.X, full_cov=False)
mu2, var2 = m2.predict(m2.X, full_cov=False)
np.testing.assert_allclose(mu1, (mu2*std)+mu)
np.testing.assert_allclose(var1, var2)
np.testing.assert_allclose(var1, var2*std**2)
q50n = m.predict_quantiles(m.X, (50,))
q50 = m2.predict_quantiles(m2.X, (50,))
np.testing.assert_allclose(q50n[0], (q50[0]*std)+mu)
# Test variance component:
qs = np.array([2.5, 97.5])
# The quantiles get computed before unormalization
# And transformed using the mean transformation:
c = np.random.choice(self.X.shape[0])
q95 = m2.predict_quantiles(self.X[[c]], qs)
mu, var = m2.predict(self.X[[c]])
from scipy.stats import norm
np.testing.assert_allclose((mu+(norm.ppf(qs/100.)*np.sqrt(var))).flatten(), np.array(q95).flatten())
def check_jacobian(self):
try:
import autograd.numpy as np, autograd as ag, GPy, matplotlib.pyplot as plt
@ -166,9 +180,9 @@ class MiscTests(unittest.TestCase):
# S = \int (f(x) - m)^2q(x|mu,S) dx = \int f(x)^2 q(x) dx - mu**2 = 4(mu^2 + S) - (2.mu)^2 = 4S
Y_mu_true = 2*X_pred_mu
Y_var_true = 4*X_pred_var
Y_mu_pred, Y_var_pred = m._raw_predict(X_pred)
np.testing.assert_allclose(Y_mu_true, Y_mu_pred, rtol=1e-4)
np.testing.assert_allclose(Y_var_true, Y_var_pred, rtol=1e-4)
Y_mu_pred, Y_var_pred = m.predict_noiseless(X_pred)
np.testing.assert_allclose(Y_mu_true, Y_mu_pred, rtol=1e-3)
np.testing.assert_allclose(Y_var_true, Y_var_pred, rtol=1e-3)
def test_sparse_raw_predict(self):
k = GPy.kern.RBF(1)
@ -181,13 +195,13 @@ class MiscTests(unittest.TestCase):
K_hat = k.K(self.X_new) - k.K(self.X_new, Z).dot(Kinv).dot(k.K(Z, self.X_new))
K_hat = np.clip(K_hat, 1e-15, np.inf)
mu, covar = m._raw_predict(self.X_new, full_cov=True)
mu, covar = m.predict_noiseless(self.X_new, full_cov=True)
self.assertEquals(mu.shape, (self.N_new, self.D))
self.assertEquals(covar.shape, (self.N_new, self.N_new))
np.testing.assert_almost_equal(K_hat, covar)
# np.testing.assert_almost_equal(mu_hat, mu)
mu, var = m._raw_predict(self.X_new)
mu, var = m.predict_noiseless(self.X_new)
self.assertEquals(mu.shape, (self.N_new, self.D))
self.assertEquals(var.shape, (self.N_new, 1))
np.testing.assert_almost_equal(np.diag(K_hat)[:, None], var)
@ -361,52 +375,143 @@ class MiscTests(unittest.TestCase):
preds = m.predict(self.X)
warp_k = GPy.kern.RBF(1)
warp_f = GPy.util.warping_functions.IdentityFunction()
warp_m = GPy.models.WarpedGP(self.X, self.Y, kernel=warp_k, warping_function=warp_f)
warp_f = GPy.util.warping_functions.IdentityFunction(closed_inverse=False)
warp_m = GPy.models.WarpedGP(self.X, self.Y, kernel=warp_k,
warping_function=warp_f)
warp_m.optimize()
warp_preds = warp_m.predict(self.X)
np.testing.assert_almost_equal(preds, warp_preds)
@unittest.skip('Comment this to plot the modified sine function')
def test_warped_gp_sine(self):
warp_k_exact = GPy.kern.RBF(1)
warp_f_exact = GPy.util.warping_functions.IdentityFunction()
warp_m_exact = GPy.models.WarpedGP(self.X, self.Y, kernel=warp_k_exact,
warping_function=warp_f_exact)
warp_m_exact.optimize()
warp_preds_exact = warp_m_exact.predict(self.X)
np.testing.assert_almost_equal(preds, warp_preds, decimal=4)
np.testing.assert_almost_equal(preds, warp_preds_exact, decimal=4)
def test_warped_gp_log(self):
"""
A test replicating the sine regression problem from
Snelson's paper.
A WarpedGP with the log warping function should be
equal to a standard GP with log labels.
Note that we predict the median here.
"""
k = GPy.kern.RBF(1)
Y = np.abs(self.Y)
logY = np.log(Y)
m = GPy.models.GPRegression(self.X, logY, kernel=k)
m.optimize()
preds = m.predict(self.X)[0]
warp_k = GPy.kern.RBF(1)
warp_f = GPy.util.warping_functions.LogFunction(closed_inverse=False)
warp_m = GPy.models.WarpedGP(self.X, Y, kernel=warp_k,
warping_function=warp_f)
warp_m.optimize()
warp_preds = warp_m.predict(self.X, median=True)[0]
warp_k_exact = GPy.kern.RBF(1)
warp_f_exact = GPy.util.warping_functions.LogFunction()
warp_m_exact = GPy.models.WarpedGP(self.X, Y, kernel=warp_k_exact,
warping_function=warp_f_exact)
warp_m_exact.optimize(messages=True)
warp_preds_exact = warp_m_exact.predict(self.X, median=True)[0]
np.testing.assert_almost_equal(np.exp(preds), warp_preds, decimal=4)
np.testing.assert_almost_equal(np.exp(preds), warp_preds_exact, decimal=4)
def test_warped_gp_cubic_sine(self, max_iters=100):
"""
A test replicating the cubic sine regression problem from
Snelson's paper. This test doesn't have any assertions, it's
just to ensure coverage of the tanh warping function code.
"""
X = (2 * np.pi) * np.random.random(151) - np.pi
Y = np.sin(X) + np.random.normal(0,0.1,151)
Y = np.exp(Y) - 5
#Y = np.array([np.power(abs(y),float(1)/3) * (1,-1)[y<0] for y in Y]) + 0
#np.seterr(over='raise')
import matplotlib.pyplot as plt
warp_k = GPy.kern.RBF(1)
warp_f = GPy.util.warping_functions.TanhWarpingFunction_d(n_terms=2)
warp_m = GPy.models.WarpedGP(X[:, None], Y[:, None], kernel=warp_k, warping_function=warp_f)
#warp_m['.*variance.*'].constrain_fixed(0.25)
#warp_m['.*lengthscale.*'].constrain_fixed(1)
#warp_m['warp_tanh.d'].constrain_fixed(1)
#warp_m.randomize()
#warp_m['.*warp_tanh.psi*'][:,0:2].constrain_bounded(0,100)
#warp_m['.*warp_tanh.psi*'][:,0:1].constrain_fixed(1)
#print(warp_m.checkgrad())
#warp_m.plot()
#plt.show()
Y = np.sin(X) + np.random.normal(0,0.2,151)
Y = np.array([np.power(abs(y),float(1)/3) * (1,-1)[y<0] for y in Y])
X = X[:, None]
Y = Y[:, None]
warp_m.optimize_restarts(parallel=True, robust=True)
#print(warp_m.checkgrad())
print(warp_m)
print(warp_m['.*warp.*'])
warp_m = GPy.models.WarpedGP(X, Y)#, kernel=warp_k)#, warping_function=warp_f)
warp_m['.*\.d'].constrain_fixed(1.0)
warp_m.optimize_restarts(parallel=False, robust=False, num_restarts=5,
max_iters=max_iters)
warp_m.predict(X)
warp_m.predict_quantiles(X)
warp_m.log_predictive_density(X, Y)
warp_m.predict_in_warped_space = False
warp_m.plot()
warp_m.predict_in_warped_space = True
warp_m.plot()
warp_f.plot(X.min()-10, X.max()+10)
plt.show()
def test_offset_regression(self):
#Tests GPy.models.GPOffsetRegression. Using two small time series
#from a sine wave, we confirm the algorithm determines that the
#likelihood is maximised when the offset hyperparameter is approximately
#equal to the actual offset in X between the two time series.
offset = 3
X1 = np.arange(0,50,5.0)[:,None]
X2 = np.arange(0+offset,50+offset,5.0)[:,None]
X = np.vstack([X1,X2])
ind = np.vstack([np.zeros([10,1]),np.ones([10,1])])
X = np.hstack([X,ind])
Y = np.sin((X[0:10,0])/30.0)[:,None]
Y = np.vstack([Y,Y])
m = GPy.models.GPOffsetRegression(X,Y)
m.rbf.lengthscale=5.0 #make it something other than one to check our gradients properly!
assert m.checkgrad(), "Gradients of offset parameters don't match numerical approximations."
m.optimize()
assert np.abs(m.offset[0]-offset)<0.1, ("GPOffsetRegression model failing to estimate correct offset (value estimated = %0.2f instead of %0.2f)" % (m.offset[0], offset))
def test_logistic_basis_func_gradients(self):
X = np.random.uniform(-4, 4, (20, 5))
points = np.random.uniform(X.min(0), X.max(0), X.shape[1])
ks = []
for i in range(points.shape[0]):
if (i%2==0) and (i%3!=0):
self.assertRaises(AssertionError, GPy.kern.LogisticBasisFuncKernel, 1, points, ARD=i%2==0, ARD_slope=i%3==0, active_dims=[i])
else:
ks.append(GPy.kern.LogisticBasisFuncKernel(1, points, ARD=i%2==0, ARD_slope=i%3==0, active_dims=[i]))
k = GPy.kern.Add(ks)
k.randomize()
Y = np.random.normal(0, 1, (X.shape[0], 1))
m = GPy.models.GPRegression(X, Y, kernel=k.copy())
assert m.checkgrad()
def test_posterior_inf_basis_funcs(self):
X = np.random.uniform(-4, 1, (50, 1))
# Logistic:
k = GPy.kern.LogisticBasisFuncKernel(1, [0, -2])
true_w = [1, 2]
true_slope = [5, -2]
Y = 0
for w, s, c in zip(true_w, true_slope, k.centers[0]):
Y += w/(1+np.exp(-s*(X-c)))
Y += np.random.normal(0, .000001)
m = GPy.models.GPRegression(X,Y,kernel=k.copy())
#m.likelihood.fix(1e-6)
m.optimize()
wu, wv = m.kern.posterior_inf()
#_sort = np.argsort(wu.flat)
#from scipy.stats import norm
#confidence_intervals = np.array(norm.interval(.95, loc=wu.flat[_sort], scale=np.sqrt(np.diag(wv))[_sort])).T
#for i in range(wu.size):
# s,t = confidence_intervals[i]
# v = true_w[i]
# assert ((s<v)&(v<t)), "didnt find true w within the 95% confidence interval of the predicted values"
np.testing.assert_allclose(np.sort(wu.flat), np.sort(true_w), rtol=1e-4)
np.testing.assert_allclose(np.diag(wv), 0, atol=1e-4)
np.testing.assert_allclose(np.sort(m.kern.slope.flat), np.sort(true_slope), rtol=1e-4)
class GradientTests(np.testing.TestCase):
def setUp(self):
@ -730,6 +835,7 @@ class GradientTests(np.testing.TestCase):
self.assertTrue( np.allclose(var1, var2) )
def test_gp_VGPC(self):
np.random.seed(10)
num_obs = 25
X = np.random.randint(0, 140, num_obs)
X = X[:, None]
@ -737,19 +843,22 @@ class GradientTests(np.testing.TestCase):
kern = GPy.kern.Bias(1) + GPy.kern.RBF(1)
lik = GPy.likelihoods.Gaussian()
m = GPy.models.GPVariationalGaussianApproximation(X, Y, kernel=kern, likelihood=lik)
m.randomize()
self.assertTrue(m.checkgrad())
def test_ssgplvm(self):
from GPy import kern
from GPy.models import SSGPLVM
from GPy.examples.dimensionality_reduction import _simulate_matern
np.random.seed(10)
D1, D2, D3, N, num_inducing, Q = 13, 5, 8, 45, 3, 9
_, _, Ylist = _simulate_matern(D1, D2, D3, N, num_inducing, False)
Y = Ylist[0]
k = kern.Linear(Q, ARD=True) # + kern.white(Q, _np.exp(-2)) # + kern.bias(Q)
# k = kern.RBF(Q, ARD=True, lengthscale=10.)
m = SSGPLVM(Y, Q, init="rand", num_inducing=num_inducing, kernel=k, group_spike=True)
m.randomize()
self.assertTrue(m.checkgrad())
if __name__ == "__main__":

View file

@ -33,12 +33,18 @@
# SKIPPING PLOTTING BECAUSE IT BEHAVES DIFFERENTLY ON DIFFERENT
# SYSTEMS, AND WILL MISBEHAVE
from nose import SkipTest
raise SkipTest("Skipping Matplotlib testing")
#raise SkipTest("Skipping Matplotlib testing")
#===============================================================================
import matplotlib
try:
import matplotlib
matplotlib.use('agg')
except ImportError:
# matplotlib not installed
from nose import SkipTest
raise SkipTest("Skipping Matplotlib testing")
from unittest.case import TestCase
matplotlib.use('agg')
import numpy as np
import GPy, os
@ -66,14 +72,15 @@ try:
except ImportError:
raise SkipTest("Matplotlib not installed, not testing plots")
extensions = ['png']
extensions = ['npz']
basedir = os.path.dirname(os.path.relpath(os.path.abspath(__file__)))
def _image_directories():
"""
Compute the baseline and result image directories for testing *func*.
Create the result directory if it doesn't exist.
"""
basedir = os.path.dirname(os.path.relpath(os.path.abspath(__file__)))
#module_name = __init__.__module__
#mods = module_name.split('.')
#basedir = os.path.join(*mods)
@ -83,42 +90,117 @@ def _image_directories():
cbook.mkdirs(result_dir)
return baseline_dir, result_dir
baseline_dir, result_dir = _image_directories()
if not os.path.exists(baseline_dir):
raise SkipTest("Not installed from source, baseline not available. Install from source to test plotting")
def _sequenceEqual(a, b):
assert len(a) == len(b), "Sequences not same length"
for i, [x, y], in enumerate(zip(a, b)):
assert x == y, "element not matching {}".format(i)
def _image_comparison(baseline_images, extensions=['pdf','svg','png'], tol=11, rtol=1e-3, **kwargs):
def _notFound(path):
raise IOError('File {} not in baseline')
def _image_comparison(baseline_images, extensions=['pdf','svg','png'], tol=11):
baseline_dir, result_dir = _image_directories()
for num, base in zip(plt.get_fignums(), baseline_images):
for ext in extensions:
fig = plt.figure(num)
fig.axes[0].set_axis_off()
fig.set_frameon(False)
fig.canvas.draw()
fig.savefig(os.path.join(result_dir, "{}.{}".format(base, ext)), transparent=True, edgecolor='none', facecolor='none', bbox='tight')
#fig.axes[0].set_axis_off()
#fig.set_frameon(False)
if ext in ['npz']:
figdict = flatten_axis(fig)
np.savez_compressed(os.path.join(result_dir, "{}.{}".format(base, ext)), **figdict)
fig.savefig(os.path.join(result_dir, "{}.{}".format(base, 'png')),
transparent=True,
edgecolor='none',
facecolor='none',
#bbox='tight'
)
else:
fig.savefig(os.path.join(result_dir, "{}.{}".format(base, ext)),
transparent=True,
edgecolor='none',
facecolor='none',
#bbox='tight'
)
for num, base in zip(plt.get_fignums(), baseline_images):
for ext in extensions:
#plt.close(num)
actual = os.path.join(result_dir, "{}.{}".format(base, ext))
expected = os.path.join(baseline_dir, "{}.{}".format(base, ext))
def do_test():
err = compare_images(expected, actual, tol, in_decorator=True)
if err:
raise ImageComparisonFailure("Error between {} and {} is {:.5f}, which is bigger then the tolerance of {:.5f}".format(actual, expected, err['rms'], tol))
if ext == 'npz':
def do_test():
if not os.path.exists(expected):
import shutil
shutil.copy2(actual, expected)
#shutil.copy2(os.path.join(result_dir, "{}.{}".format(base, 'png')), os.path.join(baseline_dir, "{}.{}".format(base, 'png')))
raise IOError("Baseline file {} not found, copying result {}".format(expected, actual))
else:
exp_dict = dict(np.load(expected).items())
act_dict = dict(np.load(actual).items())
for name in act_dict:
if name in exp_dict:
try:
np.testing.assert_allclose(exp_dict[name], act_dict[name], err_msg="Mismatch in {}.{}".format(base, name), rtol=rtol, **kwargs)
except AssertionError as e:
raise SkipTest(e)
else:
def do_test():
err = compare_images(expected, actual, tol, in_decorator=True)
if err:
raise SkipTest("Error between {} and {} is {:.5f}, which is bigger then the tolerance of {:.5f}".format(actual, expected, err['rms'], tol))
yield do_test
plt.close('all')
def flatten_axis(ax, prevname=''):
import inspect
members = inspect.getmembers(ax)
arrays = {}
def _flatten(l, pre):
arr = {}
if isinstance(l, np.ndarray):
if l.size:
arr[pre] = np.asarray(l)
elif isinstance(l, dict):
for _n in l:
_tmp = _flatten(l, pre+"."+_n+".")
for _nt in _tmp.keys():
arrays[_nt] = _tmp[_nt]
elif isinstance(l, list) and len(l)>0:
for i in range(len(l)):
_tmp = _flatten(l[i], pre+"[{}]".format(i))
for _n in _tmp:
arr["{}".format(_n)] = _tmp[_n]
else:
return flatten_axis(l, pre+'.')
return arr
for name, l in members:
if isinstance(l, np.ndarray):
arrays[prevname+name] = np.asarray(l)
elif isinstance(l, list) and len(l)>0:
for i in range(len(l)):
_tmp = _flatten(l[i], prevname+name+"[{}]".format(i))
for _n in _tmp:
arrays["{}".format(_n)] = _tmp[_n]
return arrays
def _a(x,y,decimal):
np.testing.assert_array_almost_equal(x, y, decimal)
def compare_axis_dicts(x, y, decimal=6):
try:
assert(len(x)==len(y))
for name in x:
_a(x[name], y[name], decimal)
except AssertionError as e:
raise SkipTest(e.message)
def test_figure():
np.random.seed(1239847)
from GPy.plotting import plotting_library as pl
#import matplotlib
matplotlib.rcParams.update(matplotlib.rcParamsDefault)
matplotlib.rcParams[u'figure.figsize'] = (4,3)
#matplotlib.rcParams[u'figure.figsize'] = (4,3)
matplotlib.rcParams[u'text.usetex'] = False
import warnings
with warnings.catch_warnings():
@ -162,7 +244,7 @@ def test_kernel():
np.random.seed(1239847)
#import matplotlib
matplotlib.rcParams.update(matplotlib.rcParamsDefault)
matplotlib.rcParams[u'figure.figsize'] = (4,3)
#matplotlib.rcParams[u'figure.figsize'] = (4,3)
matplotlib.rcParams[u'text.usetex'] = False
import warnings
with warnings.catch_warnings():
@ -174,7 +256,7 @@ def test_kernel():
k2.plot_ARD(['rbf', 'linear', 'bias'], legend=True)
k2.plot_covariance(visible_dims=[0, 3], plot_limits=(-1,3))
k2.plot_covariance(visible_dims=[2], plot_limits=(-1, 3))
k2.plot_covariance(visible_dims=[2, 4], plot_limits=((-1, 0), (5, 3)), projection='3d')
k2.plot_covariance(visible_dims=[2, 4], plot_limits=((-1, 0), (5, 3)), projection='3d', rstride=10, cstride=10)
k2.plot_covariance(visible_dims=[1, 4])
for do_test in _image_comparison(
baseline_images=['kern_{}'.format(sub) for sub in ["ARD", 'cov_2d', 'cov_1d', 'cov_3d', 'cov_no_lim']],
@ -185,7 +267,7 @@ def test_plot():
np.random.seed(111)
import matplotlib
matplotlib.rcParams.update(matplotlib.rcParamsDefault)
matplotlib.rcParams[u'figure.figsize'] = (4,3)
#matplotlib.rcParams[u'figure.figsize'] = (4,3)
matplotlib.rcParams[u'text.usetex'] = False
import warnings
with warnings.catch_warnings():
@ -212,7 +294,7 @@ def test_twod():
np.random.seed(11111)
import matplotlib
matplotlib.rcParams.update(matplotlib.rcParamsDefault)
matplotlib.rcParams[u'figure.figsize'] = (4,3)
#matplotlib.rcParams[u'figure.figsize'] = (4,3)
matplotlib.rcParams[u'text.usetex'] = False
X = np.random.uniform(-2, 2, (40, 2))
f = .2 * np.sin(1.3*X[:,[0]]) + 1.3*np.cos(2*X[:,[1]])
@ -221,7 +303,7 @@ def test_twod():
#m.optimize()
m.plot_data()
m.plot_mean()
m.plot_inducing()
m.plot_inducing(legend=False, marker='s')
#m.plot_errorbars_trainset()
m.plot_data_error()
for do_test in _image_comparison(baseline_images=['gp_2d_{}'.format(sub) for sub in ["data", "mean",
@ -235,7 +317,7 @@ def test_threed():
np.random.seed(11111)
import matplotlib
matplotlib.rcParams.update(matplotlib.rcParamsDefault)
matplotlib.rcParams[u'figure.figsize'] = (4,3)
#matplotlib.rcParams[u'figure.figsize'] = (4,3)
matplotlib.rcParams[u'text.usetex'] = False
X = np.random.uniform(-2, 2, (40, 2))
f = .2 * np.sin(1.3*X[:,[0]]) + 1.3*np.cos(2*X[:,[1]])
@ -247,7 +329,7 @@ def test_threed():
m.plot_samples(projection='3d', plot_raw=False, samples=1)
plt.close('all')
m.plot_data(projection='3d')
m.plot_mean(projection='3d')
m.plot_mean(projection='3d', rstride=10, cstride=10)
m.plot_inducing(projection='3d')
#m.plot_errorbars_trainset(projection='3d')
for do_test in _image_comparison(baseline_images=['gp_3d_{}'.format(sub) for sub in ["data", "mean", 'inducing',
@ -260,7 +342,7 @@ def test_sparse():
np.random.seed(11111)
import matplotlib
matplotlib.rcParams.update(matplotlib.rcParamsDefault)
matplotlib.rcParams[u'figure.figsize'] = (4,3)
#matplotlib.rcParams[u'figure.figsize'] = (4,3)
matplotlib.rcParams[u'text.usetex'] = False
X = np.random.uniform(-2, 2, (40, 1))
f = .2 * np.sin(1.3*X) + 1.3*np.cos(2*X)
@ -268,7 +350,9 @@ def test_sparse():
m = GPy.models.SparseGPRegression(X, Y, X_variance=np.ones_like(X)*0.1)
#m.optimize()
#m.plot_inducing()
m.plot_data()
_, ax = plt.subplots()
m.plot_data(ax=ax)
m.plot_data_error(ax=ax)
for do_test in _image_comparison(baseline_images=['sparse_gp_{}'.format(sub) for sub in ['data_error']], extensions=extensions):
yield (do_test, )
@ -276,7 +360,7 @@ def test_classification():
np.random.seed(11111)
import matplotlib
matplotlib.rcParams.update(matplotlib.rcParamsDefault)
matplotlib.rcParams[u'figure.figsize'] = (4,3)
#matplotlib.rcParams[u'figure.figsize'] = (4,3)
matplotlib.rcParams[u'text.usetex'] = False
X = np.random.uniform(-2, 2, (40, 1))
f = .2 * np.sin(1.3*X) + 1.3*np.cos(2*X)
@ -300,7 +384,7 @@ def test_sparse_classification():
np.random.seed(11111)
import matplotlib
matplotlib.rcParams.update(matplotlib.rcParamsDefault)
matplotlib.rcParams[u'figure.figsize'] = (4,3)
#matplotlib.rcParams[u'figure.figsize'] = (4,3)
matplotlib.rcParams[u'text.usetex'] = False
X = np.random.uniform(-2, 2, (40, 1))
f = .2 * np.sin(1.3*X) + 1.3*np.cos(2*X)
@ -312,35 +396,43 @@ def test_sparse_classification():
m.plot(plot_raw=True, apply_link=False, samples=3)
np.random.seed(111)
m.plot(plot_raw=True, apply_link=True, samples=3)
for do_test in _image_comparison(baseline_images=['sparse_gp_class_{}'.format(sub) for sub in ["likelihood", "raw", 'raw_link']], extensions=extensions):
for do_test in _image_comparison(baseline_images=['sparse_gp_class_{}'.format(sub) for sub in ["likelihood", "raw", 'raw_link']], extensions=extensions, rtol=2):
yield (do_test, )
def test_gplvm():
from ..examples.dimensionality_reduction import _simulate_matern
from ..kern import RBF
from ..models import GPLVM
from GPy.models import GPLVM
np.random.seed(12345)
matplotlib.rcParams.update(matplotlib.rcParamsDefault)
matplotlib.rcParams[u'figure.figsize'] = (4,3)
#matplotlib.rcParams[u'figure.figsize'] = (4,3)
matplotlib.rcParams[u'text.usetex'] = False
Q = 3
#Q = 3
# Define dataset
N = 10
k1 = GPy.kern.RBF(5, variance=1, lengthscale=1./np.random.dirichlet(np.r_[10,10,10,0.1,0.1]), ARD=True)
k2 = GPy.kern.RBF(5, variance=1, lengthscale=1./np.random.dirichlet(np.r_[10,0.1,10,0.1,10]), ARD=True)
k3 = GPy.kern.RBF(5, variance=1, lengthscale=1./np.random.dirichlet(np.r_[0.1,0.1,10,10,10]), ARD=True)
X = np.random.normal(0, 1, (N, 5))
A = np.random.multivariate_normal(np.zeros(N), k1.K(X), Q).T
B = np.random.multivariate_normal(np.zeros(N), k2.K(X), Q).T
C = np.random.multivariate_normal(np.zeros(N), k3.K(X), Q).T
#N = 60
#k1 = GPy.kern.RBF(5, variance=1, lengthscale=1./np.random.dirichlet(np.r_[10,10,10,0.1,0.1]), ARD=True)
#k2 = GPy.kern.RBF(5, variance=1, lengthscale=1./np.random.dirichlet(np.r_[10,0.1,10,0.1,10]), ARD=True)
#k3 = GPy.kern.RBF(5, variance=1, lengthscale=1./np.random.dirichlet(np.r_[0.1,0.1,10,10,10]), ARD=True)
#X = np.random.normal(0, 1, (N, 5))
#A = np.random.multivariate_normal(np.zeros(N), k1.K(X), Q).T
#B = np.random.multivariate_normal(np.zeros(N), k2.K(X), Q).T
#C = np.random.multivariate_normal(np.zeros(N), k3.K(X), Q).T
#Y = np.vstack((A,B,C))
#labels = np.hstack((np.zeros(A.shape[0]), np.ones(B.shape[0]), np.ones(C.shape[0])*2))
Y = np.vstack((A,B,C))
labels = np.hstack((np.zeros(A.shape[0]), np.ones(B.shape[0]), np.ones(C.shape[0])*2))
#k = RBF(Q, ARD=True, lengthscale=2) # + kern.white(Q, _np.exp(-2)) # + kern.bias(Q)
pars = np.load(os.path.join(basedir, 'b-gplvm-save.npz'))
Y = pars['Y']
Q = pars['Q']
labels = pars['labels']
import warnings
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter('always') # always print
m = GPLVM(Y, Q, initialize=False)
m.update_model(False)
m.initialize_parameter()
m[:] = pars['gplvm_p']
m.update_model(True)
k = RBF(Q, ARD=True, lengthscale=2) # + kern.white(Q, _np.exp(-2)) # + kern.bias(Q)
m = GPLVM(Y, Q, init="PCA", kernel=k)
m.kern.lengthscale[:] = [1./.3, 1./.1, 1./.7]
m.likelihood.variance = .001
#m.optimize(messages=0)
np.random.seed(111)
m.plot_latent(labels=labels)
@ -355,31 +447,40 @@ def test_gplvm():
yield (do_test, )
def test_bayesian_gplvm():
from ..examples.dimensionality_reduction import _simulate_matern
from ..kern import RBF
from ..models import BayesianGPLVM
np.random.seed(12345)
matplotlib.rcParams.update(matplotlib.rcParamsDefault)
matplotlib.rcParams[u'figure.figsize'] = (4,3)
#matplotlib.rcParams[u'figure.figsize'] = (4,3)
matplotlib.rcParams[u'text.usetex'] = False
Q = 3
#Q = 3
# Define dataset
N = 10
k1 = GPy.kern.RBF(5, variance=1, lengthscale=1./np.random.dirichlet(np.r_[10,10,10,0.1,0.1]), ARD=True)
k2 = GPy.kern.RBF(5, variance=1, lengthscale=1./np.random.dirichlet(np.r_[10,0.1,10,0.1,10]), ARD=True)
k3 = GPy.kern.RBF(5, variance=1, lengthscale=1./np.random.dirichlet(np.r_[0.1,0.1,10,10,10]), ARD=True)
X = np.random.normal(0, 1, (N, 5))
A = np.random.multivariate_normal(np.zeros(N), k1.K(X), Q).T
B = np.random.multivariate_normal(np.zeros(N), k2.K(X), Q).T
C = np.random.multivariate_normal(np.zeros(N), k3.K(X), Q).T
#N = 10
#k1 = GPy.kern.RBF(5, variance=1, lengthscale=1./np.random.dirichlet(np.r_[10,10,10,0.1,0.1]), ARD=True)
#k2 = GPy.kern.RBF(5, variance=1, lengthscale=1./np.random.dirichlet(np.r_[10,0.1,10,0.1,10]), ARD=True)
#k3 = GPy.kern.RBF(5, variance=1, lengthscale=1./np.random.dirichlet(np.r_[0.1,0.1,10,10,10]), ARD=True)
#X = np.random.normal(0, 1, (N, 5))
#A = np.random.multivariate_normal(np.zeros(N), k1.K(X), Q).T
#B = np.random.multivariate_normal(np.zeros(N), k2.K(X), Q).T
#C = np.random.multivariate_normal(np.zeros(N), k3.K(X), Q).T
Y = np.vstack((A,B,C))
labels = np.hstack((np.zeros(A.shape[0]), np.ones(B.shape[0]), np.ones(C.shape[0])*2))
#Y = np.vstack((A,B,C))
#labels = np.hstack((np.zeros(A.shape[0]), np.ones(B.shape[0]), np.ones(C.shape[0])*2))
#k = RBF(Q, ARD=True, lengthscale=2) # + kern.white(Q, _np.exp(-2)) # + kern.bias(Q)
pars = np.load(os.path.join(basedir, 'b-gplvm-save.npz'))
Y = pars['Y']
Q = pars['Q']
labels = pars['labels']
import warnings
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter('always') # always print
m = BayesianGPLVM(Y, Q, initialize=False)
m.update_model(False)
m.initialize_parameter()
m[:] = pars['bgplvm_p']
m.update_model(True)
k = RBF(Q, ARD=True, lengthscale=2) # + kern.white(Q, _np.exp(-2)) # + kern.bias(Q)
m = BayesianGPLVM(Y, Q, init="PCA", kernel=k)
m.kern.lengthscale[:] = [1./.3, 1./.1, 1./.7]
m.likelihood.variance = .001
#m.optimize(messages=0)
np.random.seed(111)
m.plot_inducing(projection='2d')
@ -398,4 +499,4 @@ def test_bayesian_gplvm():
if __name__ == '__main__':
import nose
nose.main()
nose.main(defaultTest='./plotting_tests.py')

View file

@ -6,6 +6,29 @@ import numpy as np
import GPy
class PriorTests(unittest.TestCase):
def test_studentT(self):
xmin, xmax = 1, 2.5*np.pi
b, C, SNR = 1, 0, 0.1
X = np.linspace(xmin, xmax, 500)
y = b*X + C + 1*np.sin(X)
y += 0.05*np.random.randn(len(X))
X, y = X[:, None], y[:, None]
studentT = GPy.priors.StudentT(1, 2, 4)
m = GPy.models.SparseGPRegression(X, y)
m.Z.set_prior(studentT)
# setting a StudentT prior on non-negative parameters
# should raise an assertionerror.
self.assertRaises(AssertionError, m.rbf.set_prior, studentT)
# The gradients need to be checked
self.assertTrue(m.checkgrad())
# Check the singleton pattern:
self.assertIs(studentT, GPy.priors.StudentT(1,2,4))
self.assertIsNot(studentT, GPy.priors.StudentT(2,2,4))
def test_lognormal(self):
xmin, xmax = 1, 2.5*np.pi
b, C, SNR = 1, 0, 0.1
@ -74,7 +97,7 @@ class PriorTests(unittest.TestCase):
# setting a Gaussian prior on non-negative parameters
# should raise an assertionerror.
#self.assertRaises(AssertionError, m.Z.set_prior, gaussian)
self.assertTrue(m.checkgrad())
def test_fixed_domain_check(self):
@ -107,8 +130,6 @@ class PriorTests(unittest.TestCase):
# should raise an assertionerror.
self.assertRaises(AssertionError, m.rbf.set_prior, gaussian)
if __name__ == "__main__":
print("Running unit tests, please be (very) patient...")
unittest.main()

View file

@ -0,0 +1,977 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2015, Alex Grigorevskiy
# Licensed under the BSD 3-clause license (see LICENSE.txt)
"""
Test module for state_space_main.py
"""
import unittest
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import norm
import GPy.models.state_space_setup as ss_setup
import GPy.models.state_space_main as ssm
def generate_x_points(points_num=100, x_interval = (0, 20), random=True):
"""
Function generates (sorted) points on the x axis.
Input:
---------------------------
points_num: int
How many points to generate
x_interval: tuple (a,b)
On which interval to generate points
random: bool
Regular points or random
Output:
---------------------------
x_points: np.array
Generated points
"""
x_interval = np.asarray( x_interval )
if random:
x_points = np.random.rand(points_num) * ( x_interval[1] - x_interval[0] ) + x_interval[0]
x_points = np.sort( x_points )
else:
x_points = np.linspace(x_interval[0], x_interval[1], num=points_num )
return x_points
def generate_sine_data(x_points=None, sin_period=2.0, sin_ampl=10.0, noise_var=2.0,
plot = False, points_num=100, x_interval = (0, 20), random=True):
"""
Function generates sinusoidal data.
Input:
--------------------------------
x_points: np.array
Previously generated X points
sin_period: float
Sine period
sin_ampl: float
Sine amplitude
noise_var: float
Gaussian noise variance added to the sine function
plot: bool
Whether to plot generated data
(if x_points is None, the the following parameters are used to generate
those. They are the same as in 'generate_x_points' function)
points_num: int
x_interval: tuple (a,b)
random: bool
"""
sin_function = lambda xx: sin_ampl * np.sin( 2*np.pi/sin_period * xx )
if x_points is None:
x_points = generate_x_points(points_num, x_interval, random)
y_points = sin_function( x_points ) + np.random.randn( len(x_points) ) * np.sqrt(noise_var)
if plot:
pass
return x_points, y_points
def generate_linear_data(x_points=None, tangent=2.0, add_term=1.0, noise_var=2.0,
plot = False, points_num=100, x_interval = (0, 20), random=True):
"""
Function generates linear data.
Input:
--------------------------------
x_points: np.array
Previously generated X points
tangent: float
Factor with which independent variable is multiplied in linear equation.
add_term: float
Additive term in linear equation.
noise_var: float
Gaussian noise variance added to the sine function
plot: bool
Whether to plot generated data
(if x_points is None, the the following parameters are used to generate
those. They are the same as in 'generate_x_points' function)
points_num: int
x_interval: tuple (a,b)
random: bool
"""
linear_function = lambda xx: tangent*xx + add_term
if x_points is None:
x_points = generate_x_points(points_num, x_interval, random)
y_points = linear_function( x_points ) + np.random.randn( len(x_points) ) * np.sqrt(noise_var)
if plot:
pass
return x_points, y_points
def generate_brownian_data(x_points=None, kernel_var = 2.0, noise_var = 2.0,
plot = False, points_num=100, x_interval = (0, 20), random=True):
"""
Generate brownian data - data from Brownian motion.
First point is always 0, and \Beta(0) = 0 - standard conditions for Brownian motion.
Input:
--------------------------------
x_points: np.array
Previously generated X points
variance: float
Gaussian noise variance added to the sine function
plot: bool
Whether to plot generated data
(if x_points is None, the the following parameters are used to generate
those. They are the same as in 'generate_x_points' function)
points_num: int
x_interval: tuple (a,b)
random: bool
"""
if x_points is None:
x_points = generate_x_points(points_num, x_interval, random)
if x_points[0] != 0:
x_points[0] = 0
y_points = np.zeros( (points_num,) )
for i in range(1, points_num):
noise = np.random.randn() * np.sqrt(kernel_var * (x_points[i] - x_points[i-1]))
y_points[i] = y_points[i-1] + noise
y_points += np.random.randn( len(x_points) ) * np.sqrt(noise_var)
return x_points, y_points
def generate_linear_plus_sin(x_points=None, tangent=2.0, add_term=1.0, noise_var=2.0,
sin_period=2.0, sin_ampl=10.0, plot = False,
points_num=100, x_interval = (0, 20), random=True):
"""
Generate the sum of linear trend and the sine function.
For parameters see the 'generate_linear' and 'generate_sine'.
Comment: Gaussian noise variance is added only once (for linear function).
"""
x_points, y_linear_points = generate_linear_data(x_points, tangent, add_term, noise_var,
False, points_num, x_interval, random)
x_points, y_sine_points = generate_sine_data(x_points, sin_period, sin_ampl, 0.0,
False, points_num, x_interval, random)
y_points = y_linear_points + y_sine_points
if plot:
pass
return x_points, y_points
def generate_random_y_data(samples, dim, ts_no):
"""
Generate data:
Input:
------------------
samples - how many samples
dim - dimensionality of the data
ts_no - number of time series
Output:
--------------------------
Y: np.array((samples, dim, ts_no))
"""
Y = np.empty((samples, dim, ts_no));
for i in range(0,samples):
for j in range(0,ts_no):
sample = np.random.randn(dim)
Y[i,:,j] = sample
if (Y.shape[2] == 1): # ts_no = 1
Y.shape=(Y.shape[0], Y.shape[1])
return Y
class StateSpaceKernelsTests(np.testing.TestCase):
def setUp(self):
pass
def run_descr_model(self, measurements, A,Q,H,R, true_states=None,
mean_compare_decimal=8,
m_init=None, P_init=None, dA=None,dQ=None,
dH=None,dR=None, use_cython=False,
kalman_filter_type='regular',
calc_log_likelihood=True,
calc_grad_log_likelihood=True):
#import pdb; pdb.set_trace()
state_dim = 1 if not isinstance(A,np.ndarray) else A.shape[0]
ts_no = 1 if (len(measurements.shape) < 3) else measurements.shape[2]
grad_params_no = None if dA is None else dA.shape[2]
ss_setup.use_cython = use_cython
global ssm
if (ssm.cython_code_available) and (ssm.use_cython != use_cython):
reload(ssm)
grad_calc_params = None
if calc_grad_log_likelihood:
grad_calc_params = {}
grad_calc_params['dA'] = dA
grad_calc_params['dQ'] = dQ
grad_calc_params['dH'] = dH
grad_calc_params['dR'] = dR
(f_mean, f_var, loglikelhood, g_loglikelhood, \
dynamic_callables_smoother) = ssm.DescreteStateSpace.kalman_filter(A, Q, H, R, measurements, index=None,
m_init=m_init, P_init=P_init, p_kalman_filter_type = kalman_filter_type,
calc_log_likelihood=calc_log_likelihood,
calc_grad_log_likelihood=calc_grad_log_likelihood,
grad_params_no=grad_params_no,
grad_calc_params=grad_calc_params)
f_mean_squeezed = np.squeeze(f_mean[1:,:]) # exclude initial value
f_var_squeezed = np.squeeze(f_var[1:,:]) # exclude initial value
if true_states is not None:
#print np.max(np.abs(f_mean_squeezed-true_states))
np.testing.assert_almost_equal(np.max(np.abs(f_mean_squeezed- \
true_states)), 0, decimal=mean_compare_decimal)
np.testing.assert_equal(f_mean.shape, (measurements.shape[0]+1,state_dim,ts_no) )
np.testing.assert_equal(f_var.shape, (measurements.shape[0]+1,state_dim,state_dim) )
(M_smooth, P_smooth) = ssm.DescreteStateSpace.rts_smoother(state_dim, dynamic_callables_smoother, f_mean,
f_var)
return f_mean, f_var
def run_continuous_model(self, F, L, Qc, p_H, p_R, P_inf, X_data, Y_data, index = None,
m_init=None, P_init=None, use_cython=False,
kalman_filter_type='regular',
calc_log_likelihood=True,
calc_grad_log_likelihood=True,
grad_params_no=0, grad_calc_params=None):
#import pdb; pdb.set_trace()
state_dim = 1 if not isinstance(F,np.ndarray) else F.shape[0]
ts_no = 1 if (len(Y_data.shape) < 3) else Y_data.shape[2]
ss_setup.use_cython = use_cython
global ssm
if (ssm.cython_code_available) and (ssm.use_cython != use_cython):
reload(ssm)
(f_mean, f_var, loglikelhood, g_loglikelhood, \
dynamic_callables_smoother) = ssm.ContDescrStateSpace.cont_discr_kalman_filter(F, L, Qc, p_H, p_R,
P_inf, X_data, Y_data, index = None,
m_init=None, P_init=None,
p_kalman_filter_type='regular',
calc_log_likelihood=False,
calc_grad_log_likelihood=False,
grad_params_no=0, grad_calc_params=grad_calc_params)
f_mean_squeezed = np.squeeze(f_mean[1:,:]) # exclude initial value
f_var_squeezed = np.squeeze(f_var[1:,:]) # exclude initial value
np.testing.assert_equal(f_mean.shape, (Y_data.shape[0]+1,state_dim,ts_no))
np.testing.assert_equal(f_var.shape, (Y_data.shape[0]+1,state_dim,state_dim))
(M_smooth, P_smooth) = ssm.ContDescrStateSpace.cont_discr_rts_smoother(state_dim, f_mean, \
f_var,dynamic_callables_smoother)
return f_mean, f_var
def test_discrete_ss_first(self,plot=False):
"""
Tests discrete State-Space model - first test.
"""
np.random.seed(235) # seed the random number generator
A = 1.0 # For cython code to run properly need float input
H = 1.0
Q = 1.0
R = 1.0
steps_num = 100
# generate data ->
true_states = np.zeros((steps_num,))
init_state = 0
measurements = np.zeros((steps_num,))
for s in range(0, steps_num):
if s== 0:
true_states[0] = init_state + np.sqrt(Q)*np.random.randn()
else:
true_states[s] = true_states[s-1] + np.sqrt(R)*np.random.randn()
measurements[s] = true_states[s] + np.sqrt(R)*np.random.randn()
# generate data <-
# descrete kalman filter ->
m_init = 0; P_init = 1
d_num = 1000
state_discr = np.linspace(-10,10,d_num)
state_trans_matrix = np.empty((d_num,d_num))
for i in range(d_num):
state_trans_matrix[:,i] = norm.pdf(state_discr, loc=A*state_discr[i], scale=np.sqrt(Q))
m_prev = norm.pdf(state_discr, loc = m_init, scale = np.sqrt(P_init)); #m_prev / np.sum(m_prev)
m = np.zeros((d_num, steps_num))
i_mean = np.zeros((steps_num,))
for s in range(0, steps_num):
# Prediction step:
if (s==0):
m[:,s] = np.dot(state_trans_matrix, m_prev)
else:
m[:,s] = np.dot(state_trans_matrix, m[:,s-1])
# Update step:
#meas_ind = np.argmin(np.abs(state_discr - measurements[s])
y_vec = np.zeros( (d_num,))
for i in range(d_num):
y_vec[i] = norm.pdf(measurements[s], loc=H*state_discr[i], scale=np.sqrt(R))
norm_const = np.dot( y_vec, m[:,s] )
m[:,s] = y_vec * m[:,s] / norm_const
i_mean[s] = state_discr[ np.argmax(m[:,s]) ]
# descrete kalman filter <-
(f_mean, f_var) = self.run_descr_model(measurements, A,Q,H,R, true_states=i_mean,
mean_compare_decimal=1,
m_init=m_init, P_init=P_init,use_cython=False,
kalman_filter_type='regular',
calc_log_likelihood=True,
calc_grad_log_likelihood=False)
(f_mean, f_var) = self.run_descr_model(measurements, A,Q,H,R, true_states=i_mean,
mean_compare_decimal=1,
m_init=m_init, P_init=P_init,use_cython=False,
kalman_filter_type='svd',
calc_log_likelihood=True,
calc_grad_log_likelihood=False)
(f_mean, f_var) = self.run_descr_model(measurements, A,Q,H,R, true_states=i_mean,
mean_compare_decimal=1,
m_init=m_init, P_init=P_init,use_cython=True,
kalman_filter_type='svd',
calc_log_likelihood=True,
calc_grad_log_likelihood=False)
if plot:
# plotting ->
plt.figure()
plt.plot( true_states, 'g.-',label='true states')
#plt.plot( measurements, 'b.-', label='measurements')
plt.plot( f_mean, 'r.-',label='Kalman filter estimates')
plt.plot( i_mean, 'k.-', label='Discretization')
plt.plot( f_mean + 2*np.sqrt(f_var), 'r.--')
plt.plot( f_mean - 2*np.sqrt(f_var), 'r.--')
plt.legend()
plt.show()
# plotting <-
return None
def test_discrete_ss_1D(self,plot=False):
"""
This function tests Kalman filter and smoothing when the state
dimensionality is one dimensional.
"""
np.random.seed(234) # seed the random number generator
# 1D ss model
state_dim = 1;
param_num = 2 # sigma_Q, sigma_R - parameters
measurement_dim = 1 # dimensionality od measurement
A = 1.0
Q = 2.0
dA= np.zeros((state_dim,state_dim,param_num))
dQ = np.zeros((state_dim,state_dim,param_num)); dQ[0,0,0] = 1.0
# measurement related parameters (subject to change) ->
H = np.ones((measurement_dim,state_dim ))
R = 0.5 * np.eye(measurement_dim)
dH = np.zeros((measurement_dim,state_dim,param_num))
dR = np.zeros((measurement_dim,measurement_dim,param_num)); dR[:,:,1] = np.eye(measurement_dim)
# measurement related parameters (subject to change) <-
# 1D measurement, 1 ts_no ->
data = generate_random_y_data(10, 1, 1) # np.array((samples, dim, ts_no))
(f_mean, f_var) = self.run_descr_model(data, A,Q,H,R, true_states=None,
mean_compare_decimal=16,
m_init=None, P_init=None, dA=dA,dQ=dQ,
dH=dH,dR=dR, use_cython=False,
kalman_filter_type='regular',
calc_log_likelihood=True,
calc_grad_log_likelihood=True)
(f_mean, f_var) = self.run_descr_model(data, A,Q,H,R, true_states=None,
mean_compare_decimal=16,
m_init=None, P_init=None, dA=dA,dQ=dQ,
dH=dH,dR=dR, use_cython=False,
kalman_filter_type='svd',
calc_log_likelihood=True,
calc_grad_log_likelihood=True)
(f_mean, f_var) = self.run_descr_model(data, A,Q,H,R, true_states=None,
mean_compare_decimal=16,
m_init=None, P_init=None, dA=dA,dQ=dQ,
dH=dH,dR=dR, use_cython=True,
kalman_filter_type='svd',
calc_log_likelihood=True,
calc_grad_log_likelihood=True)
if plot:
# plotting ->
plt.figure()
plt.plot( np.squeeze(data), 'g.-', label='measurements')
plt.plot( np.squeeze(f_mean[1:]), 'b.-',label='Kalman filter estimates')
plt.plot( np.squeeze(f_mean[1:]+H*f_var[1:]*H), 'b--')
plt.plot( np.squeeze(f_mean[1:]-H*f_var[1:]*H), 'b--')
# plt.plot( np.squeeze(M_sm[1:]), 'r.-',label='Smoother Estimates')
# plt.plot( np.squeeze(M_sm[1:]+H*P_sm[1:]*H), 'r--')
# plt.plot( np.squeeze(M_sm[1:]-H*P_sm[1:]*H), 'r--')
plt.legend()
plt.title("1D state-space, 1D measurements, 1 ts_no")
plt.show()
# plotting <-
# 1D measurement, 1 ts_no <-
# 1D measurement, 3 ts_no ->
data = generate_random_y_data(10, 1, 3) # np.array((samples, dim, ts_no))
(f_mean, f_var) = self.run_descr_model(data, A,Q,H,R, true_states=None,
mean_compare_decimal=16,
m_init=None, P_init=None, dA=dA,dQ=dQ,
dH=dH,dR=dR, use_cython=False,
kalman_filter_type='regular',
calc_log_likelihood=True,
calc_grad_log_likelihood=True)
(f_mean, f_var) = self.run_descr_model(data, A,Q,H,R, true_states=None,
mean_compare_decimal=16,
m_init=None, P_init=None, dA=dA,dQ=dQ,
dH=dH,dR=dR, use_cython=False,
kalman_filter_type='svd',
calc_log_likelihood=True,
calc_grad_log_likelihood=True)
(f_mean, f_var) = self.run_descr_model(data, A,Q,H,R, true_states=None,
mean_compare_decimal=16,
m_init=None, P_init=None, dA=dA,dQ=dQ,
dH=dH,dR=dR, use_cython=True,
kalman_filter_type='svd',
calc_log_likelihood=True,
calc_grad_log_likelihood=True)
#import pdb; pdb.set_trace()
if plot:
# plotting ->
plt.figure()
plt.plot( np.squeeze(data[:,:,1]), 'g.-', label='measurements')
plt.plot( np.squeeze(f_mean[1:,0,1]), 'b.-',label='Kalman filter estimates')
plt.plot( np.squeeze(f_mean[1:,0,1])+np.squeeze(H*f_var[1:]*H), 'b--')
plt.plot( np.squeeze(f_mean[1:,0,1])-np.squeeze(H*f_var[1:]*H), 'b--')
# plt.plot( np.squeeze(M_sm[1:,0,1]), 'r.-',label='Smoother Estimates')
# plt.plot( np.squeeze(M_sm[1:,0,1])+H*np.squeeze(P_sm[1:])*H, 'r--')
# plt.plot( np.squeeze(M_sm[1:,0,1])-H*np.squeeze(P_sm[1:])*H, 'r--')
plt.legend()
plt.title("1D state-space, 1D measurements, 3 ts_no. 2-nd ts ploted")
plt.show()
# plotting <-
# 1D measurement, 3 ts_no <-
measurement_dim = 2 # dimensionality of measurement
H = np.ones((measurement_dim,state_dim))
R = 0.5 * np.eye(measurement_dim)
dH = np.zeros((measurement_dim,state_dim,param_num))
dR = np.zeros((measurement_dim,measurement_dim,param_num)); dR[:,:,1] = np.eye(measurement_dim)
# measurement related parameters (subject to change) <
data = generate_random_y_data(10, 2, 3) # np.array((samples, dim, ts_no))
(f_mean, f_var) = self.run_descr_model(data, A,Q,H,R, true_states=None,
mean_compare_decimal=16,
m_init=None, P_init=None, dA=dA,dQ=dQ,
dH=dH,dR=dR, use_cython=False,
kalman_filter_type='regular',
calc_log_likelihood=True,
calc_grad_log_likelihood=True)
(f_mean, f_var) = self.run_descr_model(data, A,Q,H,R, true_states=None,
mean_compare_decimal=16,
m_init=None, P_init=None, dA=dA,dQ=dQ,
dH=dH,dR=dR, use_cython=False,
kalman_filter_type='svd',
calc_log_likelihood=True,
calc_grad_log_likelihood=True)
# (f_mean, f_var) = self.run_descr_model(data, A,Q,H,R, true_states=None,
# mean_compare_decimal=16,
# m_init=None, P_init=None, dA=dA,dQ=dQ,
# dH=dH,dR=dR, use_cython=True,
# kalman_filter_type='svd',
# calc_log_likelihood=True,
# calc_grad_log_likelihood=True)
if plot:
# plotting ->
plt.figure()
plt.plot( np.squeeze(data[:,0,1]), 'g.-', label='measurements')
plt.plot( np.squeeze(f_mean[1:,0,1]), 'b.-',label='Kalman filter estimates')
plt.plot( np.squeeze(f_mean[1:,0,1])+np.einsum('ij,ajk,kl', H, f_var[1:], H.T)[:,0,0], 'b--')
plt.plot( np.squeeze(f_mean[1:,0,1])-np.einsum('ij,ajk,kl', H, f_var[1:], H.T)[:,0,0], 'b--')
# plt.plot( np.squeeze(M_sm[1:,0,1]), 'r.-',label='Smoother Estimates')
# plt.plot( np.squeeze(M_sm[1:,0,1])+np.einsum('ij,ajk,kl', H, P_sm[1:], H.T)[:,0,0], 'r--')
# plt.plot( np.squeeze(M_sm[1:,0,1])-np.einsum('ij,ajk,kl', H, P_sm[1:], H.T)[:,0,0], 'r--')
plt.legend()
plt.title("1D state-space, 2D measurements, 3 ts_no. 1-st measurement, 2-nd ts ploted")
plt.show()
# plotting <-
# 2D measurement, 3 ts_no <-
def test_discrete_ss_2D(self,plot=False):
"""
This function tests Kalman filter and smoothing when the state
dimensionality is two dimensional.
"""
np.random.seed(234) # seed the random number generator
# 1D ss model
state_dim = 2;
param_num = 3 # sigma_Q, sigma_R, one parameters in A - parameters
measurement_dim = 1 # dimensionality od measurement
A = np.eye(state_dim); A[0,0] = 0.5
Q = np.ones((state_dim,state_dim));
dA = np.zeros((state_dim,state_dim,param_num)); dA[1,1,2] = 1
dQ = np.zeros((state_dim,state_dim,param_num)); dQ[:,:,1] = np.eye(measurement_dim)
# measurement related parameters (subject to change) ->
H = np.ones((measurement_dim,state_dim))
R = 0.5 * np.eye(measurement_dim)
dH = np.zeros((measurement_dim,state_dim,param_num))
dR = np.zeros((measurement_dim,measurement_dim,param_num)); dR[:,:,1] = np.eye(measurement_dim)
# measurement related parameters (subject to change) <-
# 1D measurement, 1 ts_no ->
data = generate_random_y_data(10, 1, 1) # np.array((samples, dim, ts_no))
(f_mean, f_var) = self.run_descr_model(data, A,Q,H,R, true_states=None,
mean_compare_decimal=16,
m_init=None, P_init=None, dA=dA,dQ=dQ,
dH=dH,dR=dR, use_cython=False,
kalman_filter_type='regular',
calc_log_likelihood=True,
calc_grad_log_likelihood=True)
(f_mean, f_var) = self.run_descr_model(data, A,Q,H,R, true_states=None,
mean_compare_decimal=16,
m_init=None, P_init=None, dA=dA,dQ=dQ,
dH=dH,dR=dR, use_cython=False,
kalman_filter_type='svd',
calc_log_likelihood=True,
calc_grad_log_likelihood=True)
(f_mean, f_var) = self.run_descr_model(data, A,Q,H,R, true_states=None,
mean_compare_decimal=16,
m_init=None, P_init=None, dA=dA,dQ=dQ,
dH=dH,dR=dR, use_cython=True,
kalman_filter_type='svd',
calc_log_likelihood=True,
calc_grad_log_likelihood=True)
if plot:
# plotting ->
plt.figure()
plt.plot( np.squeeze(data), 'g.-', label='measurements')
plt.plot( np.squeeze(f_mean[1:,0]), 'b.-',label='Kalman filter estimates')
plt.plot( np.squeeze(f_mean[1:,0])+np.einsum('ij,ajk,kl', H, f_var[1:], H.T)[:,0,0], 'b--')
plt.plot( np.squeeze(f_mean[1:,0])-np.einsum('ij,ajk,kl', H, f_var[1:], H.T)[:,0,0], 'b--')
# plt.plot( np.squeeze(M_sm[1:,0]), 'r.-',label='Smoother Estimates')
# plt.plot( np.squeeze(M_sm[1:,0])+np.einsum('ij,ajk,kl', H, P_sm[1:], H.T)[:,0,0], 'r--')
# plt.plot( np.squeeze(M_sm[1:,0])-np.einsum('ij,ajk,kl', H, P_sm[1:], H.T)[:,0,0], 'r--')
plt.legend()
plt.title("2D state-space, 1D measurements, 1 ts_no")
plt.show()
# plotting <-
# 1D measurement, 1 ts_no <-
# 1D measurement, 3 ts_no ->
data = generate_random_y_data(10, 1, 3) # np.array((samples, dim, ts_no))
(f_mean, f_var) = self.run_descr_model(data, A,Q,H,R, true_states=None,
mean_compare_decimal=16,
m_init=None, P_init=None, dA=dA,dQ=dQ,
dH=dH,dR=dR, use_cython=False,
kalman_filter_type='regular',
calc_log_likelihood=True,
calc_grad_log_likelihood=True)
(f_mean, f_var) = self.run_descr_model(data, A,Q,H,R, true_states=None,
mean_compare_decimal=16,
m_init=None, P_init=None, dA=dA,dQ=dQ,
dH=dH,dR=dR, use_cython=False,
kalman_filter_type='svd',
calc_log_likelihood=True,
calc_grad_log_likelihood=True)
(f_mean, f_var) = self.run_descr_model(data, A,Q,H,R, true_states=None,
mean_compare_decimal=16,
m_init=None, P_init=None, dA=dA,dQ=dQ,
dH=dH,dR=dR, use_cython=True,
kalman_filter_type='svd',
calc_log_likelihood=True,
calc_grad_log_likelihood=True)
if plot:
# plotting ->
plt.figure()
plt.plot( np.squeeze(data[:,:,1]), 'g.-', label='measurements')
plt.plot( np.squeeze(f_mean[1:,0,1]), 'b.-',label='Kalman filter estimates')
plt.plot( np.squeeze(f_mean[1:,0,1])+np.einsum('ij,ajk,kl', H, f_var[1:], H.T)[:,0,0], 'b--')
plt.plot( np.squeeze(f_mean[1:,0,1])-np.einsum('ij,ajk,kl', H, f_var[1:], H.T)[:,0,0], 'b--')
# plt.plot( np.squeeze(M_sm[1:,0,1]), 'r.-',label='Smoother Estimates')
# plt.plot( np.squeeze(M_sm[1:,0,1])+np.einsum('ij,ajk,kl', H, P_sm[1:], H.T)[:,0,0], 'r--')
# plt.plot( np.squeeze(M_sm[1:,0,1])-np.einsum('ij,ajk,kl', H, P_sm[1:], H.T)[:,0,0], 'r--')
plt.legend()
plt.title("2D state-space, 1D measurements, 3 ts_no. 2-nd ts ploted")
plt.show()
# plotting <-
# 1D measurement, 3 ts_no <-
# 2D measurement, 3 ts_no ->
# measurement related parameters (subject to change) ->
measurement_dim = 2 # dimensionality od measurement
H = np.ones((measurement_dim,state_dim))
R = 0.5 * np.eye(measurement_dim)
dH = np.zeros((measurement_dim,state_dim,param_num))
dR = np.zeros((measurement_dim,measurement_dim,param_num)); dR[:,:,1] = np.eye(measurement_dim)
# measurement related parameters (subject to change) <
data = generate_random_y_data(10, 2, 3) # np.array((samples, dim, ts_no))
(f_mean, f_var) = self.run_descr_model(data, A,Q,H,R, true_states=None,
mean_compare_decimal=16,
m_init=None, P_init=None, dA=dA,dQ=dQ,
dH=dH,dR=dR, use_cython=False,
kalman_filter_type='regular',
calc_log_likelihood=True,
calc_grad_log_likelihood=True)
(f_mean, f_var) = self.run_descr_model(data, A,Q,H,R, true_states=None,
mean_compare_decimal=16,
m_init=None, P_init=None, dA=dA,dQ=dQ,
dH=dH,dR=dR, use_cython=False,
kalman_filter_type='svd',
calc_log_likelihood=True,
calc_grad_log_likelihood=True)
# (f_mean, f_var) = self.run_descr_model(data, A,Q,H,R, true_states=None,
# mean_compare_decimal=16,
# m_init=None, P_init=None, dA=dA,dQ=dQ,
# dH=dH,dR=dR, use_cython=True,
# kalman_filter_type='svd',
# calc_log_likelihood=True,
# calc_grad_log_likelihood=True)
if plot:
# plotting ->
plt.figure()
plt.plot( np.squeeze(data[:,0,1]), 'g.-', label='measurements')
plt.plot( np.squeeze(f_mean[1:,0,1]), 'b.-',label='Kalman filter estimates')
plt.plot( np.squeeze(f_mean[1:,0,1])+np.einsum('ij,ajk,kl', H, f_var[1:], H.T)[:,0,0], 'b--')
plt.plot( np.squeeze(f_mean[1:,0,1])-np.einsum('ij,ajk,kl', H, f_var[1:], H.T)[:,0,0], 'b--')
# plt.plot( np.squeeze(M_sm[1:,0,1]), 'r.-',label='Smoother Estimates')
# plt.plot( np.squeeze(M_sm[1:,0,1])+np.einsum('ij,ajk,kl', H, P_sm[1:], H.T)[:,0,0], 'r--')
# plt.plot( np.squeeze(M_sm[1:,0,1])-np.einsum('ij,ajk,kl', H, P_sm[1:], H.T)[:,0,0], 'r--')
plt.legend()
plt.title("2D state-space, 2D measurements, 3 ts_no. 1-st measurement, 2-nd ts ploted")
plt.show()
# plotting <-
# 2D measurement, 3 ts_no <-
def test_continuos_ss(self,plot=False):
"""
This function tests the continuos state-space model.
"""
# 1D measurements, 1 ts_no ->
measurement_dim = 1 # dimensionality of measurement
X_data = generate_x_points(points_num=10, x_interval = (0, 20), random=True)
Y_data = generate_random_y_data(10, 1, 1) # np.array((samples, dim, ts_no))
try:
import GPy
except ImportError as e:
return None
periodic_kernel = GPy.kern.sde_StdPeriodic(1,active_dims=[0,])
(F,L,Qc,H,P_inf,P0, dFt,dQct,dP_inft,dP0) = periodic_kernel.sde()
state_dim = dFt.shape[0];
param_num = dFt.shape[2]
grad_calc_params = {}
grad_calc_params['dP_inf'] = dP_inft
grad_calc_params['dF'] = dFt
grad_calc_params['dQc'] = dQct
grad_calc_params['dR'] = np.zeros((measurement_dim,measurement_dim,param_num))
grad_calc_params['dP_init'] = dP0
# dH matrix is None
(f_mean, f_var) = self.run_continuous_model(F, L, Qc, H, 1.5, P_inf, X_data, Y_data, index = None,
m_init=None, P_init=P0, use_cython=False,
kalman_filter_type='regular',
calc_log_likelihood=True,
calc_grad_log_likelihood=True,
grad_params_no=param_num, grad_calc_params=grad_calc_params)
(f_mean, f_var) = self.run_continuous_model(F, L, Qc, H, 1.5, P_inf, X_data, Y_data, index = None,
m_init=None, P_init=P0, use_cython=False,
kalman_filter_type='rbc',
calc_log_likelihood=True,
calc_grad_log_likelihood=True,
grad_params_no=param_num, grad_calc_params=grad_calc_params)
(f_mean, f_var) = self.run_continuous_model(F, L, Qc, H, 1.5, P_inf, X_data, Y_data, index = None,
m_init=None, P_init=P0, use_cython=True,
kalman_filter_type='rbc',
calc_log_likelihood=True,
calc_grad_log_likelihood=True,
grad_params_no=param_num, grad_calc_params=grad_calc_params)
if plot:
# plotting ->
plt.figure()
plt.plot( X_data, np.squeeze(Y_data[:,0]), 'g.-', label='measurements')
plt.plot( X_data, np.squeeze(f_mean[1:,15]), 'b.-',label='Kalman filter estimates')
plt.plot( X_data, np.squeeze(f_mean[1:,15])+np.einsum('ij,ajk,kl', H, f_var[1:], H.T)[:,0,0], 'b--')
plt.plot( X_data, np.squeeze(f_mean[1:,15])-np.einsum('ij,ajk,kl', H, f_var[1:], H.T)[:,0,0], 'b--')
# plt.plot( np.squeeze(M_sm[1:,15]), 'r.-',label='Smoother Estimates')
# plt.plot( np.squeeze(M_sm[1:,15])+np.einsum('ij,ajk,kl', H, P_sm[1:], H.T)[:,0,0], 'r--')
# plt.plot( np.squeeze(M_sm[1:,15])-np.einsum('ij,ajk,kl', H, P_sm[1:], H.T)[:,0,0], 'r--')
plt.legend()
plt.title("1D measurements, 1 ts_no")
plt.show()
# plotting <-
# 1D measurements, 1 ts_no <-
# 1D measurements, 3 ts_no ->
measurement_dim = 1 # dimensionality od measurement
X_data = generate_x_points(points_num=10, x_interval = (0, 20), random=True)
Y_data = generate_random_y_data(10, 1, 3) # np.array((samples, dim, ts_no))
periodic_kernel = GPy.kern.sde_StdPeriodic(1,active_dims=[0,])
(F,L,Qc,H,P_inf,P0, dFt,dQct,dP_inft,dP0) = periodic_kernel.sde()
state_dim = dFt.shape[0];
param_num = dFt.shape[2]
grad_calc_params = {}
grad_calc_params['dP_inf'] = dP_inft
grad_calc_params['dF'] = dFt
grad_calc_params['dQc'] = dQct
grad_calc_params['dR'] = np.zeros((measurement_dim,measurement_dim,param_num))
grad_calc_params['dP_init'] = dP0
# dH matrix is None
(f_mean, f_var) = self.run_continuous_model(F, L, Qc, H, 1.5, P_inf, X_data, Y_data, index = None,
m_init=None, P_init=P0, use_cython=False,
kalman_filter_type='regular',
calc_log_likelihood=True,
calc_grad_log_likelihood=True,
grad_params_no=param_num, grad_calc_params=grad_calc_params)
(f_mean, f_var) = self.run_continuous_model(F, L, Qc, H, 1.5, P_inf, X_data, Y_data, index = None,
m_init=None, P_init=P0, use_cython=False,
kalman_filter_type='rbc',
calc_log_likelihood=True,
calc_grad_log_likelihood=True,
grad_params_no=param_num, grad_calc_params=grad_calc_params)
(f_mean, f_var) = self.run_continuous_model(F, L, Qc, H, 1.5, P_inf, X_data, Y_data, index = None,
m_init=None, P_init=P0, use_cython=True,
kalman_filter_type='rbc',
calc_log_likelihood=True,
calc_grad_log_likelihood=True,
grad_params_no=param_num, grad_calc_params=grad_calc_params)
if plot:
# plotting ->
plt.figure()
plt.plot(X_data, np.squeeze(Y_data[:,0,1]), 'g.-', label='measurements')
plt.plot(X_data, np.squeeze(f_mean[1:,15,1]), 'b.-',label='Kalman filter estimates')
plt.plot(X_data, np.squeeze(f_mean[1:,15,1])+np.einsum('ij,ajk,kl', H, f_var[1:], H.T)[:,0,0], 'b--')
plt.plot(X_data, np.squeeze(f_mean[1:,15,1])-np.einsum('ij,ajk,kl', H, f_var[1:], H.T)[:,0,0], 'b--')
# plt.plot( np.squeeze(M_sm[1:,15,1]), 'r.-',label='Smoother Estimates')
# plt.plot( np.squeeze(M_sm[1:,15,1])+np.einsum('ij,ajk,kl', H, P_sm[1:], H.T)[:,0,0], 'r--')
# plt.plot( np.squeeze(M_sm[1:,15,1])-np.einsum('ij,ajk,kl', H, P_sm[1:], H.T)[:,0,0], 'r--')
plt.legend()
plt.title("1D measurements, 3 ts_no. 2-nd ts ploted")
plt.show()
# plotting <-
# 1D measurements, 3 ts_no <-
# 2D measurements, 3 ts_no ->
measurement_dim = 2 # dimensionality od measurement
X_data = generate_x_points(points_num=10, x_interval = (0, 20), random=True)
Y_data = generate_random_y_data(10, 2, 3) # np.array((samples, dim, ts_no))
periodic_kernel = GPy.kern.sde_StdPeriodic(1,active_dims=[0,])
(F,L,Qc,H,P_inf,P0, dFt,dQct,dP_inft,dP0) = periodic_kernel.sde()
H = np.vstack((H,H)) # make 2D measurements
R = 1.5 * np.eye(measurement_dim)
state_dim = dFt.shape[0];
param_num = dFt.shape[2]
grad_calc_params = {}
grad_calc_params['dP_inf'] = dP_inft
grad_calc_params['dF'] = dFt
grad_calc_params['dQc'] = dQct
grad_calc_params['dR'] = np.zeros((measurement_dim,measurement_dim,param_num))
grad_calc_params['dP_init'] = dP0
# dH matrix is None
(f_mean, f_var) = self.run_continuous_model(F, L, Qc, H, R, P_inf, X_data, Y_data, index = None,
m_init=None, P_init=P0, use_cython=False,
kalman_filter_type='regular',
calc_log_likelihood=True,
calc_grad_log_likelihood=True,
grad_params_no=param_num, grad_calc_params=grad_calc_params)
(f_mean, f_var) = self.run_continuous_model(F, L, Qc, H, R, P_inf, X_data, Y_data, index = None,
m_init=None, P_init=P0, use_cython=False,
kalman_filter_type='rbc',
calc_log_likelihood=True,
calc_grad_log_likelihood=True,
grad_params_no=param_num, grad_calc_params=grad_calc_params)
# (f_mean, f_var) = self.run_continuous_model(F, L, Qc, H, R, P_inf, X_data, Y_data, index = None,
# m_init=None, P_init=P0, use_cython=True,
# kalman_filter_type='rbc',
# calc_log_likelihood=True,
# calc_grad_log_likelihood=True,
# grad_params_no=param_num, grad_calc_params=grad_calc_params)
if plot:
# plotting ->
plt.figure()
plt.plot(X_data, np.squeeze(Y_data[:,0,1]), 'g.-', label='measurements')
plt.plot(X_data, np.squeeze(f_mean[1:,15,1]), 'b.-',label='Kalman filter estimates')
plt.plot(X_data, np.squeeze(f_mean[1:,15,1])+np.einsum('ij,ajk,kl', H, f_var[1:], H.T)[:,0,0], 'b--')
plt.plot(X_data, np.squeeze(f_mean[1:,15,1])-np.einsum('ij,ajk,kl', H, f_var[1:], H.T)[:,0,0], 'b--')
# plt.plot( np.squeeze(M_sm[1:,15,1]), 'r.-',label='Smoother Estimates')
# plt.plot( np.squeeze(M_sm[1:,15,1])+np.einsum('ij,ajk,kl', H, P_sm[1:], H.T)[:,0,0], 'r--')
# plt.plot( np.squeeze(M_sm[1:,15,1])-np.einsum('ij,ajk,kl', H, P_sm[1:], H.T)[:,0,0], 'r--')
plt.legend()
plt.title("1D measurements, 3 ts_no. 2-nd ts ploted")
plt.show()
# plotting <-
# 2D measurements, 3 ts_no <-
#def test_EM_gradient(plot=False):
# """
# Test EM gradient calculation. This method works (the formulas are such)
# that it works only for time invariant matrices A, Q, H, R. For the continuous
# model it means that time intervals are the same.
# """
#
# np.random.seed(234) # seed the random number generator
#
# # 1D measurements, 1 ts_no ->
# measurement_dim = 1 # dimensionality of measurement
#
# x_data = generate_x_points(points_num=10, x_interval = (0, 20), random=False)
# data = generate_random_y_data(10, 1, 1) # np.array((samples, dim, ts_no))
#
# import GPy
# #periodic_kernel = GPy.kern.sde_Matern32(1,active_dims=[0,])
# periodic_kernel = GPy.kern.sde_StdPeriodic(1,active_dims=[0,])
# (F,L,Qc,H,P_inf,P0, dFt,dQct,dP_inft,dP0t) = periodic_kernel.sde()
#
# state_dim = dFt.shape[0];
# param_num = dFt.shape[2]
#
# grad_calc_params = {}
# grad_calc_params['dP_inf'] = dP_inft
# grad_calc_params['dF'] = dFt
# grad_calc_params['dQc'] = dQct
# grad_calc_params['dR'] = np.zeros((measurement_dim,measurement_dim,param_num))
# grad_calc_params['dP_init'] = dP0t
# # dH matrix is None
#
#
# #(F,L,Qc,H,P_inf,dF,dQc,dP_inf) = ssm.balance_ss_model(F,L,Qc,H,P_inf,dF,dQc,dP_inf)
# # Use the Kalman filter to evaluate the likelihood
#
# #import pdb; pdb.set_trace()
# (M_kf, P_kf, log_likelihood,
# grad_log_likelihood,SmootherMatrObject) = ss.ContDescrStateSpace.cont_discr_kalman_filter(F,
# L, Qc, H, 1.5, P_inf, x_data, data, m_init=None,
# P_init=P0, calc_log_likelihood=True,
# calc_grad_log_likelihood=True,
# grad_params_no=param_num,
# grad_calc_params=grad_calc_params)
#
# if plot:
# # plotting ->
# plt.figure()
# plt.plot( np.squeeze(data[:,0]), 'g.-', label='measurements')
# plt.plot( np.squeeze(M_kf[1:,15]), 'b.-',label='Kalman filter estimates')
# plt.plot( np.squeeze(M_kf[1:,15])+np.einsum('ij,ajk,kl', H, P_kf[1:], H.T)[:,0,0], 'b--')
# plt.plot( np.squeeze(M_kf[1:,15])-np.einsum('ij,ajk,kl', H, P_kf[1:], H.T)[:,0,0], 'b--')
# plt.title("1D measurements, 1 ts_no")
# plt.show()
# # plotting <-
# # 1D measurements, 1 ts_no <-
if __name__ == '__main__':
print("Running state-space inference tests...")
unittest.main()
#tt = StateSpaceKernelsTests('test_discrete_ss_first')
#res = tt.test_discrete_ss_first(plot=True)
#res = tt.test_discrete_ss_1D(plot=True)
#res = tt.test_discrete_ss_2D(plot=False)
#res = tt.test_continuos_ss(plot=True)

View file

@ -29,6 +29,7 @@
#===============================================================================
import unittest, numpy as np
import GPy
class TestDebug(unittest.TestCase):
def test_checkFinite(self):
@ -96,3 +97,61 @@ class TestDebug(unittest.TestCase):
self.assertTrue((2, np.median(X.mean.values[:,2])) in fixed)
self.assertTrue(len([t for t in fixed if t[0] == 1]) == 0) # Unfixed input should not be in fixed
def test_subarray(self):
import GPy
X = np.zeros((3,6), dtype=bool)
X[[1,1,1],[0,4,5]] = 1
X[1:,[2,3]] = 1
d = GPy.util.subarray_and_sorting.common_subarrays(X,axis=1)
self.assertTrue(len(d) == 3)
X[:, d[tuple(X[:,0])]]
self.assertTrue(d[tuple(X[:,4])] == d[tuple(X[:,0])] == [0, 4, 5])
self.assertTrue(d[tuple(X[:,1])] == [1])
def test_offset_cluster(self):
#Tests the GPy.util.cluster_with_offset.cluster utility with a small
#test data set. Not using random noise just in case it occasionally
#causes it not to cluster correctly.
#groundtruth cluster identifiers are: [0,1,1,0]
#data contains a list of the four sets of time series (3 per data point)
data = [np.array([[ 2.18094245, 1.96529789, 2.00265523, 2.18218742, 2.06795428],
[ 1.62254829, 1.75748448, 1.83879347, 1.87531326, 1.52503496],
[ 1.54589609, 1.61607914, 2.00463192, 1.48771394, 1.63339218]]),
np.array([[ 2.86766106, 2.97953437, 2.91958876, 2.92510506, 3.03239241],
[ 2.57368423, 2.59954886, 3.10000395, 2.75806125, 2.89865704],
[ 2.58916318, 2.53698259, 2.63858411, 2.63102504, 2.51853901]]),
np.array([[ 2.77834168, 2.9618564 , 2.88482141, 3.24259745, 2.9716821 ],
[ 2.60675576, 2.67095624, 2.94824436, 2.80520631, 2.87247516],
[ 2.49543562, 2.5492281 , 2.6505866 , 2.65015308, 2.59738616]]),
np.array([[ 1.76783086, 2.21666738, 2.07939706, 1.9268263 , 2.23360121],
[ 1.94305547, 1.94648592, 2.1278921 , 2.09481457, 2.08575238],
[ 1.69336013, 1.72285186, 1.6339506 , 1.61212022, 1.39198698]])]
#inputs contains their associated X values
inputs = [np.array([[ 0. ],
[ 0.68040097],
[ 1.20316795],
[ 1.798749 ],
[ 2.14891733]]), np.array([[ 0. ],
[ 0.51910637],
[ 0.98259352],
[ 1.57442965],
[ 1.82515098]]), np.array([[ 0. ],
[ 0.66645478],
[ 1.59464591],
[ 1.69769551],
[ 1.80932752]]), np.array([[ 0. ],
[ 0.87512108],
[ 1.71881079],
[ 2.67162871],
[ 3.23761907]])]
#try doing the clustering
active = GPy.util.cluster_with_offset.cluster(data,inputs)
#check to see that the clustering has correctly clustered the time series.
clusters = set([frozenset(cluster) for cluster in active])
assert set([1,2]) in clusters, "Offset Clustering algorithm failed"
assert set([0,3]) in clusters, "Offset Clustering algoirthm failed"