kernel slices allowed

This commit is contained in:
Max Zwiessele 2014-03-14 10:55:16 +00:00
parent b5811011e0
commit 600b1bde3c
3 changed files with 29 additions and 18 deletions

View file

@ -2,7 +2,7 @@
# Licensed under the BSD 3-clause license (see LICENSE.txt)
from kern import Kern
from kern import Kern, CombinationKernel
import numpy as np
import itertools
@ -32,7 +32,7 @@ def index_to_slices(index):
[ret[ind_i].append(slice(*indexes_i)) for ind_i,indexes_i in zip(ind[switchpoints[:-1]],zip(switchpoints,switchpoints[1:]))]
return ret
class IndependentOutputs(Kern):
class IndependentOutputs(CombinationKernel):
"""
A kernel which can represent several independent functions.
this kernel 'switches off' parts of the matrix where the output indexes are different.
@ -41,12 +41,12 @@ class IndependentOutputs(Kern):
the rest of the columns of X are passed to the underlying kernel for computation (in blocks).
"""
def __init__(self, index_dim, kern, name='independ'):
def __init__(self, kern, index_dim=-1, name='independ'):
assert isinstance(index_dim, int), "IndependentOutputs kernel is only defined with one input dimension being the indeces"
super(IndependentOutputs, self).__init__(np.r_[0:max(max(kern.active_dims)+1, index_dim+1)], name)
super(IndependentOutputs, self).__init__(kernels=[kern], extra_dims=[index_dim], name=name)
self.index_dim = index_dim
self.kern = kern
self.add_parameters(self.kern)
#self.add_parameters(self.kern)
def K(self,X ,X2=None):
slices = index_to_slices(X[:,self.index_dim])

View file

@ -27,10 +27,18 @@ class Kern(Parameterized):
Do not instantiate.
"""
super(Kern, self).__init__(name=name, *a, **kw)
self.active_dims = active_dims or slice(0, input_dim)
self.active_dims = active_dims if active_dims is not None else slice(0, input_dim)
self.input_dim = input_dim
assert isinstance(self.active_dims, (slice, list, tuple, np.ndarray)), 'active_dims needs to be an array-like or slice object over dimensions, {} given'.format(self.active_dims.__class__)
assert self.active_dims.size == self.input_dim, "input_dim {} does not match len(active_dim) {}".format(self.input_dim, self.active_dims.size)
if isinstance(self.active_dims, slice):
self.active_dims = slice(self.active_dims.start or 0, self.active_dims.stop or self.input_dim, self.active_dims.step or 1)
active_dim_size = int(np.round((self.active_dims.stop-self.active_dims.start)/self.active_dims.step))
elif isinstance(self.active_dims, np.ndarray):
assert self.active_dims.ndim == 1, 'only flat indices allowed, given active_dims.shape={}, provide only indexes to the dimensions of the input'.format(self.active_dims.shape)
active_dim_size = self.active_dims.size
else:
active_dim_size = len(self.active_dims)
assert active_dim_size == self.input_dim, "input_dim={} does not match len(active_dim)={}, active_dims={}".format(self.input_dim, active_dim_size, self.active_dims)
self._sliced_X = 0
@Cache_this(limit=10)
@ -207,10 +215,12 @@ class CombinationKernel(Kern):
assert all([isinstance(k, Kern) for k in kernels])
import itertools
# make sure the active dimensions of all underlying kernels are covered:
ma = reduce(lambda a,b: max(a, b.stop if isinstance(b, slice) else max(b)), itertools.chain((x.active_dims for x in kernels), [extra_dims]), 0)
input_dim = np.r_[0:ma+1]
#ma = reduce(lambda a,b: max(a, b.stop if isinstance(b, slice) else max(b)), itertools.chain((x.active_dims for x in kernels)), 0)
active_dims = reduce(np.union1d, (np.r_[x.active_dims] for x in kernels), np.array([], dtype=int))
input_dim = active_dims.max()+1 + len(extra_dims)
active_dims = slice(active_dims.max()+1+len(extra_dims))
# initialize the kernel with the full input_dim
super(CombinationKernel, self).__init__(input_dim, name)
super(CombinationKernel, self).__init__(input_dim, active_dims, name)
self.extra_dims = extra_dims
self.add_parameters(*kernels)

View file

@ -228,12 +228,12 @@ class KernelGradientTestsContinuous(unittest.TestCase):
self.assertTrue(check_kernel_gradient_functions(k, X=self.X, X2=self.X2, verbose=verbose))
def test_Prod(self):
k = GPy.kern.Matern32([2,3]) * GPy.kern.RBF([0,4]) + GPy.kern.Linear(self.D)
k = GPy.kern.Matern32(2, active_dims=[2,3]) * GPy.kern.RBF(2, active_dims=[0,4]) + GPy.kern.Linear(self.D)
k.randomize()
self.assertTrue(check_kernel_gradient_functions(k, X=self.X, X2=self.X2, verbose=verbose))
def test_Add(self):
k = GPy.kern.Matern32([2,3]) + GPy.kern.RBF([0,4]) + GPy.kern.Linear(self.D)
k = GPy.kern.Matern32(2, active_dims=[2,3]) + GPy.kern.RBF(2, active_dims=[0,4]) + GPy.kern.Linear(self.D)
k.randomize()
self.assertTrue(check_kernel_gradient_functions(k, X=self.X, X2=self.X2, verbose=verbose))
@ -283,15 +283,16 @@ class KernelTestsMiscellaneous(unittest.TestCase):
def setUp(self):
N, D = 100, 10
self.X = np.linspace(-np.pi, +np.pi, N)[:,None] * np.ones(D)
self.rbf = GPy.kern.RBF(range(2))
self.linear = GPy.kern.Linear((3,6))
self.matern = GPy.kern.Matern32(np.array([2,4,7]))
self.rbf = GPy.kern.RBF(2, active_dims=slice(0,4,2))
self.linear = GPy.kern.Linear(2, active_dims=(3,9))
self.matern = GPy.kern.Matern32(3, active_dims=np.array([2,4,9]))
self.sumkern = self.rbf + self.linear
self.sumkern += self.matern
self.sumkern.randomize()
def test_active_dims(self):
self.assertListEqual(self.sumkern.active_dims.tolist(), range(8))
self.assertEqual(self.sumkern.input_dim, 9)
self.assertEqual(self.sumkern.active_dims, slice(9))
def test_which_parts(self):
self.assertTrue(np.allclose(self.sumkern.K(self.X, which_parts=[self.linear, self.matern]), self.linear.K(self.X)+self.matern.K(self.X)))
@ -312,9 +313,9 @@ class KernelTestsNonContinuous(unittest.TestCase):
self.X_block[0:N, -1] = 1
self.X_block[N:N+1, -1] = 2
def test_IndependantOutputs(self):
def test_IndependentOutputs(self):
k = GPy.kern.RBF(self.D)
kern = GPy.kern.IndependentOutputs(self.D+self.D,k)
kern = GPy.kern.IndependentOutputs(k, -1)
self.assertTrue(check_kernel_gradient_functions(kern, X=self.X, X2=self.X2, verbose=verbose))
if __name__ == "__main__":