GPy/GPy/models/bayesian_gplvm.py

251 lines
9.4 KiB
Python
Raw Normal View History

2013-06-05 14:11:49 +01:00
# Copyright (c) 2012, GPy authors (see AUTHORS.txt).
# Licensed under the BSD 3-clause license (see LICENSE.txt)
import numpy as np
from ..core import SparseGP
from ..likelihoods import Gaussian
from .. import kern
import itertools
from matplotlib.colors import colorConverter
from GPy.inference.optimization import SCG
from GPy.util import plot_latent
from GPy.models.gplvm import GPLVM
class BayesianGPLVM(SparseGP, GPLVM):
"""
Bayesian Gaussian Process Latent Variable Model
:param Y: observed data (np.ndarray) or GPy.likelihood
:type Y: np.ndarray| GPy.likelihood instance
:param input_dim: latent dimensionality
:type input_dim: int
:param init: initialisation method for the latent space
:type init: 'PCA'|'random'
"""
2013-06-05 15:29:45 +01:00
def __init__(self, likelihood_or_Y, input_dim, X=None, X_variance=None, init='PCA', num_inducing=10,
2013-06-28 11:00:42 +01:00
Z=None, kernel=None, **kwargs):
2013-06-05 14:11:49 +01:00
if type(likelihood_or_Y) is np.ndarray:
likelihood = Gaussian(likelihood_or_Y)
else:
likelihood = likelihood_or_Y
if X == None:
X = self.initialise_latent(init, input_dim, likelihood.Y)
self.init = init
if X_variance is None:
X_variance = np.clip((np.ones_like(X) * 0.5) + .01 * np.random.randn(*X.shape), 0.001, 1)
if Z is None:
2013-06-05 15:29:45 +01:00
Z = np.random.permutation(X.copy())[:num_inducing]
2013-06-05 14:11:49 +01:00
assert Z.shape[1] == X.shape[1]
if kernel is None:
kernel = kern.rbf(input_dim) + kern.white(input_dim)
SparseGP.__init__(self, X, likelihood, kernel, Z=Z, X_variance=X_variance, **kwargs)
self.ensure_default_constraints()
2013-06-05 14:11:49 +01:00
def getstate(self):
2013-06-25 17:43:42 +01:00
"""
Get the current state of the class,
here just all the indices, rest can get recomputed
"""
return SparseGP.getstate(self) + [self.init]
2013-06-25 17:43:42 +01:00
def setstate(self, state):
2013-06-25 17:43:42 +01:00
self.init = state.pop()
SparseGP.setstate(self, state)
2013-06-05 14:11:49 +01:00
def _get_param_names(self):
2013-06-05 16:14:43 +01:00
X_names = sum([['X_%i_%i' % (n, q) for q in range(self.input_dim)] for n in range(self.num_data)], [])
S_names = sum([['X_variance_%i_%i' % (n, q) for q in range(self.input_dim)] for n in range(self.num_data)], [])
2013-06-05 14:11:49 +01:00
return (X_names + S_names + SparseGP._get_param_names(self))
def _get_params(self):
"""
Horizontally stacks the parameters in order to present them to the optimizer.
The resulting 1-input_dim array has this structure:
===============================================================
| mu | S | Z | theta | beta |
===============================================================
"""
x = np.hstack((self.X.flatten(), self.X_variance.flatten(), SparseGP._get_params(self)))
return x
def _set_params(self, x, save_old=True, save_count=0):
2013-06-25 17:43:42 +01:00
N, input_dim = self.num_data, self.input_dim
self.X = x[:self.X.size].reshape(N, input_dim).copy()
self.X_variance = x[(N * input_dim):(2 * N * input_dim)].reshape(N, input_dim).copy()
SparseGP._set_params(self, x[(2 * N * input_dim):])
2013-06-05 14:11:49 +01:00
def dKL_dmuS(self):
dKL_dS = (1. - (1. / (self.X_variance))) * 0.5
dKL_dmu = self.X
return dKL_dmu, dKL_dS
def dL_dmuS(self):
dL_dmu_psi0, dL_dS_psi0 = self.kern.dpsi0_dmuS(self.dL_dpsi0, self.Z, self.X, self.X_variance)
dL_dmu_psi1, dL_dS_psi1 = self.kern.dpsi1_dmuS(self.dL_dpsi1, self.Z, self.X, self.X_variance)
dL_dmu_psi2, dL_dS_psi2 = self.kern.dpsi2_dmuS(self.dL_dpsi2, self.Z, self.X, self.X_variance)
dL_dmu = dL_dmu_psi0 + dL_dmu_psi1 + dL_dmu_psi2
dL_dS = dL_dS_psi0 + dL_dS_psi1 + dL_dS_psi2
return dL_dmu, dL_dS
def KL_divergence(self):
var_mean = np.square(self.X).sum()
var_S = np.sum(self.X_variance - np.log(self.X_variance))
2013-06-05 16:14:43 +01:00
return 0.5 * (var_mean + var_S) - 0.5 * self.input_dim * self.num_data
2013-06-05 14:11:49 +01:00
def log_likelihood(self):
ll = SparseGP.log_likelihood(self)
kl = self.KL_divergence()
return ll - kl
def _log_likelihood_gradients(self):
dKL_dmu, dKL_dS = self.dKL_dmuS()
dL_dmu, dL_dS = self.dL_dmuS()
d_dmu = (dL_dmu - dKL_dmu).flatten()
d_dS = (dL_dS - dKL_dS).flatten()
self.dbound_dmuS = np.hstack((d_dmu, d_dS))
self.dbound_dZtheta = SparseGP._log_likelihood_gradients(self)
2013-06-25 17:43:42 +01:00
return np.hstack((self.dbound_dmuS.flatten(), self.dbound_dZtheta))
2013-06-05 14:11:49 +01:00
2013-07-18 15:15:09 +01:00
def plot_latent(self, plot_inducing=True, *args, **kwargs):
return plot_latent.plot_latent(self, plot_inducing=plot_inducing, *args, **kwargs)
2013-06-05 14:11:49 +01:00
def do_test_latents(self, Y):
"""
Compute the latent representation for a set of new points Y
Notes:
This will only work with a univariate Gaussian likelihood (for now)
"""
assert not self.likelihood.is_heteroscedastic
N_test = Y.shape[0]
input_dim = self.Z.shape[1]
means = np.zeros((N_test, input_dim))
covars = np.zeros((N_test, input_dim))
dpsi0 = -0.5 * self.input_dim * self.likelihood.precision
dpsi2 = self.dL_dpsi2[0][None, :, :] # TODO: this may change if we ignore het. likelihoods
V = self.likelihood.precision * Y
dpsi1 = np.dot(self.Cpsi1V, V.T)
start = np.zeros(self.input_dim * 2)
for n, dpsi1_n in enumerate(dpsi1.T[:, :, None]):
args = (self.kern, self.Z, dpsi0, dpsi1_n, dpsi2)
xopt, fopt, neval, status = SCG(f=latent_cost, gradf=latent_grad, x=start, optargs=args, display=False)
mu, log_S = xopt.reshape(2, 1, -1)
means[n] = mu[0].copy()
covars[n] = np.exp(log_S[0]).copy()
return means, covars
def plot_X_1d(self, fignum=None, ax=None, colors=None):
"""
Plot latent space X in 1D:
-if fig is given, create input_dim subplots in fig and plot in these
-if ax is given plot input_dim 1D latent space plots of X into each `axis`
-if neither fig nor ax is given create a figure with fignum and plot in there
colors:
colors of different latent space dimensions input_dim
"""
import pylab
if ax is None:
fig = pylab.figure(num=fignum, figsize=(8, min(12, (2 * self.X.shape[1]))))
if colors is None:
colors = pylab.gca()._get_lines.color_cycle
pylab.clf()
else:
colors = iter(colors)
plots = []
x = np.arange(self.X.shape[0])
for i in range(self.X.shape[1]):
if ax is None:
a = fig.add_subplot(self.X.shape[1], 1, i + 1)
elif isinstance(ax, (tuple, list)):
a = ax[i]
else:
raise ValueError("Need one ax per latent dimnesion input_dim")
a.plot(self.X, c='k', alpha=.3)
plots.extend(a.plot(x, self.X.T[i], c=colors.next(), label=r"$\mathbf{{X_{{{}}}}}$".format(i)))
a.fill_between(x,
self.X.T[i] - 2 * np.sqrt(self.X_variance.T[i]),
self.X.T[i] + 2 * np.sqrt(self.X_variance.T[i]),
facecolor=plots[-1].get_color(),
alpha=.3)
a.legend(borderaxespad=0.)
a.set_xlim(x.min(), x.max())
if i < self.X.shape[1] - 1:
a.set_xticklabels('')
pylab.draw()
fig.tight_layout(h_pad=.01) # , rect=(0, 0, 1, .95))
return fig
def latent_cost_and_grad(mu_S, kern, Z, dL_dpsi0, dL_dpsi1, dL_dpsi2):
"""
objective function for fitting the latent variables for test points
(negative log-likelihood: should be minimised!)
"""
mu, log_S = mu_S.reshape(2, 1, -1)
S = np.exp(log_S)
psi0 = kern.psi0(Z, mu, S)
psi1 = kern.psi1(Z, mu, S)
psi2 = kern.psi2(Z, mu, S)
lik = dL_dpsi0 * psi0 + np.dot(dL_dpsi1.flatten(), psi1.flatten()) + np.dot(dL_dpsi2.flatten(), psi2.flatten()) - 0.5 * np.sum(np.square(mu) + S) + 0.5 * np.sum(log_S)
mu0, S0 = kern.dpsi0_dmuS(dL_dpsi0, Z, mu, S)
mu1, S1 = kern.dpsi1_dmuS(dL_dpsi1, Z, mu, S)
mu2, S2 = kern.dpsi2_dmuS(dL_dpsi2, Z, mu, S)
dmu = mu0 + mu1 + mu2 - mu
# dS = S0 + S1 + S2 -0.5 + .5/S
dlnS = S * (S0 + S1 + S2 - 0.5) + .5
return -lik, -np.hstack((dmu.flatten(), dlnS.flatten()))
def latent_cost(mu_S, kern, Z, dL_dpsi0, dL_dpsi1, dL_dpsi2):
"""
objective function for fitting the latent variables (negative log-likelihood: should be minimised!)
This is the same as latent_cost_and_grad but only for the objective
"""
mu, log_S = mu_S.reshape(2, 1, -1)
S = np.exp(log_S)
psi0 = kern.psi0(Z, mu, S)
psi1 = kern.psi1(Z, mu, S)
psi2 = kern.psi2(Z, mu, S)
lik = dL_dpsi0 * psi0 + np.dot(dL_dpsi1.flatten(), psi1.flatten()) + np.dot(dL_dpsi2.flatten(), psi2.flatten()) - 0.5 * np.sum(np.square(mu) + S) + 0.5 * np.sum(log_S)
return -float(lik)
def latent_grad(mu_S, kern, Z, dL_dpsi0, dL_dpsi1, dL_dpsi2):
"""
This is the same as latent_cost_and_grad but only for the grad
"""
mu, log_S = mu_S.reshape(2, 1, -1)
S = np.exp(log_S)
mu0, S0 = kern.dpsi0_dmuS(dL_dpsi0, Z, mu, S)
mu1, S1 = kern.dpsi1_dmuS(dL_dpsi1, Z, mu, S)
mu2, S2 = kern.dpsi2_dmuS(dL_dpsi2, Z, mu, S)
dmu = mu0 + mu1 + mu2 - mu
# dS = S0 + S1 + S2 -0.5 + .5/S
dlnS = S * (S0 + S1 + S2 - 0.5) + .5
return -np.hstack((dmu.flatten(), dlnS.flatten()))