2013-04-23 13:44:31 +01:00
|
|
|
'''
|
|
|
|
|
Created on 22 Apr 2013
|
|
|
|
|
|
|
|
|
|
@author: maxz
|
|
|
|
|
'''
|
|
|
|
|
import unittest
|
|
|
|
|
import numpy
|
|
|
|
|
|
|
|
|
|
import GPy
|
|
|
|
|
import itertools
|
|
|
|
|
from GPy.core import model
|
|
|
|
|
|
|
|
|
|
class PsiStatModel(model):
|
2013-04-23 15:22:30 +01:00
|
|
|
def __init__(self, which, X, X_variance, Z, M, kernel):
|
2013-04-23 13:44:31 +01:00
|
|
|
self.which = which
|
|
|
|
|
self.X = X
|
|
|
|
|
self.X_variance = X_variance
|
|
|
|
|
self.Z = Z
|
2013-06-05 11:17:15 +01:00
|
|
|
self.N, self.input_dim = X.shape
|
2013-06-05 13:02:03 +01:00
|
|
|
self.num_inducing, input_dim = Z.shape
|
2013-06-05 11:17:15 +01:00
|
|
|
assert self.input_dim == input_dim, "shape missmatch: Z:{!s} X:{!s}".format(Z.shape, X.shape)
|
2013-04-23 13:44:31 +01:00
|
|
|
self.kern = kernel
|
|
|
|
|
super(PsiStatModel, self).__init__()
|
2013-04-23 15:22:30 +01:00
|
|
|
self.psi_ = self.kern.__getattribute__(self.which)(self.Z, self.X, self.X_variance)
|
2013-04-23 13:44:31 +01:00
|
|
|
def _get_param_names(self):
|
2013-06-05 11:17:15 +01:00
|
|
|
Xnames = ["{}_{}_{}".format(what, i, j) for what, i, j in itertools.product(['X', 'X_variance'], range(self.N), range(self.input_dim))]
|
2013-06-05 13:02:03 +01:00
|
|
|
Znames = ["Z_{}_{}".format(i, j) for i, j in itertools.product(range(self.num_inducing), range(self.input_dim))]
|
2013-04-23 13:44:31 +01:00
|
|
|
return Xnames + Znames + self.kern._get_param_names()
|
|
|
|
|
def _get_params(self):
|
|
|
|
|
return numpy.hstack([self.X.flatten(), self.X_variance.flatten(), self.Z.flatten(), self.kern._get_params()])
|
|
|
|
|
def _set_params(self, x, save_old=True, save_count=0):
|
|
|
|
|
start, end = 0, self.X.size
|
2013-06-05 11:17:15 +01:00
|
|
|
self.X = x[start:end].reshape(self.N, self.input_dim)
|
2013-04-23 13:44:31 +01:00
|
|
|
start, end = end, end + self.X_variance.size
|
2013-06-05 11:17:15 +01:00
|
|
|
self.X_variance = x[start: end].reshape(self.N, self.input_dim)
|
2013-04-23 13:44:31 +01:00
|
|
|
start, end = end, end + self.Z.size
|
2013-06-05 13:02:03 +01:00
|
|
|
self.Z = x[start: end].reshape(self.num_inducing, self.input_dim)
|
2013-04-23 13:44:31 +01:00
|
|
|
self.kern._set_params(x[end:])
|
|
|
|
|
def log_likelihood(self):
|
|
|
|
|
return self.kern.__getattribute__(self.which)(self.Z, self.X, self.X_variance).sum()
|
|
|
|
|
def _log_likelihood_gradients(self):
|
2013-04-23 15:22:30 +01:00
|
|
|
psimu, psiS = self.kern.__getattribute__("d" + self.which + "_dmuS")(numpy.ones_like(self.psi_), self.Z, self.X, self.X_variance)
|
2013-04-23 13:44:31 +01:00
|
|
|
try:
|
2013-04-23 15:22:30 +01:00
|
|
|
psiZ = self.kern.__getattribute__("d" + self.which + "_dZ")(numpy.ones_like(self.psi_), self.Z, self.X, self.X_variance)
|
2013-04-23 13:44:31 +01:00
|
|
|
except AttributeError:
|
2013-06-05 13:02:03 +01:00
|
|
|
psiZ = numpy.zeros(self.num_inducing * self.input_dim)
|
2013-04-23 15:22:30 +01:00
|
|
|
thetagrad = self.kern.__getattribute__("d" + self.which + "_dtheta")(numpy.ones_like(self.psi_), self.Z, self.X, self.X_variance).flatten()
|
2013-04-23 13:44:31 +01:00
|
|
|
return numpy.hstack((psimu.flatten(), psiS.flatten(), psiZ.flatten(), thetagrad))
|
|
|
|
|
|
2013-05-03 13:36:33 +01:00
|
|
|
class DPsiStatTest(unittest.TestCase):
|
2013-06-05 11:17:15 +01:00
|
|
|
input_dim = 5
|
2013-04-23 13:44:31 +01:00
|
|
|
N = 50
|
2013-06-05 13:02:03 +01:00
|
|
|
num_inducing = 10
|
|
|
|
|
input_dim = 20
|
2013-06-05 11:17:15 +01:00
|
|
|
X = numpy.random.randn(N, input_dim)
|
2013-04-23 13:44:31 +01:00
|
|
|
X_var = .5 * numpy.ones_like(X) + .4 * numpy.clip(numpy.random.randn(*X.shape), 0, 1)
|
2013-06-05 13:02:03 +01:00
|
|
|
Z = numpy.random.permutation(X)[:num_inducing]
|
|
|
|
|
Y = X.dot(numpy.random.randn(input_dim, input_dim))
|
2013-06-05 11:17:15 +01:00
|
|
|
# kernels = [GPy.kern.linear(input_dim, ARD=True, variances=numpy.random.rand(input_dim)), GPy.kern.rbf(input_dim, ARD=True), GPy.kern.bias(input_dim)]
|
2013-04-23 13:44:31 +01:00
|
|
|
|
2013-06-05 11:17:15 +01:00
|
|
|
kernels = [GPy.kern.linear(input_dim), GPy.kern.rbf(input_dim), GPy.kern.bias(input_dim),
|
|
|
|
|
GPy.kern.linear(input_dim) + GPy.kern.bias(input_dim),
|
|
|
|
|
GPy.kern.rbf(input_dim) + GPy.kern.bias(input_dim)]
|
2013-04-23 14:10:38 +01:00
|
|
|
|
2013-04-23 13:44:31 +01:00
|
|
|
def testPsi0(self):
|
2013-04-23 14:10:38 +01:00
|
|
|
for k in self.kernels:
|
|
|
|
|
m = PsiStatModel('psi0', X=self.X, X_variance=self.X_var, Z=self.Z,
|
2013-06-05 13:02:03 +01:00
|
|
|
M=self.num_inducing, kernel=k)
|
2013-05-03 13:36:33 +01:00
|
|
|
try:
|
|
|
|
|
assert m.checkgrad(), "{} x psi0".format("+".join(map(lambda x: x.name, k.parts)))
|
|
|
|
|
except:
|
|
|
|
|
import ipdb;ipdb.set_trace()
|
2013-04-23 13:44:31 +01:00
|
|
|
|
2013-04-23 15:52:43 +01:00
|
|
|
# def testPsi1(self):
|
|
|
|
|
# for k in self.kernels:
|
|
|
|
|
# m = PsiStatModel('psi1', X=self.X, X_variance=self.X_var, Z=self.Z,
|
|
|
|
|
# M=self.M, kernel=k)
|
|
|
|
|
# assert m.checkgrad(), "{} x psi1".format("+".join(map(lambda x: x.name, k.parts)))
|
2013-04-23 13:44:31 +01:00
|
|
|
|
2013-04-23 15:22:30 +01:00
|
|
|
def testPsi2_lin(self):
|
|
|
|
|
k = self.kernels[0]
|
|
|
|
|
m = PsiStatModel('psi2', X=self.X, X_variance=self.X_var, Z=self.Z,
|
2013-06-05 13:02:03 +01:00
|
|
|
M=self.num_inducing, kernel=k)
|
2013-04-23 15:22:30 +01:00
|
|
|
assert m.checkgrad(), "{} x psi2".format("+".join(map(lambda x: x.name, k.parts)))
|
|
|
|
|
def testPsi2_lin_bia(self):
|
|
|
|
|
k = self.kernels[3]
|
|
|
|
|
m = PsiStatModel('psi2', X=self.X, X_variance=self.X_var, Z=self.Z,
|
2013-06-05 13:02:03 +01:00
|
|
|
M=self.num_inducing, kernel=k)
|
2013-04-23 15:22:30 +01:00
|
|
|
assert m.checkgrad(), "{} x psi2".format("+".join(map(lambda x: x.name, k.parts)))
|
|
|
|
|
def testPsi2_rbf(self):
|
|
|
|
|
k = self.kernels[1]
|
|
|
|
|
m = PsiStatModel('psi2', X=self.X, X_variance=self.X_var, Z=self.Z,
|
2013-06-05 13:02:03 +01:00
|
|
|
M=self.num_inducing, kernel=k)
|
2013-04-23 15:22:30 +01:00
|
|
|
assert m.checkgrad(), "{} x psi2".format("+".join(map(lambda x: x.name, k.parts)))
|
|
|
|
|
def testPsi2_rbf_bia(self):
|
|
|
|
|
k = self.kernels[-1]
|
|
|
|
|
m = PsiStatModel('psi2', X=self.X, X_variance=self.X_var, Z=self.Z,
|
2013-06-05 13:02:03 +01:00
|
|
|
M=self.num_inducing, kernel=k)
|
2013-04-23 15:22:30 +01:00
|
|
|
assert m.checkgrad(), "{} x psi2".format("+".join(map(lambda x: x.name, k.parts)))
|
|
|
|
|
def testPsi2_bia(self):
|
|
|
|
|
k = self.kernels[2]
|
|
|
|
|
m = PsiStatModel('psi2', X=self.X, X_variance=self.X_var, Z=self.Z,
|
2013-06-05 13:02:03 +01:00
|
|
|
M=self.num_inducing, kernel=k)
|
2013-04-23 15:22:30 +01:00
|
|
|
assert m.checkgrad(), "{} x psi2".format("+".join(map(lambda x: x.name, k.parts)))
|
2013-04-23 13:44:31 +01:00
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
2013-04-23 15:52:43 +01:00
|
|
|
import sys
|
|
|
|
|
interactive = 'i' in sys.argv
|
|
|
|
|
if interactive:
|
2013-06-05 13:02:03 +01:00
|
|
|
# N, num_inducing, input_dim, input_dim = 30, 5, 4, 30
|
2013-06-05 11:17:15 +01:00
|
|
|
# X = numpy.random.rand(N, input_dim)
|
|
|
|
|
# k = GPy.kern.linear(input_dim) + GPy.kern.bias(input_dim) + GPy.kern.white(input_dim, 0.00001)
|
2013-05-01 17:09:38 +01:00
|
|
|
# K = k.K(X)
|
2013-06-05 13:02:03 +01:00
|
|
|
# Y = numpy.random.multivariate_normal(numpy.zeros(N), K, input_dim).T
|
2013-05-01 17:09:38 +01:00
|
|
|
# Y -= Y.mean(axis=0)
|
2013-06-05 11:17:15 +01:00
|
|
|
# k = GPy.kern.linear(input_dim) + GPy.kern.bias(input_dim) + GPy.kern.white(input_dim, 0.00001)
|
2013-06-05 13:02:03 +01:00
|
|
|
# m = GPy.models.Bayesian_GPLVM(Y, input_dim, kernel=k, num_inducing=num_inducing)
|
2013-05-01 17:09:38 +01:00
|
|
|
# m.ensure_default_constraints()
|
|
|
|
|
# m.randomize()
|
|
|
|
|
# # self.assertTrue(m.checkgrad())
|
|
|
|
|
numpy.random.seed(0)
|
2013-06-05 11:17:15 +01:00
|
|
|
input_dim = 5
|
2013-04-23 15:52:43 +01:00
|
|
|
N = 50
|
|
|
|
|
M = 10
|
2013-05-02 16:37:47 +01:00
|
|
|
D = 15
|
2013-06-05 11:17:15 +01:00
|
|
|
X = numpy.random.randn(N, input_dim)
|
2013-05-02 16:37:47 +01:00
|
|
|
X_var = .5 * numpy.ones_like(X) + .1 * numpy.clip(numpy.random.randn(*X.shape), 0, 1)
|
2013-04-23 15:52:43 +01:00
|
|
|
Z = numpy.random.permutation(X)[:M]
|
2013-06-05 11:17:15 +01:00
|
|
|
Y = X.dot(numpy.random.randn(input_dim, D))
|
|
|
|
|
# kernel = GPy.kern.bias(input_dim)
|
2013-05-01 17:09:38 +01:00
|
|
|
#
|
2013-06-05 11:17:15 +01:00
|
|
|
# kernels = [GPy.kern.linear(input_dim), GPy.kern.rbf(input_dim), GPy.kern.bias(input_dim),
|
|
|
|
|
# GPy.kern.linear(input_dim) + GPy.kern.bias(input_dim),
|
|
|
|
|
# GPy.kern.rbf(input_dim) + GPy.kern.bias(input_dim)]
|
2013-04-23 15:52:43 +01:00
|
|
|
|
2013-04-23 16:21:41 +01:00
|
|
|
# for k in kernels:
|
|
|
|
|
# m = PsiStatModel('psi1', X=X, X_variance=X_var, Z=Z,
|
2013-06-05 13:02:03 +01:00
|
|
|
# num_inducing=num_inducing, kernel=k)
|
2013-04-23 16:21:41 +01:00
|
|
|
# assert m.checkgrad(), "{} x psi1".format("+".join(map(lambda x: x.name, k.parts)))
|
2013-04-23 15:52:43 +01:00
|
|
|
#
|
|
|
|
|
# m0 = PsiStatModel('psi0', X=X, X_variance=X_var, Z=Z,
|
2013-06-05 13:02:03 +01:00
|
|
|
# num_inducing=num_inducing, kernel=GPy.kern.linear(input_dim))
|
2013-04-23 15:52:43 +01:00
|
|
|
# m1 = PsiStatModel('psi1', X=X, X_variance=X_var, Z=Z,
|
2013-06-05 13:02:03 +01:00
|
|
|
# num_inducing=num_inducing, kernel=kernel)
|
2013-04-23 16:21:41 +01:00
|
|
|
# m1 = PsiStatModel('psi1', X=X, X_variance=X_var, Z=Z,
|
2013-06-05 13:02:03 +01:00
|
|
|
# num_inducing=num_inducing, kernel=kernel)
|
2013-05-01 17:09:38 +01:00
|
|
|
# m2 = PsiStatModel('psi2', X=X, X_variance=X_var, Z=Z,
|
2013-06-05 13:02:03 +01:00
|
|
|
# num_inducing=num_inducing, kernel=GPy.kern.rbf(input_dim))
|
2013-04-23 15:52:43 +01:00
|
|
|
m3 = PsiStatModel('psi2', X=X, X_variance=X_var, Z=Z,
|
2013-06-05 11:17:15 +01:00
|
|
|
M=M, kernel=GPy.kern.linear(input_dim, ARD=True, variances=numpy.random.rand(input_dim)))
|
2013-05-01 17:09:38 +01:00
|
|
|
m3.ensure_default_constraints()
|
2013-06-05 11:17:15 +01:00
|
|
|
# + GPy.kern.bias(input_dim))
|
2013-05-01 17:09:38 +01:00
|
|
|
# m4 = PsiStatModel('psi2', X=X, X_variance=X_var, Z=Z,
|
2013-06-05 13:02:03 +01:00
|
|
|
# num_inducing=num_inducing, kernel=GPy.kern.rbf(input_dim) + GPy.kern.bias(input_dim))
|
2013-04-23 15:52:43 +01:00
|
|
|
else:
|
|
|
|
|
unittest.main()
|