# Mirror of https://github.com/SheffieldML/GPy.git
# synced 2026-05-07 02:52:40 +02:00
# Copyright (c) 2012, James Hensman
|
|
# Licensed under the BSD 3-clause license (see LICENSE.txt)
|
|
|
|
|
|
from .kern import Kern, CombinationKernel
|
|
import numpy as np
|
|
import itertools
|
|
|
|
def index_to_slices(index):
    """
    Take a numpy array of integers (index) and return a nested list of
    slices such that the slices describe the start, stop points for each
    integer in the index.

    e.g.
    >>> index = np.asarray([0,0,0,1,1,1,2,2,2])
    returns
    >>> [[slice(0,3,None)],[slice(3,6,None)],[slice(6,9,None)]]

    or, a more complicated example
    >>> index = np.asarray([0,0,1,1,0,2,2,2,1,1])
    returns
    >>> [[slice(0,2,None),slice(4,5,None)],[slice(2,4,None),slice(8,10,None)],[slice(5,8,None)]]
    """
    if len(index) == 0:
        return []

    # Construct the return structure: one (initially empty) list of slices
    # per integer value, indexed by that value.
    # NOTE: ``np.int`` was deprecated in NumPy 1.20 and removed in 1.24;
    # the builtin ``int`` is the documented replacement.
    ind = np.asarray(index, dtype=int)
    ret = [[] for _ in range(ind.max() + 1)]

    # Find the switchpoints: append a sentinel guaranteed to differ from the
    # last entry (so the final run is terminated), then mark every position
    # where the value differs from its predecessor.
    ind_ = np.hstack((ind, ind[0] + ind[-1] + 1))
    switchpoints = np.nonzero(ind_ - np.roll(ind_, +1))[0]

    # Each consecutive pair of switchpoints delimits one run of equal
    # values; record that run as a slice under the run's value.
    for value, (start, stop) in zip(ind[switchpoints[:-1]],
                                    zip(switchpoints, switchpoints[1:])):
        ret[value].append(slice(start, stop))
    return ret
