mirror of https://github.com/SheffieldML/GPy.git
synced 2026-05-08 19:42:39 +02:00
'''
Created on 6 Nov 2013

@author: maxz
'''

import numpy as np

from .parameterized import Parameterized
from .param import Param
from .transformations import Logexp, Logistic, __fixed__
from GPy.util.misc import param_to_array
from GPy.util.caching import Cache_this


class VariationalPrior(Parameterized):
    def __init__(self, name='latent space', **kw):
        super(VariationalPrior, self).__init__(name=name, **kw)

    def KL_divergence(self, variational_posterior):
        raise NotImplementedError("override this for variational inference of latent space")

    def update_gradients_KL(self, variational_posterior):
        """
        Update the gradients for the mean and the variance **in place**.
        """
        raise NotImplementedError("override this for variational inference of latent space")


class NormalPrior(VariationalPrior):
    def KL_divergence(self, variational_posterior):
        var_mean = np.square(variational_posterior.mean).sum()
        var_S = (variational_posterior.variance - np.log(variational_posterior.variance)).sum()
        return 0.5 * (var_mean + var_S) - 0.5 * variational_posterior.input_dim * variational_posterior.num_data

    def update_gradients_KL(self, variational_posterior):
        # subtract dKL/dmu and dKL/dS from the accumulated gradients, in place:
        variational_posterior.mean.gradient -= variational_posterior.mean
        variational_posterior.variance.gradient -= (1. - (1. / variational_posterior.variance)) * 0.5
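
# For reference: with the standard normal prior p(X) = N(0, I) and the
# factorizing posterior q(X) = N(mu, S), the KL term above is
#   KL(q || p) = 0.5 * sum(mu**2 + S - log(S) - 1),
# so dKL/dmu = mu and dKL/dS = 0.5 * (1 - 1/S), which is exactly what
# update_gradients_KL subtracts. A minimal NumPy cross-check (hypothetical
# shapes, pure illustration, not part of GPy):
#
#   mu, S = np.random.randn(10, 2), np.ones((10, 2)) * 0.5
#   kl = 0.5 * (np.square(mu) + S - np.log(S) - 1.).sum()
#   # agrees with NormalPrior().KL_divergence(NormalPosterior(mu, S))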


class SpikeAndSlabPrior(VariationalPrior):
    def __init__(self, pi=None, learnPi=False, variance=1.0, name='SpikeAndSlabPrior', **kw):
        super(SpikeAndSlabPrior, self).__init__(name=name, **kw)
        self.variance = Param('variance', variance)
        self.learnPi = learnPi
        if learnPi:
            self.pi = Param('Pi', pi, Logistic(1e-10, 1. - 1e-10))
        else:
            self.pi = Param('Pi', pi, __fixed__)
        self.link_parameter(self.pi)
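
    # Note: pi may be a scalar (shared), a length-Q vector (one value per
    # latent dimension), or an N x Q matrix (one value per point and
    # dimension); KL_divergence and update_gradients_KL handle all three.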

    def KL_divergence(self, variational_posterior):
        mu = variational_posterior.mean
        S = variational_posterior.variance
        gamma, gamma1 = variational_posterior.gamma_probabilities()
        log_gamma, log_gamma1 = variational_posterior.gamma_log_prob()
        if len(self.pi.shape) == 2:
            # recover the row (data point) indices to pick the matching rows of pi
            idx = np.unique(gamma._raveled_index() // gamma.shape[-1])
            pi = self.pi[idx]
        else:
            pi = self.pi

        var_mean = np.square(mu) / self.variance
        var_S = (S / self.variance - np.log(S))
        var_gamma = (gamma * (log_gamma - np.log(pi))).sum() + (gamma1 * (log_gamma1 - np.log(1. - pi))).sum()
        return var_gamma + (gamma * (np.log(self.variance) - 1. + var_mean + var_S)).sum() / 2.

    def update_gradients_KL(self, variational_posterior):
        mu = variational_posterior.mean
        S = variational_posterior.variance
        gamma, gamma1 = variational_posterior.gamma_probabilities()
        log_gamma, log_gamma1 = variational_posterior.gamma_log_prob()
        if len(self.pi.shape) == 2:
            idx = np.unique(gamma._raveled_index() // gamma.shape[-1])
            pi = self.pi[idx]
        else:
            pi = self.pi

        variational_posterior.binary_prob.gradient -= (np.log((1. - pi) / pi) + log_gamma - log_gamma1 + ((np.square(mu) + S) / self.variance - np.log(S) + np.log(self.variance) - 1.) / 2.) * gamma * gamma1
        mu.gradient -= gamma * mu / self.variance
        S.gradient -= (1. / self.variance - 1. / S) * gamma / 2.
        if self.learnPi:
            if len(self.pi) == 1:
                self.pi.gradient = (gamma / self.pi - (1. - gamma) / (1. - self.pi)).sum()
            elif len(self.pi.shape) == 1:
                self.pi.gradient = (gamma / self.pi - (1. - gamma) / (1. - self.pi)).sum(axis=0)
            else:
                # 2D pi: only the rows selected above receive a gradient
                self.pi[idx].gradient = (gamma / self.pi[idx] - (1. - gamma) / (1. - self.pi[idx]))
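
# For reference: the spike-and-slab KL above decomposes per point and
# dimension as
#   KL = KL_Bernoulli(gamma || pi) + gamma * KL(N(mu, S) || N(0, variance)),
# i.e. the Gaussian (slab) part of the KL is only paid in proportion to the
# posterior on-probability gamma.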


class VariationalPosterior(Parameterized):
    def __init__(self, means=None, variances=None, name='latent space', *a, **kw):
        super(VariationalPosterior, self).__init__(name=name, *a, **kw)
        self.mean = Param("mean", means)
        self.variance = Param("variance", variances, Logexp())
        self.ndim = self.mean.ndim
        self.shape = self.mean.shape
        self.num_data, self.input_dim = self.mean.shape
        self.link_parameters(self.mean, self.variance)
        if self.has_uncertain_inputs():
            assert self.variance.shape == self.mean.shape, "need one variance per sample and dimension"

    def set_gradients(self, grad):
        self.mean.gradient, self.variance.gradient = grad

    def _raveled_index(self):
        index = np.empty(dtype=int, shape=0)
        size = 0
        for p in self.parameters:
            index = np.hstack((index, p._raveled_index() + size))
            size += p._realsize_ if hasattr(p, '_realsize_') else p.size
        return index

    def has_uncertain_inputs(self):
        return self.variance is not None

    def __getitem__(self, s):
        if isinstance(s, (int, slice, tuple, list, np.ndarray)):
            import copy
            n = self.__new__(self.__class__, self.name)
            dc = self.__dict__.copy()
            dc['mean'] = self.mean[s]
            dc['variance'] = self.variance[s]
            dc['parameters'] = copy.copy(self.parameters)
            n.__dict__.update(dc)
            n.parameters[dc['mean']._parent_index_] = dc['mean']
            n.parameters[dc['variance']._parent_index_] = dc['variance']
            n._gradient_array_ = None
            # 'oversize' keeps the size contributed by any parameters other
            # than mean and variance
            oversize = self.size - self.mean.size - self.variance.size
            n.size = n.mean.size + n.variance.size + oversize
            n.ndim = n.mean.ndim
            n.shape = n.mean.shape
            n.num_data = n.mean.shape[0]
            n.input_dim = n.mean.shape[1] if n.ndim != 1 else 1
            return n
        else:
            return super(VariationalPosterior, self).__getitem__(s)
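
# Slicing sketch (hypothetical shapes): given q = NormalPosterior(mu, S) with
# mu, S of shape (10, 3), q[2:5] returns a light-weight copy whose mean and
# variance are the corresponding slices of the originals and whose
# num_data/shape bookkeeping is updated to match.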


class NormalPosterior(VariationalPosterior):
    '''
    NormalPosterior distribution for variational approximations.

    Holds the means and variances for a factorizing multivariate normal distribution.
    '''

    def plot(self, *args):
        """
        Plot latent space X in 1D.

        See GPy.plotting.matplot_dep.variational_plots
        """
        import sys
        assert "matplotlib" in sys.modules, "matplotlib package has not been imported."
        from ...plotting.matplot_dep import variational_plots
        return variational_plots.plot(self, *args)


class SpikeAndSlabPosterior(VariationalPosterior):
    '''
    The SpikeAndSlab distribution for variational approximations.
    '''
    def __init__(self, means, variances, binary_prob, name='latent space'):
        """
        binary_prob : the probability of the distribution on the slab part.
        """
        super(SpikeAndSlabPosterior, self).__init__(means, variances, name)
        self.gamma = Param("binary_prob", binary_prob)
        self.link_parameter(self.gamma)

    @Cache_this(limit=5)
    def gamma_probabilities(self):
        # numerically safe sigmoid of binary_prob: exp() overflows float64
        # near exp(710), so only evaluate where it cannot overflow and leave
        # the underflowed entries at 0
        prob = np.zeros_like(param_to_array(self.gamma))
        prob[self.gamma > -710] = 1. / (1. + np.exp(-self.gamma[self.gamma > -710]))
        prob1 = np.zeros_like(param_to_array(self.gamma))
        prob1[self.gamma < 710] = 1. / (1. + np.exp(self.gamma[self.gamma < 710]))
        return prob, prob1

    @Cache_this(limit=5)
    def gamma_log_prob(self):
        # numerically safe log-sigmoid: log sigmoid(x) = -log1p(exp(-x)), and
        # for x <= -40 it equals x to within float64 precision
        loggamma = param_to_array(self.gamma).copy()
        loggamma[loggamma > -40] = -np.log1p(np.exp(-loggamma[loggamma > -40]))
        loggamma1 = -param_to_array(self.gamma).copy()
        loggamma1[loggamma1 > -40] = -np.log1p(np.exp(-loggamma1[loggamma1 > -40]))
        return loggamma, loggamma1

    def set_gradients(self, grad):
        self.mean.gradient, self.variance.gradient, self.gamma.gradient = grad

    def __getitem__(self, s):
        if isinstance(s, (int, slice, tuple, list, np.ndarray)):
            import copy
            n = self.__new__(self.__class__, self.name)
            dc = self.__dict__.copy()
            dc['mean'] = self.mean[s]
            dc['variance'] = self.variance[s]
            dc['binary_prob'] = self.binary_prob[s]
            dc['parameters'] = copy.copy(self.parameters)
            n.__dict__.update(dc)
            n.parameters[dc['mean']._parent_index_] = dc['mean']
            n.parameters[dc['variance']._parent_index_] = dc['variance']
            n.parameters[dc['binary_prob']._parent_index_] = dc['binary_prob']
            n._gradient_array_ = None
            oversize = self.size - self.mean.size - self.variance.size
            n.size = n.mean.size + n.variance.size + oversize
            n.ndim = n.mean.ndim
            n.shape = n.mean.shape
            n.num_data = n.mean.shape[0]
            n.input_dim = n.mean.shape[1] if n.ndim != 1 else 1
            return n
        else:
            return super(SpikeAndSlabPosterior, self).__getitem__(s)

    def plot(self, *args, **kwargs):
        """
        Plot latent space X in 1D.

        See GPy.plotting.matplot_dep.variational_plots
        """
        import sys
        assert "matplotlib" in sys.modules, "matplotlib package has not been imported."
        from ...plotting.matplot_dep import variational_plots
        return variational_plots.plot_SpikeSlab(self, *args, **kwargs)
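
# A minimal usage sketch (hypothetical shapes; pure illustration, assumes the
# surrounding GPy package so the relative imports above resolve):
#
#   mu = np.random.randn(10, 3)
#   S = np.ones((10, 3))
#   q = NormalPosterior(means=mu, variances=S)
#   p = NormalPrior()
#   kl = p.KL_divergence(q)    # scalar KL(q || N(0, I))
#   p.update_gradients_KL(q)   # subtracts dKL/dmu and dKL/dS in place on
#                              # q.mean.gradient and q.variance.gradient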