# Copyright (c) 2012, GPy authors (see AUTHORS.txt).
# Licensed under the BSD 3-clause license (see LICENSE.txt)

import numpy as np

from ..core.sparse_gp_mpi import SparseGP_MPI
from .. import kern
from ..core.parameterization import Param
from ..likelihoods import Gaussian
from ..core.parameterization.variational import SpikeAndSlabPrior, SpikeAndSlabPosterior
from ..inference.latent_function_inference.var_dtc_parallel import update_gradients, VarDTC_minibatch
from ..kern._src.psi_comp.ssrbf_psi_gpucomp import PSICOMP_SSRBF_GPU

|
class IBPPosterior(SpikeAndSlabPosterior):
    '''
    The SpikeAndSlab distribution for variational approximations.
    '''
    def __init__(self, means, variances, binary_prob, tau, name='latent space'):
        """
        binary_prob : the probability of the distribution on the slab part.
        """
        from ..core.parameterization.transformations import Logexp
        super(IBPPosterior, self).__init__(means, variances, binary_prob, name=name)
        self.tau = Param("tau", tau, Logexp())
        self.link_parameter(self.tau)
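        # tau is read here as the (input_dim, 2) array of variational Beta
        # parameters for the IBP stick weights; this interpretation is inferred
        # from the betaln/digamma terms in IBPPrior.KL_divergence below, it is
        # not stated in the file itself.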

    def set_gradients(self, grad):
        self.mean.gradient, self.variance.gradient, self.gamma.gradient, self.tau.gradient = grad

    def __getitem__(self, s):
        if isinstance(s, (int, slice, tuple, list, np.ndarray)):
            import copy
            n = self.__new__(self.__class__, self.name)
            dc = self.__dict__.copy()
            dc['mean'] = self.mean[s]
            dc['variance'] = self.variance[s]
            dc['binary_prob'] = self.binary_prob[s]
            dc['tau'] = self.tau[s]
            dc['parameters'] = copy.copy(self.parameters)
            n.__dict__.update(dc)
            n.parameters[dc['mean']._parent_index_] = dc['mean']
            n.parameters[dc['variance']._parent_index_] = dc['variance']
            n.parameters[dc['binary_prob']._parent_index_] = dc['binary_prob']
            n.parameters[dc['tau']._parent_index_] = dc['tau']
            n._gradient_array_ = None
            oversize = self.size - self.mean.size - self.variance.size - self.gamma.size - self.tau.size
            n.size = n.mean.size + n.variance.size + n.gamma.size + n.tau.size + oversize
            n.ndim = n.mean.ndim
            n.shape = n.mean.shape
            n.num_data = n.mean.shape[0]
            n.input_dim = n.mean.shape[1] if n.ndim != 1 else 1
            return n
        else:
            return super(IBPPosterior, self).__getitem__(s)
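
    # Note on __getitem__: slicing returns a shallow copy of the posterior whose
    # mean, variance, binary_prob and tau are the sliced Params, re-linked into
    # the copied parameter list; the gradient buffer is reset to None so it is
    # rebuilt on first use.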


class IBPPrior(SpikeAndSlabPrior):
    def __init__(self, input_dim, alpha=2., name='IBPPrior', **kw):
        super(SpikeAndSlabPrior, self).__init__(name=name, **kw)
        from ..core.parameterization.transformations import __fixed__
        self.input_dim = input_dim
        self.variance = 1.  # slab (Gaussian) prior variance; assumed unit, as read by KL_divergence below
        self.alpha = Param('alpha', alpha, __fixed__)
        self.link_parameter(self.alpha)
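        # alpha is the IBP concentration parameter; the __fixed__ transformation
        # keeps it constant during optimisation.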

    def KL_divergence(self, variational_posterior):
        mu, S, gamma, tau = variational_posterior.mean.values, variational_posterior.variance.values, variational_posterior.gamma.values, variational_posterior.tau.values

        var_mean = np.square(mu)/self.variance
        var_S = (S/self.variance - np.log(S))
        part1 = (gamma*(np.log(self.variance) - 1. + var_mean + var_S)).sum()/2.

        ad = self.alpha/self.input_dim
        from scipy.special import betaln, digamma
        part2 = (gamma*np.log(gamma)).sum() + ((1.-gamma)*np.log(1.-gamma)).sum() + betaln(ad, 1.)*self.input_dim \
                - betaln(tau[:, 0], tau[:, 1]).sum() + ((tau[:, 0]-gamma-ad)*digamma(tau[:, 0])).sum() + \
                ((tau[:, 1]+gamma-2.)*digamma(tau[:, 1])).sum() + ((2.+ad-tau[:, 0]-tau[:, 1])*digamma(tau.sum(axis=1))).sum()

        return part1 + part2
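
    # Reading of the two terms above (inferred from the code, not stated in it):
    # part1 is the gamma-weighted Gaussian KL between the slab posterior
    # N(mu, S) and the prior N(0, self.variance); part2 collects the negative
    # Bernoulli entropy of gamma together with the Beta(tau[:, 0], tau[:, 1])
    # terms of the stick-breaking IBP approximation, with ad = alpha/input_dim.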

    def update_gradients_KL(self, variational_posterior):
        mu, S = variational_posterior.mean, variational_posterior.variance
        gamma = variational_posterior.gamma.values

        variational_posterior.binary_prob.gradient -= ((np.square(mu.values)+S.values)/self.variance - np.log(S.values) + np.log(self.variance) - 1.)/2.
        mu.gradient -= gamma*mu.values/self.variance
        S.gradient -= (1./self.variance - 1./S.values)*gamma/2.
        if self.learnPi:
            if len(self.pi) == 1:
                self.pi.gradient = (gamma/self.pi - (1.-gamma)/(1.-self.pi)).sum()
            elif len(self.pi.shape) == 1:
                self.pi.gradient = (gamma/self.pi - (1.-gamma)/(1.-self.pi)).sum(axis=0)
            else:
                self.pi.gradient = gamma/self.pi - (1.-gamma)/(1.-self.pi)


class SSGPLVM(SparseGP_MPI):
    """
    Spike-and-Slab Gaussian Process Latent Variable Model

    :param Y: observed data (np.ndarray) or GPy.likelihood
    :type Y: np.ndarray | GPy.likelihood instance
    :param input_dim: latent dimensionality
    :type input_dim: int
    :param init: initialisation method for the latent space
    :type init: 'PCA' | 'random'
    """
    def __init__(self, Y, input_dim, X=None, X_variance=None, Gamma=None, init='PCA', num_inducing=10,
                 Z=None, kernel=None, inference_method=None, likelihood=None, name='Spike_and_Slab GPLVM',
                 group_spike=False, mpi_comm=None, pi=None, learnPi=True, normalizer=False, **kwargs):

        self.group_spike = group_spike

        if X is None:
            from ..util.initialization import initialize_latent
            X, fracs = initialize_latent(init, input_dim, Y)
        else:
            fracs = np.ones(input_dim)

        self.init = init

        if X_variance is None:  # the variance of the variational approximation (S)
            X_variance = np.random.uniform(0, .1, X.shape)

        if Gamma is None:
            # the posterior probabilities of the binary variable in the variational approximation
            gamma = np.empty_like(X)
            gamma[:] = 0.5 + 0.1*np.random.randn(X.shape[0], input_dim)
            gamma[gamma > 1.-1e-9] = 1.-1e-9
            gamma[gamma < 1e-9] = 1e-9
        else:
            gamma = Gamma.copy()

        if Z is None:
            Z = np.random.permutation(X.copy())[:num_inducing]
        assert Z.shape[1] == X.shape[1]

        if likelihood is None:
            likelihood = Gaussian()

        if kernel is None:
            kernel = kern.RBF(input_dim, lengthscale=fracs, ARD=True)  # + kern.white(input_dim)
        if kernel.useGPU:
            kernel.psicomp = PSICOMP_SSRBF_GPU()

        if inference_method is None:
            inference_method = VarDTC_minibatch(mpi_comm=mpi_comm)

        if pi is None:
            pi = np.empty((input_dim,))
            pi[:] = 0.5
        # the prior probability of the latent binary variable b
        self.variational_prior = SpikeAndSlabPrior(pi=pi, learnPi=learnPi)

        X = SpikeAndSlabPosterior(X, X_variance, gamma)

        super(SSGPLVM, self).__init__(X, Y, Z, kernel, likelihood, variational_prior=self.variational_prior, inference_method=inference_method, name=name, mpi_comm=mpi_comm, normalizer=normalizer, **kwargs)
        self.link_parameter(self.X, index=0)

        if self.group_spike:
            # tie the columns of gamma together
            [self.X.gamma[:, i].tie('tieGamma'+str(i)) for i in range(self.X.gamma.shape[1])]
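
    # Example usage (a minimal sketch, not part of this file; `Y_obs` is a
    # hypothetical (N, D) observation array and everything else is left at its
    # default):
    #
    #   import numpy as np
    #   import GPy
    #   Y_obs = np.random.randn(100, 12)
    #   m = GPy.models.SSGPLVM(Y_obs, input_dim=5, num_inducing=20)
    #   m.optimize(messages=True)
    #   sens = m.input_sensitivity()  # per-dimension relevance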

    def set_X_gradients(self, X, X_grad):
        """Set the gradients of the posterior distribution of X in its specific form."""
        X.mean.gradient, X.variance.gradient, X.binary_prob.gradient = X_grad

    def get_X_gradients(self, X):
        """Get the gradients of the posterior distribution of X in its specific form."""
        return X.mean.gradient, X.variance.gradient, X.binary_prob.gradient

    def parameters_changed(self):
        super(SSGPLVM, self).parameters_changed()
        if isinstance(self.inference_method, VarDTC_minibatch):
            return

        self._log_marginal_likelihood -= self.variational_prior.KL_divergence(self.X)

        self.X.mean.gradient, self.X.variance.gradient, self.X.binary_prob.gradient = self.kern.gradients_qX_expectations(variational_posterior=self.X, Z=self.Z, dL_dpsi0=self.grad_dict['dL_dpsi0'], dL_dpsi1=self.grad_dict['dL_dpsi1'], dL_dpsi2=self.grad_dict['dL_dpsi2'])

        # update for the KL divergence
        self.variational_prior.update_gradients_KL(self.X)
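
    # The bound assembled above is the sparse-GP likelihood term computed by the
    # parent class minus KL[q(X) || p(X)]; the psi-statistic gradients are first
    # written into q(X) and update_gradients_KL then subtracts the KL gradients
    # in place.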

    def input_sensitivity(self):
        if self.kern.ARD:
            return self.kern.input_sensitivity()
        else:
            return self.variational_prior.pi

    def plot_latent(self, plot_inducing=True, *args, **kwargs):
        import sys
        assert "matplotlib" in sys.modules, "matplotlib package has not been imported."
        from ..plotting.matplot_dep import dim_reduction_plots

        return dim_reduction_plots.plot_latent(self, plot_inducing=plot_inducing, *args, **kwargs)