|
|
|
|
class IndependentOutputs(CombinationKernel):
    """
    A kernel which can represent several independent functions. This kernel
    'switches off' parts of the matrix where the output indexes are different.

    The index of the functions is given by the last column in the input X; the
    rest of the columns of X are passed to the underlying kernel for
    computation (in blocks).

    :param kernels: either a kernel, or list of kernels to work with. If it is
    a list of kernels the indices in the index_dim, index the kernels you gave!
    """
    def __init__(self, kernels, index_dim=-1, name='independ'):
        assert isinstance(index_dim, int), "IndependentOutputs kernel is only defined with one input dimension being the index"
        if not isinstance(kernels, list):
            # One kernel shared across all outputs.
            self.single_kern = True
            self.kern = kernels
            kernels = [kernels]
        else:
            # One kernel per output index.
            self.single_kern = False
            self.kern = kernels
        super(IndependentOutputs, self).__init__(kernels=kernels, extra_dims=[index_dim], name=name)
        self.index_dim = index_dim

    def K(self, X, X2=None):
        """
        Compute the covariance matrix. Entries whose rows/columns carry
        different output indexes are left at zero; each same-index block is
        filled by the corresponding kernel.
        """
        slices = index_to_slices(X[:, self.index_dim])
        kerns = itertools.repeat(self.kern) if self.single_kern else self.kern
        if X2 is None:
            target = np.zeros((X.shape[0], X.shape[0]))
            for kern, slices_i in zip(kerns, slices):
                for s, ss in itertools.product(slices_i, slices_i):
                    target[s, ss] = kern.K(X[s, :], X[ss, :])
        else:
            slices2 = index_to_slices(X2[:, self.index_dim])
            target = np.zeros((X.shape[0], X2.shape[0]))
            for kern, slices_i, slices_j in zip(kerns, slices, slices2):
                for s, s2 in itertools.product(slices_i, slices_j):
                    target[s, s2] = kern.K(X[s, :], X2[s2, :])
        return target

    def Kdiag(self, X):
        """Compute the diagonal of the covariance matrix, block by block."""
        slices = index_to_slices(X[:, self.index_dim])
        kerns = itertools.repeat(self.kern) if self.single_kern else self.kern
        target = np.zeros(X.shape[0])
        for kern, slices_i in zip(kerns, slices):
            for s in slices_i:
                target[s] = kern.Kdiag(X[s])
        return target

    def update_gradients_full(self, dL_dK, X, X2=None):
        """
        Accumulate parameter gradients over all same-index blocks and write
        the totals into each kernel's ``gradient``.
        """
        slices = index_to_slices(X[:, self.index_dim])
        if self.single_kern:
            target = np.zeros(self.kern.size)
            kerns = itertools.repeat(self.kern)
        else:
            kerns = self.kern
            target = [np.zeros(kern.size) for kern, _ in zip(kerns, slices)]

        def collate_grads(kern, i, dL, X, X2):
            # Each update_gradients_full call overwrites kern.gradient, so
            # the per-block contributions must be accumulated by hand.
            kern.update_gradients_full(dL, X, X2)
            if self.single_kern:
                target[:] += kern.gradient
            else:
                target[i][:] += kern.gradient

        if X2 is None:
            for i, (kern, slices_i) in enumerate(zip(kerns, slices)):
                for s, ss in itertools.product(slices_i, slices_i):
                    collate_grads(kern, i, dL_dK[s, ss], X[s], X[ss])
        else:
            slices2 = index_to_slices(X2[:, self.index_dim])
            for i, (kern, slices_i, slices_j) in enumerate(zip(kerns, slices, slices2)):
                for s2 in slices_j:
                    for s in slices_i:
                        collate_grads(kern, i, dL_dK[s, s2], X[s], X2[s2])

        # Write the accumulated totals back.  BUGFIX: the original did
        # ``kern.gradient = target`` here, but ``kern`` was only bound inside
        # list comprehensions, whose variables do not leak under Python 3,
        # so this raised NameError in the single-kernel case.
        if self.single_kern:
            self.kern.gradient = target
        else:
            for kern, tgt in zip(self.kern, target):
                kern.gradient[...] = tgt

    def gradients_X(self, dL_dK, X, X2=None):
        """Compute gradients of the likelihood w.r.t. X, block by block."""
        target = np.zeros(X.shape)
        kerns = itertools.repeat(self.kern) if self.single_kern else self.kern
        if X2 is None:
            # TODO: make use of index_to_slices
            values = np.unique(X[:, self.index_dim])
            masks = [X[:, self.index_dim] == v for v in values]
            for kern, m in zip(kerns, masks):
                # NOTE(review): ``dL_dK[m, m]`` with two boolean masks selects
                # elements pairwise (effectively the block's diagonal), not
                # the full sub-matrix -- confirm this is intended.
                target[m] = kern.gradients_X(dL_dK[m, m], X[m], None)
        else:
            values = np.unique(X[:, self.index_dim])
            masks = [X[:, self.index_dim] == v for v in values]
            masks2 = [X2[:, self.index_dim] == v for v in values]
            for kern, m, m2 in zip(kerns, masks, masks2):
                target[m] = kern.gradients_X(dL_dK[m, :][:, m2], X[m], X2[m2])
        return target

    def gradients_X_diag(self, dL_dKdiag, X):
        """Compute gradients of the diagonal likelihood terms w.r.t. X."""
        slices = index_to_slices(X[:, self.index_dim])
        kerns = itertools.repeat(self.kern) if self.single_kern else self.kern
        target = np.zeros(X.shape)
        for kern, slices_i in zip(kerns, slices):
            for s in slices_i:
                target[s] = kern.gradients_X_diag(dL_dKdiag[s], X[s])
        return target

    def update_gradients_diag(self, dL_dKdiag, X):
        """
        Accumulate parameter gradients of the diagonal terms and write the
        totals into each kernel's ``gradient``.
        """
        slices = index_to_slices(X[:, self.index_dim])
        kerns = itertools.repeat(self.kern) if self.single_kern else self.kern
        if self.single_kern:
            target = np.zeros(self.kern.size)
        else:
            target = [np.zeros(kern.size) for kern, _ in zip(kerns, slices)]

        def collate_grads(kern, i, dL, X):
            # Accumulate, since each call overwrites kern.gradient.
            kern.update_gradients_diag(dL, X)
            if self.single_kern:
                target[:] += kern.gradient
            else:
                target[i][:] += kern.gradient

        for i, (kern, slices_i) in enumerate(zip(kerns, slices)):
            for s in slices_i:
                collate_grads(kern, i, dL_dKdiag[s], X[s, :])

        # BUGFIX: as in update_gradients_full, ``kern`` is undefined here
        # under Python 3 (comprehension scoping); use self.kern instead.
        if self.single_kern:
            self.kern.gradient = target
        else:
            for kern, tgt in zip(self.kern, target):
                kern.gradient[...] = tgt
