move _raw_predict into posterior object

Zhenwen Dai 2016-03-17 11:29:33 +00:00
parent f2b813551a
commit 67ba9b60c6
4 changed files with 53 additions and 121 deletions
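
In short: the Woodbury-based prediction code that was duplicated across GP._raw_predict and SparseGP._raw_predict moves into a single Posterior._raw_predict method, and the models now delegate to their posterior object. A minimal sketch of the new call path (argument names taken from the diff; `model` stands for any GP or SparseGP instance):

    # The model forwards its kernel, the test inputs, and its predictive
    # variable (X for a full GP, the inducing inputs Z for a SparseGP).
    mu, var = model.posterior._raw_predict(
        kern=model.kern,
        Xnew=Xnew,
        pred_var=model._predictive_variable,
        full_cov=False,  # diagonal predictive variance
    )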


@@ -212,42 +212,9 @@ class GP(Model):
             = N(f*| K_{x*x}(K_{xx} + \Sigma)^{-1}Y, K_{x*x*} - K_{x*x}(K_{xx} + \Sigma)^{-1}K_{xx*})
             \Sigma := \texttt{Likelihood.variance / Approximate likelihood covariance}
         """
-        if hasattr(self.posterior, '_raw_predict'):
-            mu, var = self.posterior._raw_predict(kern=self.kern if kern is None else kern, Xnew=Xnew, pred_var=self._predictive_variable, full_cov=full_cov)
-            if self.mean_function is not None:
-                mu += self.mean_function.f(Xnew)
-            return mu, var
-        if kern is None:
-            kern = self.kern
-        Kx = kern.K(self._predictive_variable, Xnew)
-        mu = np.dot(Kx.T, self.posterior.woodbury_vector)
-        if len(mu.shape) == 1:
-            mu = mu.reshape(-1, 1)
-        if full_cov:
-            Kxx = kern.K(Xnew)
-            if self.posterior.woodbury_inv.ndim == 2:
-                var = Kxx - np.dot(Kx.T, np.dot(self.posterior.woodbury_inv, Kx))
-            elif self.posterior.woodbury_inv.ndim == 3: # Missing data
-                var = np.empty((Kxx.shape[0], Kxx.shape[1], self.posterior.woodbury_inv.shape[2]))
-                from ..util.linalg import mdot
-                for i in range(var.shape[2]):
-                    var[:, :, i] = Kxx - mdot(Kx.T, self.posterior.woodbury_inv[:, :, i], Kx)
-        else:
-            Kxx = kern.Kdiag(Xnew)
-            if self.posterior.woodbury_inv.ndim == 2:
-                var = (Kxx - np.sum(np.dot(self.posterior.woodbury_inv.T, Kx) * Kx, 0))[:, None]
-            elif self.posterior.woodbury_inv.ndim == 3: # Missing data
-                var = np.empty((Kxx.shape[0], self.posterior.woodbury_inv.shape[2]))
-                for i in range(var.shape[1]):
-                    var[:, i] = Kxx - np.sum(np.dot(self.posterior.woodbury_inv[:, :, i].T, Kx) * Kx, 0)
-        # add in the mean function
+        mu, var = self.posterior._raw_predict(kern=self.kern if kern is None else kern, Xnew=Xnew, pred_var=self._predictive_variable, full_cov=full_cov)
         if self.mean_function is not None:
             mu += self.mean_function.f(Xnew)
         return mu, var

     def predict(self, Xnew, full_cov=False, Y_metadata=None, kern=None, likelihood=None):
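
For reference, the deleted branch above (like the new Posterior method below) implements the standard GP predictive equations in terms of the cached Woodbury quantities, woodbury_vector = (K_{xx} + \Sigma)^{-1} Y and woodbury_inv = (K_{xx} + \Sigma)^{-1}. A self-contained NumPy sketch, with a hypothetical toy squared-exponential kernel helper k standing in for GPy's kern object (not GPy API):

    import numpy as np

    def k(A, B):
        # Toy squared-exponential kernel, only to make the sketch runnable.
        d2 = ((A[:, None, :] - B[None, :, :]) ** 2).sum(-1)
        return np.exp(-0.5 * d2)

    rng = np.random.default_rng(0)
    X, Y, Xnew = rng.normal(size=(5, 1)), rng.normal(size=(5, 1)), rng.normal(size=(3, 1))
    noise = 0.1

    woodbury_inv = np.linalg.inv(k(X, X) + noise * np.eye(5))  # (K_xx + \Sigma)^{-1}
    woodbury_vector = woodbury_inv.dot(Y)                      # (K_xx + \Sigma)^{-1} Y

    Kx = k(X, Xnew)
    mu = np.dot(Kx.T, woodbury_vector)                         # predictive mean
    var = k(Xnew, Xnew) - Kx.T.dot(woodbury_inv).dot(Kx)       # full predictive covariance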


@@ -113,90 +113,3 @@ class SparseGP(GP):
             self.Z.gradient += self.kern.gradients_X(self.grad_dict['dL_dKnm'].T, self.Z, self.X)
         self._Zgrad = self.Z.gradient.copy()

-    def _raw_predict(self, Xnew, full_cov=False, kern=None):
-        """
-        Make a prediction for the latent function values.
-
-        For deterministic inputs we give back a full covariance of shape NxN;
-        if there is missing data, each output dimension has its own full covariance
-        of shape NxNxD, and if full_cov is off, we take only the diagonal elements across N.
-
-        For uncertain inputs, the SparseGP bound cannot predict the full covariance
-        matrix (full_cov) for now; the implementation of that will follow. However,
-        the covariance differs for each output dimension, so if full_cov is False
-        (the default) we return the variance for each dimension [NxD].
-        """
-        if hasattr(self.posterior, '_raw_predict'):
-            mu, var = self.posterior._raw_predict(kern=self.kern if kern is None else kern, Xnew=Xnew, pred_var=self._predictive_variable, full_cov=full_cov)
-            if self.mean_function is not None:
-                mu += self.mean_function.f(Xnew)
-            return mu, var
-        if kern is None: kern = self.kern
-        if not isinstance(Xnew, VariationalPosterior):
-            mu, var = super(SparseGP, self)._raw_predict(Xnew, full_cov, kern)
-        else:
-            psi0_star = kern.psi0(self._predictive_variable, Xnew)
-            psi1_star = kern.psi1(self._predictive_variable, Xnew)
-            psi2_star = kern.psi2n(self._predictive_variable, Xnew)
-            la = self.posterior.woodbury_vector
-            mu = np.dot(psi1_star, la) # TODO: dimensions?
-            N, M, D = psi0_star.shape[0], psi1_star.shape[1], la.shape[1]
-            if full_cov:
-                raise NotImplementedError("Full covariance for Sparse GP predicted with uncertain inputs not implemented yet.")
-            else:
-                tmp = psi2_star - psi1_star[:, :, None]*psi1_star[:, None, :]
-                var = (tmp.reshape(-1, M).dot(la).reshape(N, M, D)*la[None, :, :]).sum(1) + psi0_star[:, None]
-                if self.posterior.woodbury_inv.ndim == 2:
-                    var += -psi2_star.reshape(N, -1).dot(self.posterior.woodbury_inv.flat)[:, None]
-                else:
-                    var += -psi2_star.reshape(N, -1).dot(self.posterior.woodbury_inv.reshape(-1, D))
-                assert np.all(var >= -1e-5), "The predicted variance goes negative!: " + str(var)
-                var = np.clip(var, 1e-15, np.inf)
-        return mu, var
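
The deleted uncertain-input branch computes the predictive moments by averaging the kernel over the Gaussian test distribution q(x_*) through the psi statistics. In the code's notation (\lambda = woodbury_vector, W = woodbury_inv, Z = the inducing inputs), a sketch of the per-test-point, per-output-dimension quantities:

    \psi_0^* = \mathbb{E}_{q(x_*)}\!\left[k(x_*, x_*)\right], \qquad
    \psi_1^* = \mathbb{E}_{q(x_*)}\!\left[k(x_*, Z)\right], \qquad
    \psi_2^* = \mathbb{E}_{q(x_*)}\!\left[k(Z, x_*)\,k(x_*, Z)\right]

    \mu_* = \psi_1^* \lambda, \qquad
    v_* = \psi_0^* + \lambda^\top\!\left(\psi_2^* - {\psi_1^*}^\top \psi_1^*\right)\lambda
          - \operatorname{tr}\!\left(W \psi_2^*\right)

This is what the tmp/var lines compute in batch over all test points, followed by the clip that floors the variance at 1e-15.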


@@ -3,6 +3,7 @@

 import numpy as np
 from ...util.linalg import pdinv, dpotrs, dpotri, symmetrify, jitchol, dtrtrs, tdot
+from GPy.core.parameterization.variational import VariationalPosterior

 class Posterior(object):
     """
@@ -188,6 +189,56 @@ class Posterior(object):
             self._K_chol = jitchol(self._K)
         return self._K_chol

+    def _raw_predict(self, kern, Xnew, pred_var, full_cov=False):
+        woodbury_vector = self.woodbury_vector
+        woodbury_inv = self.woodbury_inv
+
+        if not isinstance(Xnew, VariationalPosterior):
+            # Deterministic test inputs.
+            Kx = kern.K(pred_var, Xnew)
+            mu = np.dot(Kx.T, woodbury_vector)
+            if len(mu.shape) == 1:
+                mu = mu.reshape(-1, 1)
+            if full_cov:
+                Kxx = kern.K(Xnew)
+                if woodbury_inv.ndim == 2:
+                    var = Kxx - np.dot(Kx.T, np.dot(woodbury_inv, Kx))
+                elif woodbury_inv.ndim == 3: # Missing data
+                    var = np.empty((Kxx.shape[0], Kxx.shape[1], woodbury_inv.shape[2]))
+                    from ...util.linalg import mdot
+                    for i in range(var.shape[2]):
+                        var[:, :, i] = Kxx - mdot(Kx.T, woodbury_inv[:, :, i], Kx)
+            else:
+                Kxx = kern.Kdiag(Xnew)
+                if woodbury_inv.ndim == 2:
+                    var = (Kxx - np.sum(np.dot(woodbury_inv.T, Kx) * Kx, 0))[:, None]
+                elif woodbury_inv.ndim == 3: # Missing data
+                    var = np.empty((Kxx.shape[0], woodbury_inv.shape[2]))
+                    for i in range(var.shape[1]):
+                        var[:, i] = Kxx - np.sum(np.dot(woodbury_inv[:, :, i].T, Kx) * Kx, 0)
+        else:
+            # Uncertain test inputs: predict through the psi statistics.
+            psi0_star = kern.psi0(pred_var, Xnew)
+            psi1_star = kern.psi1(pred_var, Xnew)
+            psi2_star = kern.psi2n(pred_var, Xnew)
+            la = woodbury_vector
+            mu = np.dot(psi1_star, la) # TODO: dimensions?
+            N, M, D = psi0_star.shape[0], psi1_star.shape[1], la.shape[1]
+            if full_cov:
+                raise NotImplementedError("Full covariance for Sparse GP predicted with uncertain inputs not implemented yet.")
+            else:
+                tmp = psi2_star - psi1_star[:, :, None]*psi1_star[:, None, :]
+                var = (tmp.reshape(-1, M).dot(la).reshape(N, M, D)*la[None, :, :]).sum(1) + psi0_star[:, None]
+                if woodbury_inv.ndim == 2:
+                    var += -psi2_star.reshape(N, -1).dot(woodbury_inv.flat)[:, None]
+                else:
+                    var += -psi2_star.reshape(N, -1).dot(woodbury_inv.reshape(-1, D))
+                var = np.clip(var, 1e-15, np.inf)
+        return mu, var

 class PosteriorExact(Posterior):

     def _raw_predict(self, kern, Xnew, pred_var, full_cov=False):
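
The woodbury_inv.ndim == 3 branches cover missing-data models, where each of the D output dimensions carries its own Woodbury inverse. A small sketch of the diagonal-variance case with made-up toy arrays (shapes only; not GPy API):

    import numpy as np

    rng = np.random.default_rng(1)
    N_train, N_new, D = 4, 3, 2
    Kx = rng.normal(size=(N_train, N_new))                  # kern.K(pred_var, Xnew)
    Kxx = np.ones(N_new)                                    # kern.Kdiag(Xnew)
    woodbury_inv = rng.normal(size=(N_train, N_train, D))   # one inverse per output

    # Per-dimension predictive variance, as in the ndim == 3 branch above.
    var = np.empty((N_new, D))
    for i in range(D):
        var[:, i] = Kxx - np.sum(np.dot(woodbury_inv[:, :, i].T, Kx) * Kx, 0)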


@@ -157,6 +157,7 @@ class MiscTests(unittest.TestCase):
         # Not easy to check if woodbury_inv is correct in itself as it requires a large derivation and expression
         Kinv = m.posterior.woodbury_inv
         K_hat = k.K(self.X_new) - k.K(self.X_new, Z).dot(Kinv).dot(k.K(Z, self.X_new))
+        K_hat = np.clip(K_hat, 1e-15, np.inf)

         mu, covar = m._raw_predict(self.X_new, full_cov=True)
         self.assertEquals(mu.shape, (self.N_new, self.D))
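
The added clip floors tiny negative entries that floating-point cancellation can produce in K_xx - K_x^T K^{-1} K_x where the exact value is (near) zero, mirroring the np.clip in the new Posterior._raw_predict. A minimal illustration with made-up numbers:

    import numpy as np

    K_hat = np.array([[2.0e-16, -3.0e-17],
                      [-3.0e-17, 1.5e-16]])  # tiny negatives from cancellation
    K_hat = np.clip(K_hat, 1e-15, np.inf)    # floor matches the posterior code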