mirror of
https://github.com/SheffieldML/GPy.git
synced 2026-05-30 14:35:15 +02:00
282 lines
12 KiB
Python
282 lines
12 KiB
Python
# Copyright (c) 2012, GPy authors (see AUTHORS.txt).
|
|
# Licensed under the BSD 3-clause license (see LICENSE.txt)
|
|
|
|
import numpy as np
|
|
from .. import kern
|
|
from ..core import SparseGP
|
|
from ..likelihoods import Gaussian
|
|
from ..inference.optimization import SCG
|
|
from ..util import linalg
|
|
from ..core.parameterization.variational import NormalPosterior, NormalPrior, VariationalPosterior
|
|
from ..inference.latent_function_inference.var_dtc_parallel import update_gradients, VarDTC_minibatch
|
|
from ..inference.latent_function_inference.var_dtc_gpu import VarDTC_GPU
|
|
import logging
|
|
|
|
class BayesianGPLVM(SparseGP):
|
|
"""
|
|
Bayesian Gaussian Process Latent Variable Model
|
|
|
|
:param Y: observed data (np.ndarray) or GPy.likelihood
|
|
:type Y: np.ndarray| GPy.likelihood instance
|
|
:param input_dim: latent dimensionality
|
|
:type input_dim: int
|
|
:param init: initialisation method for the latent space
|
|
:type init: 'PCA'|'random'
|
|
|
|
"""
|
|
def __init__(self, Y, input_dim, X=None, X_variance=None, init='PCA', num_inducing=10,
|
|
Z=None, kernel=None, inference_method=None, likelihood=None, name='bayesian gplvm', mpi_comm=None, normalizer=None):
|
|
self.mpi_comm = mpi_comm
|
|
self.__IN_OPTIMIZATION__ = False
|
|
|
|
self.logger = logging.getLogger(self.__class__.__name__)
|
|
if X == None:
|
|
from ..util.initialization import initialize_latent
|
|
self.logger.info("initializing latent space X with method {}".format(init))
|
|
X, fracs = initialize_latent(init, input_dim, Y)
|
|
else:
|
|
fracs = np.ones(input_dim)
|
|
|
|
self.init = init
|
|
|
|
if X_variance is None:
|
|
self.logger.info("initializing latent space variance ~ uniform(0,.1)")
|
|
X_variance = np.random.uniform(0,.1,X.shape)
|
|
|
|
if Z is None:
|
|
self.logger.info("initializing inducing inputs")
|
|
Z = np.random.permutation(X.copy())[:num_inducing]
|
|
assert Z.shape[1] == X.shape[1]
|
|
|
|
if kernel is None:
|
|
self.logger.info("initializing kernel RBF")
|
|
kernel = kern.RBF(input_dim, lengthscale=1./fracs, ARD=True) #+ kern.Bias(input_dim) + kern.White(input_dim)
|
|
|
|
if likelihood is None:
|
|
likelihood = Gaussian()
|
|
|
|
self.variational_prior = NormalPrior()
|
|
X = NormalPosterior(X, X_variance)
|
|
|
|
if inference_method is None:
|
|
inan = np.isnan(Y)
|
|
if np.any(inan):
|
|
from ..inference.latent_function_inference.var_dtc import VarDTCMissingData
|
|
self.logger.debug("creating inference_method with var_dtc missing data")
|
|
inference_method = VarDTCMissingData(inan=inan)
|
|
elif mpi_comm is not None:
|
|
inference_method = VarDTC_minibatch(mpi_comm=mpi_comm)
|
|
else:
|
|
from ..inference.latent_function_inference.var_dtc import VarDTC
|
|
self.logger.debug("creating inference_method var_dtc")
|
|
inference_method = VarDTC()
|
|
if isinstance(inference_method,VarDTC_minibatch):
|
|
inference_method.mpi_comm = mpi_comm
|
|
|
|
if kernel.useGPU and isinstance(inference_method, VarDTC_GPU):
|
|
kernel.psicomp.GPU_direct = True
|
|
|
|
SparseGP.__init__(self, X, Y, Z, kernel, likelihood, inference_method, name, normalizer=normalizer)
|
|
self.logger.info("Adding X as parameter")
|
|
self.link_parameter(self.X, index=0)
|
|
|
|
if mpi_comm != None:
|
|
from ..util.mpi import divide_data
|
|
N_start, N_end, N_list = divide_data(Y.shape[0], mpi_comm)
|
|
self.N_range = (N_start, N_end)
|
|
self.N_list = np.array(N_list)
|
|
self.Y_local = self.Y[N_start:N_end]
|
|
print 'MPI RANK: '+str(self.mpi_comm.rank)+' with datasize: '+str(self.N_range)
|
|
mpi_comm.Bcast(self.param_array, root=0)
|
|
|
|
def set_X_gradients(self, X, X_grad):
|
|
"""Set the gradients of the posterior distribution of X in its specific form."""
|
|
X.mean.gradient, X.variance.gradient = X_grad
|
|
|
|
def get_X_gradients(self, X):
|
|
"""Get the gradients of the posterior distribution of X in its specific form."""
|
|
return X.mean.gradient, X.variance.gradient
|
|
|
|
def parameters_changed(self):
|
|
if isinstance(self.inference_method, VarDTC_GPU) or isinstance(self.inference_method, VarDTC_minibatch):
|
|
update_gradients(self, mpi_comm=self.mpi_comm)
|
|
return
|
|
|
|
super(BayesianGPLVM, self).parameters_changed()
|
|
self._log_marginal_likelihood -= self.variational_prior.KL_divergence(self.X)
|
|
|
|
self.X.mean.gradient, self.X.variance.gradient = self.kern.gradients_qX_expectations(variational_posterior=self.X, Z=self.Z, dL_dpsi0=self.grad_dict['dL_dpsi0'], dL_dpsi1=self.grad_dict['dL_dpsi1'], dL_dpsi2=self.grad_dict['dL_dpsi2'])
|
|
|
|
# This is testing code -------------------------
|
|
# i = np.random.randint(self.X.shape[0])
|
|
# X_ = self.X.mean
|
|
# which = np.sqrt(((X_ - X_[i:i+1])**2).sum(1)).argsort()>(max(0, self.X.shape[0]-51))
|
|
# _, _, grad_dict = self.inference_method.inference(self.kern, self.X[which], self.Z, self.likelihood, self.Y[which], self.Y_metadata)
|
|
# grad = self.kern.gradients_qX_expectations(variational_posterior=self.X[which], Z=self.Z, dL_dpsi0=grad_dict['dL_dpsi0'], dL_dpsi1=grad_dict['dL_dpsi1'], dL_dpsi2=grad_dict['dL_dpsi2'])
|
|
#
|
|
# self.X.mean.gradient[:] = 0
|
|
# self.X.variance.gradient[:] = 0
|
|
# self.X.mean.gradient[which] = grad[0]
|
|
# self.X.variance.gradient[which] = grad[1]
|
|
|
|
# update for the KL divergence
|
|
# self.variational_prior.update_gradients_KL(self.X, which)
|
|
# -----------------------------------------------
|
|
|
|
# update for the KL divergence
|
|
self.variational_prior.update_gradients_KL(self.X)
|
|
|
|
def plot_latent(self, labels=None, which_indices=None,
|
|
resolution=50, ax=None, marker='o', s=40,
|
|
fignum=None, plot_inducing=True, legend=True,
|
|
plot_limits=None,
|
|
aspect='auto', updates=False, predict_kwargs={}, imshow_kwargs={}):
|
|
import sys
|
|
assert "matplotlib" in sys.modules, "matplotlib package has not been imported."
|
|
from ..plotting.matplot_dep import dim_reduction_plots
|
|
|
|
return dim_reduction_plots.plot_latent(self, labels, which_indices,
|
|
resolution, ax, marker, s,
|
|
fignum, plot_inducing, legend,
|
|
plot_limits, aspect, updates, predict_kwargs, imshow_kwargs)
|
|
|
|
def do_test_latents(self, Y):
|
|
"""
|
|
Compute the latent representation for a set of new points Y
|
|
|
|
Notes:
|
|
This will only work with a univariate Gaussian likelihood (for now)
|
|
"""
|
|
N_test = Y.shape[0]
|
|
input_dim = self.Z.shape[1]
|
|
|
|
means = np.zeros((N_test, input_dim))
|
|
covars = np.zeros((N_test, input_dim))
|
|
|
|
dpsi0 = -0.5 * self.input_dim / self.likelihood.variance
|
|
dpsi2 = self.grad_dict['dL_dpsi2'][0][None, :, :] # TODO: this may change if we ignore het. likelihoods
|
|
V = Y/self.likelihood.variance
|
|
|
|
#compute CPsi1V
|
|
#if self.Cpsi1V is None:
|
|
# psi1V = np.dot(self.psi1.T, self.likelihood.V)
|
|
# tmp, _ = linalg.dtrtrs(self._Lm, np.asfortranarray(psi1V), lower=1, trans=0)
|
|
# tmp, _ = linalg.dpotrs(self.LB, tmp, lower=1)
|
|
# self.Cpsi1V, _ = linalg.dtrtrs(self._Lm, tmp, lower=1, trans=1)
|
|
|
|
dpsi1 = np.dot(self.posterior.woodbury_vector, V.T)
|
|
|
|
#start = np.zeros(self.input_dim * 2)
|
|
|
|
|
|
from scipy.optimize import minimize
|
|
|
|
for n, dpsi1_n in enumerate(dpsi1.T[:, :, None]):
|
|
args = (input_dim, self.kern.copy(), self.Z, dpsi0, dpsi1_n.T, dpsi2)
|
|
res = minimize(latent_cost_and_grad, jac=True, x0=np.hstack((means[n], covars[n])), args=args, method='BFGS')
|
|
xopt = res.x
|
|
mu, log_S = xopt.reshape(2, 1, -1)
|
|
means[n] = mu[0].copy()
|
|
covars[n] = np.exp(log_S[0]).copy()
|
|
|
|
X = NormalPosterior(means, covars)
|
|
|
|
return X
|
|
|
|
def dmu_dX(self, Xnew):
|
|
"""
|
|
Calculate the gradient of the prediction at Xnew w.r.t Xnew.
|
|
"""
|
|
dmu_dX = np.zeros_like(Xnew)
|
|
for i in range(self.Z.shape[0]):
|
|
dmu_dX += self.kern.gradients_X(self.Cpsi1Vf[i:i + 1, :], Xnew, self.Z[i:i + 1, :])
|
|
return dmu_dX
|
|
|
|
def dmu_dXnew(self, Xnew):
|
|
"""
|
|
Individual gradient of prediction at Xnew w.r.t. each sample in Xnew
|
|
"""
|
|
gradients_X = np.zeros((Xnew.shape[0], self.num_inducing))
|
|
ones = np.ones((1, 1))
|
|
for i in range(self.Z.shape[0]):
|
|
gradients_X[:, i] = self.kern.gradients_X(ones, Xnew, self.Z[i:i + 1, :]).sum(-1)
|
|
return np.dot(gradients_X, self.Cpsi1Vf)
|
|
|
|
def plot_steepest_gradient_map(self, *args, ** kwargs):
|
|
"""
|
|
See GPy.plotting.matplot_dep.dim_reduction_plots.plot_steepest_gradient_map
|
|
"""
|
|
import sys
|
|
assert "matplotlib" in sys.modules, "matplotlib package has not been imported."
|
|
from ..plotting.matplot_dep import dim_reduction_plots
|
|
|
|
return dim_reduction_plots.plot_steepest_gradient_map(self,*args,**kwargs)
|
|
def __getstate__(self):
|
|
dc = super(BayesianGPLVM, self).__getstate__()
|
|
dc['mpi_comm'] = None
|
|
if self.mpi_comm != None:
|
|
del dc['N_range']
|
|
del dc['N_list']
|
|
del dc['Y_local']
|
|
return dc
|
|
|
|
def __setstate__(self, state):
|
|
return super(BayesianGPLVM, self).__setstate__(state)
|
|
|
|
#=====================================================
|
|
# The MPI parallelization
|
|
# - can move to model at some point
|
|
#=====================================================
|
|
|
|
def _set_params_transformed(self, p):
|
|
if self.mpi_comm != None:
|
|
if self.__IN_OPTIMIZATION__ and self.mpi_comm.rank==0:
|
|
self.mpi_comm.Bcast(np.int32(1),root=0)
|
|
self.mpi_comm.Bcast(p, root=0)
|
|
super(BayesianGPLVM, self)._set_params_transformed(p)
|
|
|
|
def optimize(self, optimizer=None, start=None, **kwargs):
|
|
self.__IN_OPTIMIZATION__ = True
|
|
if self.mpi_comm==None:
|
|
super(BayesianGPLVM, self).optimize(optimizer,start,**kwargs)
|
|
elif self.mpi_comm.rank==0:
|
|
super(BayesianGPLVM, self).optimize(optimizer,start,**kwargs)
|
|
self.mpi_comm.Bcast(np.int32(-1),root=0)
|
|
elif self.mpi_comm.rank>0:
|
|
x = self._get_params_transformed().copy()
|
|
flag = np.empty(1,dtype=np.int32)
|
|
while True:
|
|
self.mpi_comm.Bcast(flag,root=0)
|
|
if flag==1:
|
|
self._set_params_transformed(x)
|
|
elif flag==-1:
|
|
break
|
|
else:
|
|
self.__IN_OPTIMIZATION__ = False
|
|
raise Exception("Unrecognizable flag for synchronization!")
|
|
self.__IN_OPTIMIZATION__ = False
|
|
|
|
|
|
def latent_cost_and_grad(mu_S, input_dim, kern, Z, dL_dpsi0, dL_dpsi1, dL_dpsi2):
|
|
"""
|
|
objective function for fitting the latent variables for test points
|
|
(negative log-likelihood: should be minimised!)
|
|
"""
|
|
mu = mu_S[:input_dim][None]
|
|
log_S = mu_S[input_dim:][None]
|
|
S = np.exp(log_S)
|
|
|
|
X = NormalPosterior(mu, S)
|
|
|
|
psi0 = kern.psi0(Z, X)
|
|
psi1 = kern.psi1(Z, X)
|
|
psi2 = kern.psi2(Z, X)
|
|
|
|
lik = dL_dpsi0 * psi0.sum() + np.einsum('ij,kj->...', dL_dpsi1, psi1) + np.einsum('ijk,lkj->...', dL_dpsi2, psi2) - 0.5 * np.sum(np.square(mu) + S) + 0.5 * np.sum(log_S)
|
|
|
|
dLdmu, dLdS = kern.gradients_qX_expectations(dL_dpsi0, dL_dpsi1, dL_dpsi2, Z, X)
|
|
dmu = dLdmu - mu
|
|
# dS = S0 + S1 + S2 -0.5 + .5/S
|
|
dlnS = S * (dLdS - 0.5) + .5
|
|
|
|
return -lik, -np.hstack((dmu.flatten(), dlnS.flatten()))
|