|
|
|
|
class Hierarchical(CombinationKernel):
    """
    A kernel which can represent a simple hierarchical model.

    See Hensman et al 2013, "Hierarchical Bayesian modelling of gene expression time
    series across irregularly sampled replicates and clusters"
    http://www.biomedcentral.com/1471-2105/14/252

    To construct this kernel, you must pass a list of kernels. The first
    kernel is treated as the 'base' kernel and is computed everywhere. Every
    additional kernel adds one layer to the hierarchy, with a corresponding
    column of the input matrix indexing which function the data belong to at
    that level.

    For more, see the ipython notebook documentation on Hierarchical
    covariances.
    """
    def __init__(self, kernels, name='hierarchy'):
        # All kernels must act on the same number of input dimensions, and a
        # hierarchy needs at least a base kernel plus one level.
        base_dim = kernels[0].input_dim
        assert all([k.input_dim == base_dim for k in kernels])
        assert len(kernels) > 1
        self.levels = len(kernels) - 1
        # One extra (index) column per hierarchy level, appended after the
        # widest kernel's input columns.
        input_max = max([k.input_dim for k in kernels])
        extra = range(input_max, input_max + len(kernels) - 1)
        super(Hierarchical, self).__init__(kernels=kernels, extra_dims=extra, name=name)

    def K(self, X, X2=None):
        """Compute the covariance: base kernel everywhere, plus each level's
        kernel on its same-index blocks."""
        K = self.parts[0].K(X, X2)  # compute 'base' kern everywhere
        slices = [index_to_slices(X[:, i]) for i in self.extra_dims]
        if X2 is None:
            for k, slices_k in zip(self.parts[1:], slices):
                for slices_i in slices_k:
                    for s in slices_i:
                        K[s, s] += k.K(X[s], None)
        else:
            slices2 = [index_to_slices(X2[:, i]) for i in self.extra_dims]
            for k, slices_k1, slices_k2 in zip(self.parts[1:], slices, slices2):
                for slices_i, slices_j in zip(slices_k1, slices_k2):
                    for s, ss in zip(slices_i, slices_j):
                        K[s, ss] += k.K(X[s], X2[ss])
        return K

    def Kdiag(self, X):
        """Diagonal of the covariance, taken from the full matrix."""
        return np.diag(self.K(X))

    def gradients_X(self, dL_dK, X, X2=None):
        raise NotImplementedError

    def update_gradients_full(self, dL_dK, X, X2=None):
        """Accumulate parameter gradients: the base kernel sees the whole
        matrix, each level's kernel only its same-index blocks."""
        slices = [index_to_slices(X[:, i]) for i in self.extra_dims]
        if X2 is not None:
            raise NotImplementedError
        self.parts[0].update_gradients_full(dL_dK, X, None)
        for k, slices_k in zip(self.parts[1:], slices):
            # Each update call overwrites k.gradient, so sum the per-block
            # contributions into a running total first.
            total = np.zeros(k.size)
            for slices_i in slices_k:
                for s in slices_i:
                    k.update_gradients_full(dL_dK[s, s], X[s], None)
                    total += k.gradient
            k.gradient[:] = total
|
|
|
|
|