diff --git a/GPy/core/parameterization/variational.py b/GPy/core/parameterization/variational.py
index 9e0127a2..d0fd83d2 100644
--- a/GPy/core/parameterization/variational.py
+++ b/GPy/core/parameterization/variational.py
@@ -55,8 +55,13 @@ class SpikeAndSlabPrior(VariationalPrior):
         mu = variational_posterior.mean
         S = variational_posterior.variance
         gamma = variational_posterior.binary_prob
+        if len(self.pi.shape)==2:
+            idx = np.unique(gamma._raveled_index()/gamma.shape[-1])
+            pi = self.pi[idx]
+        else:
+            pi = self.pi
 
-        gamma.gradient -= np.log((1-self.pi)/self.pi*gamma/(1.-gamma))+((np.square(mu)+S)/self.variance-np.log(S)+np.log(self.variance)-1.)/2.
+        gamma.gradient -= np.log((1-pi)/pi*gamma/(1.-gamma))+((np.square(mu)+S)/self.variance-np.log(S)+np.log(self.variance)-1.)/2.
         mu.gradient -= gamma*mu/self.variance
         S.gradient -= (1./self.variance - 1./S) * gamma /2.
         if self.learnPi:
@@ -65,7 +70,7 @@ class SpikeAndSlabPrior(VariationalPrior):
             elif len(self.pi.shape)==1:
                 self.pi.gradient = (gamma/self.pi - (1.-gamma)/(1.-self.pi)).sum(axis=0)
             else:
-                self.pi.gradient = (gamma/self.pi - (1.-gamma)/(1.-self.pi))
+                self.pi[idx].gradient = (gamma/self.pi[idx] - (1.-gamma)/(1.-self.pi[idx]))
 
 class VariationalPosterior(Parameterized):
     def __init__(self, means=None, variances=None, name='latent space', *a, **kw):
diff --git a/GPy/core/sparse_gp_mpi.py b/GPy/core/sparse_gp_mpi.py
new file mode 100644
index 00000000..9d28f0a0
--- /dev/null
+++ b/GPy/core/sparse_gp_mpi.py
@@ -0,0 +1,113 @@
+# Copyright (c) 2012, GPy authors (see AUTHORS.txt).
+# Licensed under the BSD 3-clause license (see LICENSE.txt)
+
+import numpy as np
+from sparse_gp import SparseGP
+from parameterization.param import Param
+from ..inference.latent_function_inference import var_dtc
+from .. import likelihoods
+from parameterization.variational import VariationalPosterior
+from ..inference.latent_function_inference.var_dtc_parallel import update_gradients, VarDTC_minibatch
+from ..core.parameterization.parameter_core import OptimizationHandlable
+
+import logging
+logger = logging.getLogger("sparse gp mpi")
+
+class SparseGP_MPI(SparseGP):
+    """
+    A general purpose Sparse GP model with MPI parallelization support
+
+    This model allows (approximate) inference using variational DTC or FITC
+    (Gaussian likelihoods) as well as non-conjugate sparse methods based on
+    these.
+
+    :param X: inputs
+    :type X: np.ndarray (num_data x input_dim)
+    :param Y: observed data
+    :type Y: np.ndarray (num_data x output_dim)
+    :param Z: inducing inputs
+    :type Z: np.ndarray (num_inducing x input_dim)
+    :param kernel: the kernel (covariance function), see GPy.kern
+    :type kernel: a GPy.kern.kern instance
+    :param likelihood: a likelihood instance
+    :type likelihood: GPy.likelihood.(Gaussian | EP | Laplace)
+    :param variational_prior: the prior on the latent space, used for the KL term (optional)
+    :type variational_prior: GPy.core.parameterization.variational.VariationalPrior | None
+    :param mpi_comm: the MPI communicator, e.g. mpi4py.MPI.COMM_WORLD
+    :type mpi_comm: mpi4py.MPI.Intracomm
+
+    """
+
+    def __init__(self, X, Y, Z, kernel, likelihood, variational_prior=None, inference_method=None, name='sparse gp mpi', Y_metadata=None, mpi_comm=None):
+        self._IN_OPTIMIZATION_ = False
+        if mpi_comm != None:
+            if inference_method is None:
+                inference_method = VarDTC_minibatch(mpi_comm=mpi_comm)
+            else:
+                assert isinstance(inference_method, VarDTC_minibatch), 'inference_method has to support MPI!'
+
+        super(SparseGP_MPI, self).__init__(X, Y, Z, kernel, likelihood, inference_method=inference_method, name=name, Y_metadata=Y_metadata)
+        self.updates = False
+        self.add_parameter(self.X, index=0)
+        if variational_prior is not None:
+            self.add_parameter(variational_prior)
+        self.X.fix()
+
+        self.mpi_comm = mpi_comm
+        # Manage the data (Y) division
+        if mpi_comm != None:
+            from ..util.mpi import divide_data
+            N_start, N_end, N_list = divide_data(Y.shape[0], mpi_comm)
+            self.N_range = (N_start, N_end)
+            self.N_list = np.array(N_list)
+            self.Y_local = self.Y[N_start:N_end]
+            print 'MPI RANK '+str(self.mpi_comm.rank)+' with the data range '+str(self.N_range)
+            mpi_comm.Bcast(self.param_array, root=0)
+        self.updates = True
+
+    def __getstate__(self):
+        dc = super(SparseGP_MPI, self).__getstate__()
+        dc['mpi_comm'] = None
+        if self.mpi_comm != None:
+            del dc['N_range']
+            del dc['N_list']
+            del dc['Y_local']
+        return dc
+
+    #=====================================================
+    # The MPI parallelization
+    #     - can move to model at some point
+    #=====================================================
+
+    @SparseGP.optimizer_array.setter
+    def optimizer_array(self, p):
+        if self.mpi_comm != None:
+            if self._IN_OPTIMIZATION_ and self.mpi_comm.rank==0:
+                self.mpi_comm.Bcast(np.int32(1),root=0)
+            self.mpi_comm.Bcast(p, root=0)
+        SparseGP.optimizer_array.fset(self,p)
+
+    def optimize(self, optimizer=None, start=None, **kwargs):
+        self._IN_OPTIMIZATION_ = True
+        if self.mpi_comm==None:
+            super(SparseGP_MPI, self).optimize(optimizer,start,**kwargs)
+        elif self.mpi_comm.rank==0:
+            super(SparseGP_MPI, self).optimize(optimizer,start,**kwargs)
+            self.mpi_comm.Bcast(np.int32(-1),root=0)
+        elif self.mpi_comm.rank>0:
+            x = self.optimizer_array.copy()
+            flag = np.empty(1,dtype=np.int32)
+            while True:
+                self.mpi_comm.Bcast(flag,root=0)
+                if flag==1:
+                    self.optimizer_array = x
+                elif flag==-1:
+                    break
+                else:
+                    self._IN_OPTIMIZATION_ = False
+                    raise Exception("Unrecognizable flag for synchronization!")
+        self._IN_OPTIMIZATION_ = False
+
+    def parameters_changed(self):
+        update_gradients(self, mpi_comm=self.mpi_comm)
+
diff --git a/GPy/inference/latent_function_inference/var_dtc_parallel.py b/GPy/inference/latent_function_inference/var_dtc_parallel.py
index 19fc0fa8..e6292473 100644
--- a/GPy/inference/latent_function_inference/var_dtc_parallel.py
+++ b/GPy/inference/latent_function_inference/var_dtc_parallel.py
@@ -379,7 +379,7 @@ def update_gradients(model, mpi_comm=None):
     # Gather the gradients from multiple MPI nodes
     if mpi_comm != None:
         if het_noise:
-            assert False, "Not implemented!"
+            raise NotImplementedError("het_noise is not implemented for MPI parallelization!")
         kern_grad_all = kern_grad.copy()
         Z_grad_all = model.Z.gradient.copy()
         mpi_comm.Allreduce([kern_grad, MPI.DOUBLE], [kern_grad_all, MPI.DOUBLE])
@@ -404,10 +404,10 @@ def update_gradients(model, mpi_comm=None):
         mpi_comm.Allreduce([np.float64(KL_div), MPI.DOUBLE], [KL_div_all, MPI.DOUBLE])
         KL_div = KL_div_all
         [mpi_comm.Allgatherv([pp.copy(), MPI.DOUBLE], [pa, (model.N_list*pa.shape[-1], None), MPI.DOUBLE]) for pp,pa in zip(model.get_X_gradients(X),model.get_X_gradients(model.X))]
-        from ...models import SSGPLVM
-        if isinstance(model, SSGPLVM):
-            grad_pi = np.array(model.variational_prior.pi.gradient)
-            mpi_comm.Allreduce([grad_pi.copy(), MPI.DOUBLE], [model.variational_prior.pi.gradient, MPI.DOUBLE])
+#        from ...models import SSGPLVM
+#        if isinstance(model, SSGPLVM):
+#            grad_pi = np.array(model.variational_prior.pi.gradient)
+#            mpi_comm.Allreduce([grad_pi.copy(), MPI.DOUBLE], [model.variational_prior.pi.gradient, MPI.DOUBLE])
     model._log_marginal_likelihood -= KL_div
 
     # dL_dthetaL
diff --git a/GPy/models/ss_gplvm.py b/GPy/models/ss_gplvm.py
index ba793fc2..1b06ce32 100644
--- a/GPy/models/ss_gplvm.py
+++ b/GPy/models/ss_gplvm.py
@@ -3,7 +3,7 @@
 
 import numpy as np
 
-from ..core.sparse_gp import SparseGP
+from ..core.sparse_gp_mpi import SparseGP_MPI
 from .. import kern
 from ..likelihoods import Gaussian
 from ..core.parameterization.variational import SpikeAndSlabPrior, SpikeAndSlabPosterior
@@ -11,7 +11,7 @@ from ..inference.latent_function_inference.var_dtc_parallel import update_gradie
 from ..inference.latent_function_inference.var_dtc_gpu import VarDTC_GPU
 from ..kern._src.psi_comp.ssrbf_psi_gpucomp import PSICOMP_SSRBF_GPU
 
-class SSGPLVM(SparseGP):
+class SSGPLVM(SparseGP_MPI):
     """
     Spike-and-Slab Gaussian Process Latent Variable Model
 
@@ -26,8 +26,6 @@ class SSGPLVM(SparseGP):
 
     def __init__(self, Y, input_dim, X=None, X_variance=None, Gamma=None, init='PCA', num_inducing=10, Z=None, kernel=None, inference_method=None, likelihood=None, name='Spike_and_Slab GPLVM', group_spike=False, mpi_comm=None, pi=None, learnPi=True, **kwargs):
-        self.mpi_comm = mpi_comm
-        self.__IN_OPTIMIZATION__ = False
         self.group_spike = group_spike
 
         if X == None:
@@ -70,20 +68,11 @@ class SSGPLVM(SparseGP):
         self.variational_prior = SpikeAndSlabPrior(pi=pi,learnPi=learnPi) # the prior probability of the latent binary variable b
         X = SpikeAndSlabPosterior(X, X_variance, gamma)
-
-        SparseGP.__init__(self, X, Y, Z, kernel, likelihood, inference_method, name, **kwargs)
-        self.add_parameter(self.X, index=0)
-        self.add_parameter(self.variational_prior)
-        if mpi_comm != None:
-            from ..util.mpi import divide_data
-            N_start, N_end, N_list = divide_data(Y.shape[0], mpi_comm)
-            self.N_range = (N_start, N_end)
-            self.N_list = np.array(N_list)
-            self.Y_local = self.Y[N_start:N_end]
-            print 'MPI RANK: '+str(self.mpi_comm.rank)+' with datasize: '+str(self.N_range)
-            mpi_comm.Bcast(self.param_array, root=0)
-
+        super(SSGPLVM,self).__init__(X, Y, Z, kernel, likelihood, variational_prior=self.variational_prior, inference_method=inference_method, name=name, mpi_comm=mpi_comm, **kwargs)
+        self.X.unfix()
+        self.X.variance.constrain_positive()
+
         if self.group_spike:
             [self.X.gamma[:,i].tie('tieGamma'+str(i)) for i in xrange(self.X.gamma.shape[1])] # Tie columns together
@@ -95,18 +84,18 @@
         """Get the gradients of the posterior distribution of X in its specific form."""
         return X.mean.gradient, X.variance.gradient, X.binary_prob.gradient
 
-    def parameters_changed(self):
-        if isinstance(self.inference_method, VarDTC_GPU) or isinstance(self.inference_method, VarDTC_minibatch):
-            update_gradients(self, mpi_comm=self.mpi_comm)
-            return
-
-        super(SSGPLVM, self).parameters_changed()
-        self._log_marginal_likelihood -= self.variational_prior.KL_divergence(self.X)
-
-        self.X.mean.gradient, self.X.variance.gradient, self.X.binary_prob.gradient = self.kern.gradients_qX_expectations(variational_posterior=self.X, Z=self.Z, dL_dpsi0=self.grad_dict['dL_dpsi0'], dL_dpsi1=self.grad_dict['dL_dpsi1'], dL_dpsi2=self.grad_dict['dL_dpsi2'])
-
-        # update for the KL divergence
-        self.variational_prior.update_gradients_KL(self.X)
+#    def parameters_changed(self):
+#        if isinstance(self.inference_method, VarDTC_GPU) or isinstance(self.inference_method, VarDTC_minibatch):
+#            update_gradients(self, mpi_comm=self.mpi_comm)
+#            return
+#
+#        super(SSGPLVM, self).parameters_changed()
+#        self._log_marginal_likelihood -= self.variational_prior.KL_divergence(self.X)
+#
+#        self.X.mean.gradient, self.X.variance.gradient, self.X.binary_prob.gradient = self.kern.gradients_qX_expectations(variational_posterior=self.X, Z=self.Z, dL_dpsi0=self.grad_dict['dL_dpsi0'], dL_dpsi1=self.grad_dict['dL_dpsi1'], dL_dpsi2=self.grad_dict['dL_dpsi2'])
+#
+#        # update for the KL divergence
+#        self.variational_prior.update_gradients_KL(self.X)
 
     def input_sensitivity(self):
         if self.kern.ARD:
@@ -121,47 +110,3 @@
         return dim_reduction_plots.plot_latent(self, plot_inducing=plot_inducing, *args, **kwargs)
-
-    def __getstate__(self):
-        dc = super(SSGPLVM, self).__getstate__()
-        dc['mpi_comm'] = None
-        if self.mpi_comm != None:
-            del dc['N_range']
-            del dc['N_list']
-            del dc['Y_local']
-        return dc
-
-    def __setstate__(self, state):
-        return super(SSGPLVM, self).__setstate__(state)
-
-    #=====================================================
-    # The MPI parallelization
-    #     - can move to model at some point
-    #=====================================================
-
-    def _set_params_transformed(self, p):
-        if self.mpi_comm != None:
-            if self.__IN_OPTIMIZATION__ and self.mpi_comm.rank==0:
-                self.mpi_comm.Bcast(np.int32(1),root=0)
-            self.mpi_comm.Bcast(p, root=0)
-        super(SSGPLVM, self)._set_params_transformed(p)
-
-    def optimize(self, optimizer=None, start=None, **kwargs):
-        self.__IN_OPTIMIZATION__ = True
-        if self.mpi_comm==None:
-            super(SSGPLVM, self).optimize(optimizer,start,**kwargs)
-        elif self.mpi_comm.rank==0:
-            super(SSGPLVM, self).optimize(optimizer,start,**kwargs)
-            self.mpi_comm.Bcast(np.int32(-1),root=0)
-        elif self.mpi_comm.rank>0:
-            x = self._get_params_transformed().copy()
-            flag = np.empty(1,dtype=np.int32)
-            while True:
-                self.mpi_comm.Bcast(flag,root=0)
-                if flag==1:
-                    self._set_params_transformed(x)
-                elif flag==-1:
-                    break
-                else:
-                    self.__IN_OPTIMIZATION__ = False
-                    raise Exception("Unrecognizable flag for synchronization!")
-        self.__IN_OPTIMIZATION__ = False
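Usage note (not part of the patch): the sketch below is one way the MPI-parallelised model introduced here could be driven, assuming mpi4py is installed and GPy is importable on every rank. The data array, random seed, latent dimensionality, pi value, script name and the messages flag are invented for illustration; only the SSGPLVM signature, the mpi_comm argument, the data division via GPy.util.mpi.divide_data and the rank-0 synchronization behaviour come from the diff above.

    # demo_ssgplvm_mpi.py -- launch with e.g.:  mpirun -np 4 python demo_ssgplvm_mpi.py
    import numpy as np
    from mpi4py import MPI
    import GPy

    comm = MPI.COMM_WORLD

    # Every rank builds the same model from the same data; SparseGP_MPI splits the
    # rows of Y across ranks internally (GPy.util.mpi.divide_data) and broadcasts
    # the initial parameter vector from rank 0.
    np.random.seed(0)
    Y = np.random.randn(100, 5)                      # hypothetical observations
    m = GPy.models.SSGPLVM(Y, input_dim=3, num_inducing=10,
                           pi=0.5 * np.ones(3), mpi_comm=comm)

    # optimize() runs the optimizer on rank 0 only; the remaining ranks wait in the
    # synchronization loop, applying each broadcast parameter vector until they
    # receive the -1 termination flag.
    m.optimize(messages=(comm.rank == 0))

    if comm.rank == 0:
        print(m)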