fix: merge

This commit is contained in:
mzwiessele 2018-09-02 21:22:04 +01:00
commit 717d3f56f2
8 changed files with 624 additions and 64 deletions

View file

@ -94,6 +94,7 @@ class InferenceMethodList(LatentFunctionInference, list):
self.append(inf)
from .exact_gaussian_inference import ExactGaussianInference
from .exact_studentt_inference import ExactStudentTInference
from .laplace import Laplace,LaplaceBlock
from GPy.inference.latent_function_inference.var_dtc import VarDTC
from .expectation_propagation import EP, EPDTC

View file

@ -0,0 +1,49 @@
# Copyright (c) 2017, GPy authors (see AUTHORS.txt).
# Licensed under the BSD 3-clause license (see LICENSE.txt)
from . import LatentFunctionInference
from .posterior import StudentTPosterior
from ...util.linalg import pdinv, dpotrs, tdot
from ...util import diag
import numpy as np
from scipy.special import gammaln, digamma
class ExactStudentTInference(LatentFunctionInference):
"""
An object for inference of student-t processes (not for GP with student-t likelihood!).
The function self.inference returns a StudentTPosterior object, which summarizes
the posterior.
"""
def inference(self, kern, X, Y, nu, mean_function=None, K=None):
m = 0 if mean_function is None else mean_function.f(X)
K = kern.K(X) if K is None else K
YYT_factor = Y - m
Ky = K.copy()
diag.add(Ky, 1e-8)
# Posterior representation
Wi, LW, LWi, W_logdet = pdinv(Ky)
alpha, _ = dpotrs(LW, YYT_factor, lower=1)
beta = np.sum(alpha * YYT_factor)
posterior = StudentTPosterior(nu, woodbury_chol=LW, woodbury_vector=alpha, K=K)
# Log marginal
N = Y.shape[0]
D = Y.shape[1]
log_marginal = 0.5 * (-N * np.log((nu - 2) * np.pi) - W_logdet - (nu + N) * np.log(1 + beta / (nu - 2)))
log_marginal += gammaln((nu + N) / 2) - gammaln(nu / 2)
# Gradients
dL_dK = 0.5 * ((nu + N) / (nu + beta - 2) * tdot(alpha) - D * Wi)
dL_dnu = -N / (nu - 2.) + digamma(0.5 * (nu + N)) - digamma(0.5 * nu)
dL_dnu -= np.log(1 + beta / (nu - 2.))
dL_dnu += ((nu + N) * beta) / ((nu - 2) * (beta + nu - 2))
dL_dnu *= 0.5
gradients = {'dL_dK': dL_dK, 'dL_dnu': dL_dnu, 'dL_dm': alpha}
return posterior, log_marginal, gradients

View file

