mirror of https://github.com/SheffieldML/GPy.git (synced 2026-05-10 04:22:38 +02:00)

commit d911766624 (parent a702b0862b)
[mpi] add mpi into ssgplvm

6 changed files with 165 additions and 288 deletions
@@ -9,6 +9,11 @@ import numpy as np
 from ...util.misc import param_to_array
 log_2_pi = np.log(2*np.pi)
+
+try:
+    from mpi4py import MPI
+except:
+    pass
 
 class VarDTC_minibatch(object):
     """
     An object for inference when the likelihood is Gaussian, but we want to do sparse inference.
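Side note on the hunk above: the try/except keeps mpi4py an optional dependency, so importing this module still works on machines without MPI (a narrower `except ImportError:` would avoid masking unrelated errors). A minimal sketch, not the commit's code, of the same pattern and the single-process fallback it enables:

try:
    from mpi4py import MPI
except ImportError:
    MPI = None   # single-process mode: no communicator will ever be passed

def allreduce_sum(x, comm=None):
    # Hypothetical helper: sum a Python scalar across ranks, or no-op serially.
    if comm is None:
        return x
    return comm.allreduce(x)  # lowercase allreduce handles Python objects; default op is SUM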
@@ -20,9 +25,10 @@ class VarDTC_minibatch(object):
     """
     const_jitter = 1e-6
-    def __init__(self, batchsize, limit=1):
+    def __init__(self, batchsize, limit=1, mpi_comm=None):
 
         self.batchsize = batchsize
+        self.mpi_comm = mpi_comm
 
         # Cache functions
         from ...util.caching import Cacher
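A hedged usage sketch for the new constructor argument (the module path and launch command are assumptions, not taken from the commit): every rank builds its own inference object over the shared communicator, e.g. under `mpirun -n 4 python fit.py`:

from mpi4py import MPI
from GPy.inference.latent_function_inference.var_dtc_parallel import VarDTC_minibatch  # path assumed

# Each MPI process constructs the same object; the communicator is stored
# on self.mpi_comm and consulted whenever partial statistics need merging.
inf = VarDTC_minibatch(batchsize=100, mpi_comm=MPI.COMM_WORLD)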
@@ -51,42 +57,23 @@ class VarDTC_minibatch(object):
         else:
             return jitchol(tdot(Y))
 
-    def inference_likelihood(self, kern, X, Z, likelihood, Y):
-        """
-        The first phase of inference:
-        Compute: log-likelihood, dL_dKmm
-
-        Cached intermediate results: Kmm, KmmInv,
-        """
-
+    def gatherPsiStat(self, kern, X, Z, Y, beta, uncertain_inputs, het_noise):
+
         num_inducing = Z.shape[0]
         num_data, output_dim = Y.shape
 
-        if isinstance(X, VariationalPosterior):
-            uncertain_inputs = True
-        else:
-            uncertain_inputs = False
-
-        #see whether we've got a different noise variance for each datum
-        beta = 1./np.fmax(likelihood.variance, 1e-6)
-        het_noise = beta.size > 1
-        if het_noise:
-            self.batchsize = 1
-        # VVT_factor is a matrix such that tdot(VVT_factor) = VVT...this is for efficiency!
-        #self.YYTfactor = beta*self.get_YYTfactor(Y)
-        YYT_factor = Y
         trYYT = self.get_trYYT(Y)
 
-
         psi2_full = np.zeros((num_inducing,num_inducing))
         psi1Y_full = np.zeros((output_dim,num_inducing)) # DxM
-        psi0_full = 0
-        YRY_full = 0
+        psi0_full = 0.
+        YRY_full = 0.
 
         for n_start in xrange(0,num_data,self.batchsize):
 
             n_end = min(self.batchsize+n_start, num_data)
-            Y_slice = YYT_factor[n_start:n_end]
+            Y_slice = Y[n_start:n_end]
             X_slice = X[n_start:n_end]
 
             if uncertain_inputs:
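The new gatherPsiStat streams the psi statistics in minibatches, so only one batch of data is touched at a time while the full-data sums build up. A self-contained sketch of that accumulation pattern, with a toy stat() standing in for the kernel's psi computations:

import numpy as np

def accumulate(X, batchsize):
    stat = lambda X_slice: X_slice.T.dot(X_slice)  # toy stand-in for a psi statistic
    total = np.zeros((X.shape[1], X.shape[1]))
    for n_start in range(0, X.shape[0], batchsize):
        n_end = min(n_start + batchsize, X.shape[0])
        total += stat(X[n_start:n_end])  # one batch at a time
    return total

X = np.random.randn(1000, 5)
assert np.allclose(accumulate(X, 100), X.T.dot(X))  # batched sum == full-data sum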
@@ -123,7 +110,47 @@ class VarDTC_minibatch(object):
         psi1Y_full *= beta
         psi2_full *= beta
         YRY_full = trYYT*beta
 
+        if self.mpi_comm != None:
+            psi0_all = np.array(psi0_full)
+            psi1Y_all = psi1Y_full.copy()
+            psi2_all = psi2_full.copy()
+            YRY_all = np.array(YRY_full)
+            self.mpi_comm.Allreduce([psi0_full, MPI.DOUBLE], [psi0_all, MPI.DOUBLE])
+            self.mpi_comm.Allreduce([psi1Y_full, MPI.DOUBLE], [psi1Y_all, MPI.DOUBLE])
+            self.mpi_comm.Allreduce([psi2_full, MPI.DOUBLE], [psi2_all, MPI.DOUBLE])
+            self.mpi_comm.Allreduce([YRY_full, MPI.DOUBLE], [YRY_all, MPI.DOUBLE])
+            return psi0_all, psi1Y_all, psi2_all, YRY_all
+
+        return psi0_full, psi1Y_full, psi2_full, YRY_full
+
+    def inference_likelihood(self, kern, X, Z, likelihood, Y):
+        """
+        The first phase of inference:
+        Compute: log-likelihood, dL_dKmm
+
+        Cached intermediate results: Kmm, KmmInv,
+        """
+
+        num_data, output_dim = Y.shape
+        if self.mpi_comm != None:
+            num_data_all = np.array(num_data,dtype=np.int32)
+            self.mpi_comm.Allreduce([np.int32(num_data), MPI.INT], [num_data_all, MPI.INT])
+            num_data = num_data_all
+
+        if isinstance(X, VariationalPosterior):
+            uncertain_inputs = True
+        else:
+            uncertain_inputs = False
+
+        #see whether we've got a different noise variance for each datum
+        beta = 1./np.fmax(likelihood.variance, 1e-6)
+        het_noise = beta.size > 1
+        if het_noise:
+            self.batchsize = 1
+
+        psi0_full, psi1Y_full, psi2_full, YRY_full = self.gatherPsiStat(kern, X, Z, Y, beta, uncertain_inputs, het_noise)
+
         #======================================================================
         # Compute Common Components
         #======================================================================
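The hunk above merges each rank's local psi statistics with Allreduce, whose default reduction in mpi4py is an element-wise sum delivered to every rank. A runnable sketch of the buffer-style call (launch with something like `mpirun -n 4 python demo.py`):

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
psi2_local = np.full((3, 3), float(comm.Get_rank()))  # each rank's partial statistic
psi2_global = np.empty_like(psi2_local)

# Element-wise sum across all ranks; every rank receives the merged result.
comm.Allreduce([psi2_local, MPI.DOUBLE], [psi2_global, MPI.DOUBLE])
assert np.allclose(psi2_global, sum(range(comm.Get_size())))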
@@ -212,7 +239,6 @@ class VarDTC_minibatch(object):
             isEnd = False
             self.batch_pos = n_end
 
-        num_slice = n_end-n_start
         Y_slice = YYT_factor[n_start:n_end]
         X_slice = X[n_start:n_end]
 
@@ -283,28 +309,32 @@ class VarDTC_minibatch(object):
     return isEnd, (n_start,n_end), grad_dict
 
 
-def update_gradients(model):
-    model._log_marginal_likelihood, dL_dKmm, model.posterior = model.inference_method.inference_likelihood(model.kern, model.X, model.Z, model.likelihood, model.Y)
+def update_gradients(model, mpi_comm=None):
+    if mpi_comm == None:
+        Y = model.Y
+        X = model.X
+    else:
+        Y = model.Y_local
+        X = model.X_local
+
+    model._log_marginal_likelihood, dL_dKmm, model.posterior = model.inference_method.inference_likelihood(model.kern, X, model.Z, model.likelihood, Y)
 
     het_noise = model.likelihood.variance.size > 1
 
     if het_noise:
         dL_dthetaL = np.empty((model.Y.shape[0],))
     else:
-        dL_dthetaL = 0
-
-    #gradients w.r.t. kernel
-    model.kern.update_gradients_full(dL_dKmm, model.Z, None)
+        dL_dthetaL = np.float64(0.)
 
     kern_grad = model.kern.gradient.copy()
 
-    #gradients w.r.t. Z
-    model.Z.gradient = model.kern.gradients_X(dL_dKmm, model.Z)
+    kern_grad[:] = 0.
+    model.Z.gradient = 0.
 
     isEnd = False
     while not isEnd:
-        isEnd, n_range, grad_dict = model.inference_method.inference_minibatch(model.kern, model.X, model.Z, model.likelihood, model.Y)
+        isEnd, n_range, grad_dict = model.inference_method.inference_minibatch(model.kern, X, model.Z, model.likelihood, Y)
         if isinstance(model.X, VariationalPosterior):
-            X_slice = model.X[n_range[0]:n_range[1]]
+            X_slice = model.X[model.Y_range[0]+n_range[0]:model.Y_range[0]+n_range[1]]
 
             #gradients w.r.t. kernel
             model.kern.update_gradients_expectations(variational_posterior=X_slice, Z=model.Z, dL_dpsi0=grad_dict['dL_dpsi0'], dL_dpsi1=grad_dict['dL_dpsi1'], dL_dpsi2=grad_dict['dL_dpsi2'])
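update_gradients now reads either the full data (model.Y, model.X) or this rank's shard (model.Y_local, model.X_local); how the shard is cut is the model's responsibility. A hypothetical sketch (local_slice is illustrative, not from the commit) of a contiguous row split consistent with the model.Y_range offset used above:

import numpy as np

def local_slice(num_data, rank, size):
    # Contiguous shards, remainder spread over the first ranks;
    # returns this rank's (start, end) row range.
    base, extra = divmod(num_data, size)
    start = rank * base + min(rank, extra)
    end = start + base + (1 if rank < extra else 0)
    return start, end

# e.g. 10 rows over 3 ranks -> (0, 4), (4, 7), (7, 10)
assert [local_slice(10, r, 3) for r in range(3)] == [(0, 4), (4, 7), (7, 10)]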
@@ -322,14 +352,36 @@ def update_gradients(model):
             dL_dthetaL[n_range[0]:n_range[1]] = grad_dict['dL_dthetaL']
         else:
             dL_dthetaL += grad_dict['dL_dthetaL']
 
-    # Set the gradients w.r.t. kernel
-    model.kern.gradient = kern_grad
-
-    # Update Log-likelihood
-    model._log_marginal_likelihood -= model.variational_prior.KL_divergence(model.X)
-    # update for the KL divergence
-    model.variational_prior.update_gradients_KL(model.X)
+    # Gather the gradients from multiple MPI nodes
+    if mpi_comm != None:
+        if het_noise:
+            assert False, "Not implemented!"
+        kern_grad_all = kern_grad.copy()
+        Z_grad_all = model.Z.gradient.copy()
+        mpi_comm.Allreduce([kern_grad, MPI.DOUBLE], [kern_grad_all, MPI.DOUBLE])
+        mpi_comm.Allreduce([model.Z.gradient, MPI.DOUBLE], [Z_grad_all, MPI.DOUBLE])
+        kern_grad = kern_grad_all
+        model.Z.gradient = Z_grad_all
+
+    #gradients w.r.t. kernel
+    model.kern.update_gradients_full(dL_dKmm, model.Z, None)
+    model.kern.gradient += kern_grad
+
+    #gradients w.r.t. Z
+    model.Z.gradient += model.kern.gradients_X(dL_dKmm, model.Z)
+
+    # Update Log-likelihood
+    KL_div = model.variational_prior.KL_divergence(X)
+    # update for the KL divergence
+    model.variational_prior.update_gradients_KL(X)
+
+    if mpi_comm != None:
+        KL_div_all = np.array(KL_div)
+        mpi_comm.Allreduce([np.float64(KL_div), MPI.DOUBLE], [KL_div_all, MPI.DOUBLE])
+        KL_div = KL_div_all
+        [mpi_comm.Allgatherv([pp.copy(), MPI.DOUBLE], [pa, (model.Y_list*pa.shape[-1], None), MPI.DOUBLE]) for pp,pa in zip(model.get_X_gradients(X),model.get_X_gradients(model.X))]
+    model._log_marginal_likelihood -= KL_div
 
     # dL_dthetaL
     model.likelihood.update_gradients(dL_dthetaL)
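The Allgatherv in the final hunk stitches each rank's variational-parameter gradients back into the full-size arrays, passing per-rank element counts (row counts from model.Y_list scaled by the trailing dimension) and leaving the displacements for MPI to derive. A self-contained sketch of Allgatherv with uneven block sizes (names are illustrative, not from the commit); run under mpirun:

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()

rows = np.arange(1, size + 1)                    # rank r owns r+1 rows
local = np.full((rows[rank], 2), float(rank))    # this rank's gradient block
full = np.empty((rows.sum(), 2))                 # gathered, full-size array

# Counts are in elements, hence rows * trailing dimension; a displacement
# spec of None is computed from the counts.
comm.Allgatherv([local, MPI.DOUBLE], [full, (rows * 2, None), MPI.DOUBLE])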