Enhance multioutput grad obs (#995)

* multiplied RBF kernels can now be used with gradient observations

* standard periodic kernels can now be used with gradient observations

* predictive gradients (derivatives of posterior means and variances) can now be calculated when using gradient observations

* simplified and commented RBF & StdP kernel derivatives

* updated kernel slicing and commented prod kernel derivatives

* removed caching from the standard periodic (StdP) kernel, as it breaks optimization for some reason

* fixed hyperparameter optimization for prod kernel

* improved code readability

* added unit tests for gradient observing MultioutputGP models

* added predictions check to unit tests

* bugfix for multioutput_kern

* improved testing coverage

* reduced size of some tests; led to an issue in an unrelated test

* updated testing

* added gradient MultioutputGP prod kernel example

* added keywords and plotting to example
This commit is contained in:
mgranit 2023-09-21 16:53:42 +03:00 committed by GitHub
parent 2c22f1e9c5
commit 9c1db7aa34
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 1494 additions and 164 deletions

View file

@ -9,6 +9,7 @@ from ..core.mapping import Mapping
from .. import likelihoods
from ..likelihoods.gaussian import Gaussian
from .. import kern
from ..kern import DiffKern
from ..inference.latent_function_inference import exact_gaussian_inference, expectation_propagation
from ..util.normalizer import Standardize
from .. import util
@ -69,39 +70,80 @@ class MultioutputGP(GP):
if Y_metadata is None:
Y_metadata={'output_index': ind, 'trials': np.ones(ind.shape)}
return super(MultioutputGP, self).predict_quantiles(X, quantiles, Y_metadata, kern, likelihood)
def predictive_gradients(self, Xnew, kern=None):
if isinstance(Xnew, list):
Xnew, _, ind = util.multioutput.build_XY(Xnew, None)
#if Y_metadata is None:
#Y_metadata={'output_index': ind}
return super(MultioutputGP, self).predictive_gradients(Xnew, kern)
def predictive_gradients(self, Xnew, kern=None): #XNEW IS NOT A LIST!!
def predictive_gradients(self, Xnew, kern=None):
"""
Compute the derivatives of the predicted latent function with respect to X*
Compute the derivatives of the predicted latent function with respect
to X*
Given a set of points at which to predict X* (size [N*,Q]), compute the
derivatives of the mean and variance. Resulting arrays are sized:
dmu_dX* -- [N*, Q ,D], where D is the number of output in this GP (usually one).
Note that this is not the same as computing the mean and variance of the derivative of the function!
dmu_dX* -- [N*, Q ,D], where D is the number of outputs in this GP
(usually one).
Note that this is not the same as computing the mean and variance of
the derivative of the function!
dv_dX* -- [N*, Q], (since all outputs have the same variance)
:param Xnew: The points at which to get the predictive gradients
:type Xnew: np.ndarray (N* x self.input_dim)
:returns: dmu_dX, dv_dX
:rtype: [np.ndarray (N*, Q ,D), np.ndarray (N*,Q) ]
"""
if isinstance(Xnew, list):
Xnew, _, ind = util.multioutput.build_XY(Xnew, None)
slices = index_to_slices(Xnew[:,-1])
for i in range(len(slices)):
if ((self.kern.kern[i].name == 'diffKern' ) and len(slices[i])>0):
assert 0, "It is not (yet) possible to predict gradients of gradient observations, sorry :)"
if kern is None:
kern = self.kern
if all([(isinstance(k, DiffKern)) for k in self.kern.kern[1:]]):
"""
Compute the gradients of the predicted latent function and predicted
partial derivatives with respect to X*.
This works only for models that observe the gradient of the latent function.
Xnew is given as a list of arrays, where each array X*_i (size [N_i*, Q])
contains points at which to compute gradients for each predicted latent
function or partial derivative.
Resulting arrays are sized [sum_i^D : N_i*, Q]
Passing a list of only one array [X*] returns only gradients of
the predicted latent function and does not compute gradients of
predicted partial derivatives.
In this case the resulting arrays are sized [N*, Q].
:param Xnew: points at which to compute predictive gradients
:type Xnew: list
:type Xnew[i]: np.ndarray (N_i*, Q)
:returns: dmu_dX, dv_dX
:rtype: (np.ndarray (sum_i^D : N_i*, Q), np.ndarray (sum_i^D : N_i*, Q))
"""
dims = Xnew.shape[1] - 1
mean_jac = np.empty((Xnew.shape[0], dims))
var_jac = np.empty((Xnew.shape[0], dims))
X = self._predictive_variable
alpha = self.posterior.woodbury_vector
Wi = self.posterior.woodbury_inv
k = kern.K(Xnew, X)
for dimX in range(dims):
dk_dx = kern.dK_dX(Xnew, X, dimX)
dk_dxdiag = kern.dK_dXdiag(Xnew, dimX)
mean_jac[:,dimX] = np.dot(dk_dx, alpha).flatten()
var_jac[:,dimX] = dk_dxdiag - 2*(np.dot(k, Wi)*dk_dx).sum(-1)
return mean_jac, var_jac
mean_jac = np.empty((Xnew.shape[0],Xnew.shape[1]-1,self.output_dim))
for i in range(self.output_dim):
mean_jac[:,:,i] = kern.gradients_X(self.posterior.woodbury_vector[:,i:i+1].T, Xnew, self._predictive_variable)[:,0:-1]