@ -5,6 +5,7 @@ import numpy as np
from ...util.linalg import pdinv, dpotrs, dpotri, symmetrify, jitchol, dtrtrs, tdot
from GPy.core.parameterization.variational import VariationalPosterior
class Posterior(object):
"""
An object to represent a Gaussian posterior over latent function values, p(f|D).
@ -16,7 +17,9 @@ class Posterior(object):
the function at any new point x_* by integrating over this posterior.
"""
def __init__(self, woodbury_chol=None, woodbury_vector=None, K=None, mean=None, cov=None, K_chol=None, woodbury_inv=None, prior_mean=0):
def __init__(self, woodbury_chol=None, woodbury_vector=None, K=None, mean=None, cov=None, K_chol=None,
woodbury_inv=None, prior_mean=0):
"""
woodbury_chol : a lower triangular matrix L that satisfies posterior_covariance = K - K L^{-T} L^{-1} K
woodbury_vector : a matrix (or vector, as Nx1 matrix) M which satisfies posterior_mean = K M
@ -44,33 +47,33 @@ class Posterior(object):
compute all other quantites on demand.
"""
#obligatory
# obligatory
self._K = K
if ((woodbury_chol is not None) and (woodbury_vector is not None))\
or ((woodbury_inv is not None) and (woodbury_vector is not None))\
or ((woodbury_inv is not None) and (mean is not None))\
if ((woodbury_chol is not None) and (woodbury_vector is not None)) \
or ((woodbury_inv is not None) and (woodbury_vector is not None)) \
or ((woodbury_inv is not None) and (mean is not None)) \
or ((mean is not None) and (cov is not None)):
pass # we have sufficient to compute the posterior
pass # we have sufficient to compute the posterior
else:
raise ValueError("insufficient information to compute the posterior")
self._K_chol = K_chol
self._K = K
#option 1:
# option 1:
self._woodbury_chol = woodbury_chol
self._woodbury_vector = woodbury_vector
#option 2.
# option 2.
self._woodbury_inv = woodbury_inv
#and woodbury vector
# and woodbury vector
#option 2:
# option 2:
self._mean = mean
self._covariance = cov
self._prior_mean = prior_mean
#compute this lazily
# compute this lazily
self._precision = None
@property
@ -96,9 +99,11 @@ class Posterior(object):
$$
"""
if self._covariance is None:
#LiK, _ = dtrtrs(self.woodbury_chol, self._K, lower=1)
self._covariance = (np.atleast_3d(self._K) - np.tensordot(np.dot(np.atleast_3d(self.woodbury_inv).T, self._K), self._K, [1,0]).T).squeeze()
#self._covariance = self._K - self._K.dot(self.woodbury_inv).dot(self._K)
# LiK, _ = dtrtrs(self.woodbury_chol, self._K, lower=1)
self._covariance = (
np.atleast_3d(self._K) - np.tensordot(np.dot(np.atleast_3d(self.woodbury_inv).T, self._K), self._K,
[1, 0]).T).squeeze()
# self._covariance = self._K - self._K.dot(self.woodbury_inv).dot(self._K)
return self._covariance
def covariance_between_points(self, kern, X, X1, X2):
@ -131,9 +136,9 @@ class Posterior(object):
"""
if self._precision is None:
cov = np.atleast_3d(self.covariance)
self._precision = np.zeros(cov.shape) # if one covariance per dimension
self._precision = np.zeros(cov.shape) # if one covariance per dimension
for p in range(cov.shape[-1]):
self._precision[:,:,p] = pdinv(cov[:,:,p])[0]
self._precision[:, :, p] = pdinv(cov[:, :, p])[0]
return self._precision
@property
@ -146,18 +151,18 @@ class Posterior(object):
$$
"""
if self._woodbury_chol is None:
#compute woodbury chol from
# compute woodbury chol from
if self._woodbury_inv is not None:
winv = np.atleast_3d(self._woodbury_inv)
self._woodbury_chol = np.zeros(winv.shape)
for p in range(winv.shape[-1]):
self._woodbury_chol[:,:,p] = pdinv(winv[:,:,p])[2]
#Li = jitchol(self._woodbury_inv)
#self._woodbury_chol, _ = dtrtri(Li)
#W, _, _, _, = pdinv(self._woodbury_inv)
#symmetrify(W)
#self._woodbury_chol = jitchol(W)
#try computing woodbury chol from cov
self._woodbury_chol[:, :, p] = pdinv(winv[:, :, p])[2]
# Li = jitchol(self._woodbury_inv)
# self._woodbury_chol, _ = dtrtri(Li)
# W, _, _, _, = pdinv(self._woodbury_inv)
# symmetrify(W)
# self._woodbury_chol = jitchol(W)
# try computing woodbury chol from cov
elif self._covariance is not None:
raise NotImplementedError("TODO: check code here")
B = self._K - self._covariance
@ -180,14 +185,14 @@ class Posterior(object):
if self._woodbury_inv is None:
if self._woodbury_chol is not None:
self._woodbury_inv, _ = dpotri(self._woodbury_chol, lower=1)
#self._woodbury_inv, _ = dpotrs(self.woodbury_chol, np.eye(self.woodbury_chol.shape[0]), lower=1)
# self._woodbury_inv, _ = dpotrs(self.woodbury_chol, np.eye(self.woodbury_chol.shape[0]), lower=1)
symmetrify(self._woodbury_inv)
elif self._covariance is not None:
B = np.atleast_3d(self._K) - np.atleast_3d(self._covariance)
self._woodbury_inv = np.empty_like(B)
for i in range(B.shape[-1]):
tmp, _ = dpotrs(self.K_chol, B[:,:,i])
self._woodbury_inv[:,:,i], _ = dpotrs(self.K_chol, tmp.T)
tmp, _ = dpotrs(self.K_chol, B[:, :, i])
self._woodbury_inv[:, :, i], _ = dpotrs(self.K_chol, tmp.T)
return self._woodbury_inv
@property
@ -219,14 +224,14 @@ class Posterior(object):
if not isinstance(Xnew, VariationalPosterior):
Kx = kern.K(pred_var, Xnew)
mu = np.dot(Kx.T, woodbury_vector)
if len(mu.shape)==1:
mu = mu.reshape(-1,1)
if len(mu.shape) == 1:
mu = mu.reshape(-1, 1)
if full_cov:
Kxx = kern.K(Xnew)
if woodbury_inv.ndim == 2:
var = Kxx - np.dot(Kx.T, np.dot(woodbury_inv, Kx))
elif woodbury_inv.ndim == 3: # Missing data
var = np.empty((Kxx.shape[0],Kxx.shape[1],woodbury_inv.shape[2]))
elif woodbury_inv.ndim == 3: # Missing data
var = np.empty((Kxx.shape[0], Kxx.shape[1], woodbury_inv.shape[2]))
from ...util.linalg import mdot
for i in range(var.shape[2]):
var[:, :, i] = (Kxx - mdot(Kx.T, woodbury_inv[:, :, i], Kx))
@ -234,9 +239,9 @@ class Posterior(object):
else:
Kxx = kern.Kdiag(Xnew)
if woodbury_inv.ndim == 2:
var = (Kxx - np.sum(np.dot(woodbury_inv.T, Kx) * Kx, 0))[:,None]
elif woodbury_inv.ndim == 3: # Missing data
var = np.empty((Kxx.shape[0],woodbury_inv.shape[2]))
var = (Kxx - np.sum(np.dot(woodbury_inv.T, Kx) * Kx, 0))[:, None]
elif woodbury_inv.ndim == 3: # Missing data
var = np.empty((Kxx.shape[0], woodbury_inv.shape[2]))
for i in range(var.shape[1]):
var[:, i] = (Kxx - (np.sum(np.dot(woodbury_inv[:, :, i].T, Kx) * Kx, 0)))
var = var
@ -245,86 +250,99 @@ class Posterior(object):
psi1_star = kern.psi1(pred_var, Xnew)
psi2_star = kern.psi2n(pred_var, Xnew)
la = woodbury_vector
mu = np.dot(psi1_star, la) # TODO: dimensions?
N,M,D = psi0_star.shape[0],psi1_star.shape[1], la.shape[1]
mu = np.dot(psi1_star, la) # TODO: dimensions?
N, M, D = psi0_star.shape[0], psi1_star.shape[1], la.shape[1]
if full_cov:
raise NotImplementedError("Full covariance for Sparse GP predicted with uncertain inputs not implemented yet.")
raise NotImplementedError(
"Full covariance for Sparse GP predicted with uncertain inputs not implemented yet.")
var = np.zeros((Xnew.shape[0], la.shape[1], la.shape[1]))
di = np.diag_indices(la.shape[1])
else:
tmp = psi2_star - psi1_star[:,:,None]*psi1_star[:,None,:]
var = (tmp.reshape(-1,M).dot(la).reshape(N,M,D)*la[None,:,:]).sum(1) + psi0_star[:,None]
if woodbury_inv.ndim==2:
var += -psi2_star.reshape(N,-1).dot(woodbury_inv.flat)[:,None]
tmp = psi2_star - psi1_star[:, :, None] * psi1_star[:, None, :]
var = (tmp.reshape(-1, M).dot(la).reshape(N, M, D) * la[None, :, :]).sum(1) + psi0_star[:, None]
if woodbury_inv.ndim == 2:
var += -psi2_star.reshape(N, -1).dot(woodbury_inv.flat)[:, None]
else:
var += -psi2_star.reshape(N,-1).dot(woodbury_inv.reshape(-1,D))
var = np.clip(var,1e-15,np.inf)
var += -psi2_star.reshape(N, -1).dot(woodbury_inv.reshape(-1, D))
var = np.clip(var, 1e-15, np.inf)
return mu, var
class PosteriorExact(Posterior):
def _raw_predict(self, kern, Xnew, pred_var, full_cov=False):
Kx = kern.K(pred_var, Xnew)
mu = np.dot(Kx.T, self.woodbury_vector)
if len(mu.shape)==1:
mu = mu.reshape(-1,1)
if len(mu.shape) == 1:
mu = mu.reshape(-1, 1)
if full_cov:
Kxx = kern.K(Xnew)
if self._woodbury_chol.ndim == 2:
tmp = dtrtrs(self._woodbury_chol, Kx)[0]
var = Kxx - tdot(tmp.T)
elif self._woodbury_chol.ndim == 3: # Missing data
var = np.empty((Kxx.shape[0],Kxx.shape[1],self._woodbury_chol.shape[2]))
elif self._woodbury_chol.ndim == 3: # Missing data
var = np.empty((Kxx.shape[0], Kxx.shape[1], self._woodbury_chol.shape[2]))
for i in range(var.shape[2]):
tmp = dtrtrs(self._woodbury_chol[:,:,i], Kx)[0]
tmp = dtrtrs(self._woodbury_chol[:, :, i], Kx)[0]
var[:, :, i] = (Kxx - tdot(tmp.T))
var = var
else:
Kxx = kern.Kdiag(Xnew)
if self._woodbury_chol.ndim == 2:
tmp = dtrtrs(self._woodbury_chol, Kx)[0]
var = (Kxx - np.square(tmp).sum(0))[:,None]
elif self._woodbury_chol.ndim == 3: # Missing data
var = np.empty((Kxx.shape[0],self._woodbury_chol.shape[2]))
var = (Kxx - np.square(tmp).sum(0))[:, None]
elif self._woodbury_chol.ndim == 3: # Missing data
var = np.empty((Kxx.shape[0], self._woodbury_chol.shape[2]))
for i in range(var.shape[1]):
tmp = dtrtrs(self._woodbury_chol[:,:,i], Kx)[0]
tmp = dtrtrs(self._woodbury_chol[:, :, i], Kx)[0]
var[:, i] = (Kxx - np.square(tmp).sum(0))
var = var
return mu, var
class PosteriorEP(Posterior):
class PosteriorEP(Posterior):
def _raw_predict(self, kern, Xnew, pred_var, full_cov=False):
Kx = kern.K(pred_var, Xnew)
mu = np.dot(Kx.T, self.woodbury_vector)
if len(mu.shape)==1:
mu = mu.reshape(-1,1)
if len(mu.shape) == 1:
mu = mu.reshape(-1, 1)
if full_cov:
Kxx = kern.K(Xnew)
if self._woodbury_inv.ndim == 2:
tmp = np.dot(Kx.T,np.dot(self._woodbury_inv, Kx))
tmp = np.dot(Kx.T, np.dot(self._woodbury_inv, Kx))
var = Kxx - tmp
elif self._woodbury_inv.ndim == 3: # Missing data
var = np.empty((Kxx.shape[0],Kxx.shape[1],self._woodbury_inv.shape[2]))
elif self._woodbury_inv.ndim == 3: # Missing data
var = np.empty((Kxx.shape[0], Kxx.shape[1], self._woodbury_inv.shape[2]))
for i in range(var.shape[2]):
tmp = np.dot(Kx.T,np.dot(self._woodbury_inv[:,:,i], Kx))
tmp = np.dot(Kx.T, np.dot(self._woodbury_inv[:, :, i], Kx))
var[:, :, i] = (Kxx - tmp)
var = var
else:
Kxx = kern.Kdiag(Xnew)
if self._woodbury_inv.ndim == 2:
tmp = (np.dot(self._woodbury_inv, Kx) * Kx).sum(0)
var = (Kxx - tmp)[:,None]
elif self._woodbury_inv.ndim == 3: # Missing data
var = np.empty((Kxx.shape[0],self._woodbury_inv.shape[2]))
var = (Kxx - tmp)[:, None]
elif self._woodbury_inv.ndim == 3: # Missing data
var = np.empty((Kxx.shape[0], self._woodbury_inv.shape[2]))
for i in range(var.shape[1]):
tmp = (Kx * np.dot(self._woodbury_inv[:,:,i], Kx)).sum(0)
tmp = (Kx * np.dot(self._woodbury_inv[:, :, i], Kx)).sum(0)
var[:, i] = (Kxx - tmp)
var = var
return mu, var
class StudentTPosterior(PosteriorExact):
def __init__(self, deg_free, **kwargs):
super(StudentTPosterior, self).__init__(**kwargs)
self.nu = deg_free
def _raw_predict(self, kern, Xnew, pred_var, full_cov=False):
mu, var = super(StudentTPosterior, self)._raw_predict(kern, Xnew, pred_var, full_cov)
beta = np.sum(self.woodbury_vector * self.mean)
N = self.woodbury_vector.shape[0]
tp_var_scale = (self.nu + beta - 2) / (self.nu + N - 2)
return mu, tp_var_scale * var

View file

@ -29,3 +29,4 @@ from .gp_offset_regression import GPOffsetRegression
from .gp_grid_regression import GPRegressionGrid
from .gp_multiout_regression import GPMultioutRegression
from .gp_multiout_regression_md import GPMultioutRegressionMD
from .tp_regression import TPRegression

294
GPy/models/tp_regression.py Normal file
View file

@ -0,0 +1,294 @@
# Copyright (c) 2017 the GPy Austhors (see AUTHORS.txt)
# Licensed under the BSD 3-clause license (see LICENSE.txt)
from ..core import Model
from ..core.parameterization import Param
from ..core import Mapping
from ..kern import Kern, RBF
from ..inference.latent_function_inference import ExactStudentTInference
from ..util.normalizer import Standardize
import numpy as np
from scipy import stats
from paramz import ObsAr
from paramz.transformations import Logistic, Logexp, LogexpClipped
import warnings
class TPRegression(Model):
"""
Student-t Process model for regression, as presented in
Shah, A., Wilson, A. and Ghahramani, Z., 2014, April. Student-t processes as alternatives to Gaussian processes.
In Artificial Intelligence and Statistics (pp. 877-885).
:param X: input observations
:param Y: observed values
:param kernel: a GPy kernel, defaults to rbf
:param deg_free: initial value for the degrees of freedom hyperparameter
:param Norm normalizer: [False]
Normalize Y with the norm given.
If normalizer is False, no normalization will be done
If it is None, we use GaussianNorm(alization)
.. Note:: Multiple independent outputs are allowed using columns of Y
"""
def __init__(self, X, Y, kernel=None, deg_free=5., normalizer=None, mean_function=None, name='TP regression'):
super(TPRegression, self).__init__(name=name)
# X
assert X.ndim == 2
self.set_X(X)
self.num_data, self.input_dim = self.X.shape
# Y
assert Y.ndim == 2
if normalizer is True:
self.normalizer = Standardize()
elif normalizer is False:
self.normalizer = None
else:
self.normalizer = normalizer
self.set_Y(Y)
if Y.shape[0] != self.num_data:
# There can be cases where we want inputs than outputs, for example if we have multiple latent
# function values
warnings.warn("There are more rows in your input data X, \
than in your output data Y, be VERY sure this is what you want")
self.output_dim = self.Y.shape[1]
# Kernel
kernel = kernel or RBF(self.X.shape[1])
assert isinstance(kernel, Kern)
self.kern = kernel
self.link_parameter(self.kern)
if self.kern._effective_input_dim != self.X.shape[1]:
warnings.warn(
"Your kernel has a different input dimension {} then the given X dimension {}. Be very sure this is "
"what you want and you have not forgotten to set the right input dimenion in your kernel".format(
self.kern._effective_input_dim, self.X.shape[1]))
# Mean function
self.mean_function = mean_function
if mean_function is not None:
assert isinstance(self.mean_function, Mapping)
assert mean_function.input_dim == self.input_dim
assert mean_function.output_dim == self.output_dim
self.link_parameter(mean_function)
# Degrees of freedom
self.nu = Param('deg_free', float(deg_free), Logexp())
self.link_parameter(self.nu)
# Inference
self.inference_method = ExactStudentTInference()
self.posterior = None
self._log_marginal_likelihood = None
# Insert property for plotting (not used)
self.Y_metadata = None
def _update_posterior_dof(self, dof, which):
if self.posterior is not None:
self.posterior.nu = dof
@property
def _predictive_variable(self):
return self.X
def set_XY(self, X, Y):
"""
Set the input / output data of the model
This is useful if we wish to change our existing data but maintain the same model
:param X: input observations
:type X: np.ndarray
:param Y: output observations
:type Y: np.ndarray or ObsAr
"""
self.update_model(False)
self.set_Y(Y)
self.set_X(X)
self.update_model(True)
def set_X(self, X):
"""
Set the input data of the model
:param X: input observations
:type X: np.ndarray
"""
assert isinstance(X, np.ndarray)
state = self.update_model()
self.update_model(False)
self.X = ObsAr(X)
self.update_model(state)
def set_Y(self, Y):
"""
Set the output data of the model
:param Y: output observations
:type Y: np.ndarray or ObsArray
"""
assert isinstance(Y, (np.ndarray, ObsAr))
state = self.update_model()
self.update_model(False)
if self.normalizer is not None:
self.normalizer.scale_by(Y)
self.Y_normalized = ObsAr(self.normalizer.normalize(Y))
self.Y = Y
else:
self.Y = ObsAr(Y) if isinstance(Y, np.ndarray) else Y
self.Y_normalized = self.Y
self.update_model(state)
def parameters_changed(self):
"""
Method that is called upon any changes to :class:`~GPy.core.parameterization.param.Param` variables within the model.
In particular in this class this method re-performs inference, recalculating the posterior, log marginal likelihood and gradients of the model
.. warning::
This method is not designed to be called manually, the framework is set up to automatically call this method upon changes to parameters, if you call
this method yourself, there may be unexpected consequences.
"""
self.posterior, self._log_marginal_likelihood, grad_dict = self.inference_method.inference(self.kern,
self.X,
self.Y_normalized,
self.nu + 2 + np.finfo(
float).eps,
self.mean_function)
self.kern.update_gradients_full(grad_dict['dL_dK'], self.X)
if self.mean_function is not None:
self.mean_function.update_gradients(grad_dict['dL_dm'], self.X)
self.nu.gradient = grad_dict['dL_dnu']
def log_likelihood(self):
"""
The log marginal likelihood of the model, :math:`p(\mathbf{y})`, this is the objective function of the model being optimised
"""
return self._log_marginal_likelihood or self.inference()[1]
def _raw_predict(self, Xnew, full_cov=False, kern=None):
"""
For making predictions, does not account for normalization or likelihood
full_cov is a boolean which defines whether the full covariance matrix
of the prediction is computed. If full_cov is False (default), only the
diagonal of the covariance is returned.
.. math::
p(f*|X*, X, Y) = \int^{\inf}_{\inf} p(f*|f,X*)p(f|X,Y) df
= MVN\left(\nu + N,f*| K_{x*x}(K_{xx})^{-1}Y,
\frac{\nu + \beta - 2}{\nu + N - 2}K_{x*x*} - K_{xx*}(K_{xx})^{-1}K_{xx*}\right)
\nu := \texttt{Degrees of freedom}
"""
mu, var = self.posterior._raw_predict(kern=self.kern if kern is None else kern, Xnew=Xnew,
pred_var=self._predictive_variable, full_cov=full_cov)
if self.mean_function is not None:
mu += self.mean_function.f(Xnew)
return mu, var
def predict(self, Xnew, full_cov=False, kern=None, **kwargs):
"""
Predict the function(s) at the new point(s) Xnew. For Student-t processes, this method is equivalent to
predict_noiseless as no likelihood is included in the model.
"""
return self.predict_noiseless(Xnew, full_cov=full_cov, kern=kern)
def predict_noiseless(self, Xnew, full_cov=False, kern=None):
"""
Predict the underlying function f at the new point(s) Xnew.
:param Xnew: The points at which to make a prediction
:type Xnew: np.ndarray (Nnew x self.input_dim)
:param full_cov: whether to return the full covariance matrix, or just the diagonal
:type full_cov: bool
:param kern: The kernel to use for prediction (defaults to the model kern).
:returns: (mean, var):
mean: posterior mean, a Numpy array, Nnew x self.input_dim
var: posterior variance, a Numpy array, Nnew x 1 if full_cov=False, Nnew x Nnew otherwise
If full_cov and self.input_dim > 1, the return shape of var is Nnew x Nnew x self.input_dim.
If self.input_dim == 1, the return shape is Nnew x Nnew.
This is to allow for different normalizations of the output dimensions.
"""
# Predict the latent function values
mu, var = self._raw_predict(Xnew, full_cov=full_cov, kern=kern)
# Un-apply normalization
if self.normalizer is not None:
mu, var = self.normalizer.inverse_mean(mu), self.normalizer.inverse_variance(var)
return mu, var
def predict_quantiles(self, X, quantiles=(2.5, 97.5), kern=None, **kwargs):
"""
Get the predictive quantiles around the prediction at X
:param X: The points at which to make a prediction
:type X: np.ndarray (Xnew x self.input_dim)
:param quantiles: tuple of quantiles, default is (2.5, 97.5) which is the 95% interval
:type quantiles: tuple
:param kern: optional kernel to use for prediction
:type predict_kw: dict
:returns: list of quantiles for each X and predictive quantiles for interval combination
:rtype: [np.ndarray (Xnew x self.output_dim), np.ndarray (Xnew x self.output_dim)]
"""
mu, var = self._raw_predict(X, full_cov=False, kern=kern)
quantiles = [stats.t.ppf(q / 100., self.nu + 2 + self.num_data) * np.sqrt(var) + mu for q in quantiles]
if self.normalizer is not None:
quantiles = [self.normalizer.inverse_mean(q) for q in quantiles]
return quantiles
def posterior_samples(self, X, size=10, full_cov=False, Y_metadata=None, likelihood=None, **predict_kwargs):
"""
Samples the posterior GP at the points X, equivalent to posterior_samples_f due to the absence of a likelihood.
"""
return self.posterior_samples_f(X, size, full_cov=full_cov, **predict_kwargs)
def posterior_samples_f(self, X, size=10, full_cov=True, **predict_kwargs):
"""
Samples the posterior TP at the points X.
:param X: The points at which to take the samples.
:type X: np.ndarray (Nnew x self.input_dim)
:param size: the number of a posteriori samples.
:type size: int.
:param full_cov: whether to return the full covariance matrix, or just the diagonal.
:type full_cov: bool.
:returns: fsim: set of simulations
:rtype: np.ndarray (D x N x samples) (if D==1 we flatten out the first dimension)
"""
mu, var = self._raw_predict(X, full_cov=full_cov, **predict_kwargs)
if self.normalizer is not None:
mu, var = self.normalizer.inverse_mean(mu), self.normalizer.inverse_variance(var)
def sim_one_dim(m, v):
nu = self.nu + 2 + self.num_data
v = np.diag(v.flatten()) if not full_cov else v
Z = np.random.multivariate_normal(np.zeros(X.shape[0]), v, size).T
g = np.tile(np.random.gamma(nu / 2., 2. / nu, size), (X.shape[0], 1))
return m + Z / np.sqrt(g)
if self.output_dim == 1:
return sim_one_dim(mu, var)
else:
fsim = np.empty((self.output_dim, self.num_data, size))
for d in range(self.output_dim):
if full_cov and var.ndim == 3:
fsim[d] = sim_one_dim(mu[:, d], var[:, :, d])
elif (not full_cov) and var.ndim == 2:
fsim[d] = sim_one_dim(mu[:, d], var[:, d])
else:
fsim[d] = sim_one_dim(mu[:, d], var)
return fsim

View file

@ -92,6 +92,19 @@ def inject_plotting():
SSGPLVM.plot_inducing = gpy_plot.latent_plots.plot_latent_inducing
SSGPLVM.plot_steepest_gradient_map = gpy_plot.latent_plots.plot_steepest_gradient_map
from ..models import TPRegression
TPRegression.plot_data = gpy_plot.data_plots.plot_data
TPRegression.plot = gpy_plot.gp_plots.plot
TPRegression.plot_data_error = gpy_plot.data_plots.plot_data_error
TPRegression.plot_errorbars_trainset = gpy_plot.data_plots.plot_errorbars_trainset
TPRegression.plot_mean = gpy_plot.gp_plots.plot_mean
TPRegression.plot_confidence = gpy_plot.gp_plots.plot_confidence
TPRegression.plot_density = gpy_plot.gp_plots.plot_density
TPRegression.plot_samples = gpy_plot.gp_plots.plot_samples
TPRegression.plot_f = gpy_plot.gp_plots.plot_f
TPRegression.plot_latent = gpy_plot.gp_plots.plot_f
TPRegression.plot_noiseless = gpy_plot.gp_plots.plot_f
from ..kern import Kern
Kern.plot_covariance = gpy_plot.kernel_plots.plot_covariance
def deprecate_plot(self, *args, **kwargs):

View file

@ -812,6 +812,45 @@ class GradientTests(np.testing.TestCase):
rbflin = GPy.kern.RBF(1) + GPy.kern.White(1)
self.check_model(rbflin, model_type='SparseGPRegression', dimension=1, uncertain_inputs=1)
def test_TPRegression_matern52_1D(self):
''' Testing the TP regression with matern52 kernel on 1d data '''
matern52 = GPy.kern.Matern52(1) + GPy.kern.White(1)
self.check_model(matern52, model_type='TPRegression', dimension=1)
def test_TPRegression_rbf_2D(self):
''' Testing the TP regression with rbf kernel on 2d data '''
rbf = GPy.kern.RBF(2)
self.check_model(rbf, model_type='TPRegression', dimension=2)
def test_TPRegression_rbf_ARD_2D(self):
''' Testing the GP regression with rbf kernel on 2d data '''
k = GPy.kern.RBF(2, ARD=True)
self.check_model(k, model_type='TPRegression', dimension=2)
def test_TPRegression_matern52_2D(self):
''' Testing the TP regression with matern52 kernel on 2d data '''
matern52 = GPy.kern.Matern52(2)
self.check_model(matern52, model_type='TPRegression', dimension=2)
def test_TPRegression_matern52_ARD_2D(self):
''' Testing the TP regression with matern52 kernel on 2d data '''
matern52 = GPy.kern.Matern52(2, ARD=True)
self.check_model(matern52, model_type='TPRegression', dimension=2)
def test_TPRegression_matern32_1D(self):
''' Testing the TP regression with matern32 kernel on 1d data '''
matern32 = GPy.kern.Matern32(1)
self.check_model(matern32, model_type='TPRegression', dimension=1)
def test_TPRegression_matern32_2D(self):
''' Testing the TP regression with matern32 kernel on 2d data '''
matern32 = GPy.kern.Matern32(2)
self.check_model(matern32, model_type='TPRegression', dimension=2)
def test_TPRegression_matern32_ARD_2D(self):
''' Testing the TP regression with matern32 kernel on 2d data '''
matern32 = GPy.kern.Matern32(2, ARD=True)
self.check_model(matern32, model_type='TPRegression', dimension=2)
def test_GPLVM_rbf_bias_white_kern_2D(self):
""" Testing GPLVM with rbf + bias kernel """

145
GPy/testing/tp_tests.py Normal file
View file

@ -0,0 +1,145 @@
'''
Created on 14 Jul 2017, based on gp_tests
@author: javdrher
'''
import unittest
import numpy as np, GPy
class Test(unittest.TestCase):
def setUp(self):
np.random.seed(12345)
self.N = 20
self.N_new = 50
self.D = 1
self.X = np.random.uniform(-3., 3., (self.N, 1))
self.Y = np.sin(self.X) + np.random.randn(self.N, self.D) * 0.05
self.X_new = np.random.uniform(-3., 3., (self.N_new, 1))
def test_setxy_gp(self):
k = GPy.kern.RBF(1) + GPy.kern.White(1)
m = GPy.models.TPRegression(self.X, self.Y, kernel=k)
mu, var = m.predict(m.X)
X = m.X.copy()
m.set_XY(m.X[:10], m.Y[:10])
assert (m.checkgrad(tolerance=1e-2))
m.set_XY(X, self.Y)
mu2, var2 = m.predict(m.X)
np.testing.assert_allclose(mu, mu2)
np.testing.assert_allclose(var, var2)
def test_mean_function(self):
from GPy.core.parameterization.param import Param
from GPy.core.mapping import Mapping
class Parabola(Mapping):
def __init__(self, variance, degree=2, name='parabola'):
super(Parabola, self).__init__(1, 1, name)
self.variance = Param('variance', np.ones(degree + 1) * variance)
self.degree = degree
self.link_parameter(self.variance)
def f(self, X):
p = self.variance[0] * np.ones(X.shape)
for i in range(1, self.degree + 1):
p += self.variance[i] * X ** (i)
return p
def gradients_X(self, dL_dF, X):
grad = np.zeros(X.shape)
for i in range(1, self.degree + 1):
grad += (i) * self.variance[i] * X ** (i - 1)
return grad
def update_gradients(self, dL_dF, X):
for i in range(self.degree + 1):
self.variance.gradient[i] = (dL_dF * X ** (i)).sum(0)
X = np.linspace(-2, 2, 100)[:, None]
k = GPy.kern.RBF(1) + GPy.kern.White(1)
k.randomize()
p = Parabola(.3)
p.randomize()
Y = p.f(X) + np.random.multivariate_normal(np.zeros(X.shape[0]), k.K(X) + np.eye(X.shape[0]) * 1e-8)[:,
None] + np.random.normal(0, .1, (X.shape[0], 1))
m = GPy.models.TPRegression(X, Y, kernel=k, mean_function=p)
assert (m.checkgrad(tolerance=2e-1))
_ = m.predict(m.X)
def test_normalizer(self):
k = GPy.kern.RBF(1) + GPy.kern.White(1)
Y = self.Y
mu, std = Y.mean(0), Y.std(0)
m = GPy.models.TPRegression(self.X, Y, kernel=k, normalizer=True)
m.optimize()
assert (m.checkgrad())
k = GPy.kern.RBF(1) + GPy.kern.White(1)
m2 = GPy.models.TPRegression(self.X, (Y - mu) / std, kernel=k, normalizer=False)
m2[:] = m[:]
mu1, var1 = m.predict(m.X, full_cov=True)
mu2, var2 = m2.predict(m2.X, full_cov=True)
np.testing.assert_allclose(mu1, (mu2 * std) + mu)
np.testing.assert_allclose(var1, var2 * std ** 2)
mu1, var1 = m.predict(m.X, full_cov=False)
mu2, var2 = m2.predict(m2.X, full_cov=False)
np.testing.assert_allclose(mu1, (mu2 * std) + mu)
np.testing.assert_allclose(var1, var2 * std ** 2)
q50n = m.predict_quantiles(m.X, (50,))
q50 = m2.predict_quantiles(m2.X, (50,))
np.testing.assert_allclose(q50n[0], (q50[0] * std) + mu)
# Test variance component:
qs = np.array([2.5, 97.5])
# The quantiles get computed before unormalization
# And transformed using the mean transformation:
c = np.random.choice(self.X.shape[0])
q95 = m2.predict_quantiles(self.X[[c]], qs)
mu, var = m2.predict(self.X[[c]])
from scipy.stats import t
np.testing.assert_allclose((mu + (t.ppf(qs / 100., m2.nu + m2.num_data) * np.sqrt(var))).flatten(),
np.array(q95).flatten())
def test_predict_equivalence(self):
k = GPy.kern.RBF(1) + GPy.kern.White(1)
m = GPy.models.TPRegression(self.X, self.Y, kernel=k)
m.optimize()
mu1, var1 = m.predict(m.X)
mu2, var2 = m.predict_noiseless(m.X)
mu3, var3 = m._raw_predict(m.X)
np.testing.assert_allclose(mu1, mu2)
np.testing.assert_allclose(var1, var2)
np.testing.assert_allclose(mu1, mu3)
np.testing.assert_allclose(var1, var3)
m2 = GPy.models.TPRegression(self.X, self.Y, kernel=k, normalizer=True)
m2.optimize()
mu1, var1 = m2.predict(m.X)
mu2, var2 = m2.predict_noiseless(m.X)
mu3, var3 = m2._raw_predict(m.X)
np.testing.assert_allclose(mu1, mu2)
np.testing.assert_allclose(var1, var2)
self.assertFalse(np.allclose(mu1, mu3))
self.assertFalse(np.allclose(var1, var3))
def test_gp_equivalence(self):
k = GPy.kern.RBF(1)
m = GPy.models.GPRegression(self.X, self.Y, kernel=k)
m.optimize()
mu1, var1 = m.predict(self.X)
k1 = GPy.kern.RBF(1)
k1[:] = k[:]
k2 = GPy.kern.White(1, variance=m.likelihood.variance)
m2 = GPy.models.TPRegression(self.X, self.Y, kernel=k1 + k2, deg_free=1e6)
mu2, var2 = m2.predict(self.X)
np.testing.assert_allclose(mu1, mu2)
np.testing.assert_allclose(var1, var2)
if __name__ == "__main__":
unittest.main()