diff --git a/GPy/testing/model_tests.py b/GPy/testing/model_tests.py
index 2b9add02..83487373 100644
--- a/GPy/testing/model_tests.py
+++ b/GPy/testing/model_tests.py
@@ -332,7 +332,7 @@ class MiscTests(unittest.TestCase):
         np.testing.assert_almost_equal(np.exp(preds), warp_preds)
 
-    #@unittest.skip('Comment this to plot the modified sine function')
+    @unittest.skip('Comment this to plot the modified sine function')
     def test_warped_gp_sine(self):
         """
         A test replicating the sine regression problem from
@@ -341,13 +341,10 @@ class MiscTests(unittest.TestCase):
         X = (2 * np.pi) * np.random.random(151) - np.pi
         Y = np.sin(X) + np.random.normal(0,0.2,151)
         Y = np.array([np.power(abs(y),float(1)/3) * (1,-1)[y<0] for y in Y])
-        #Y = np.abs(Y)
         import matplotlib.pyplot as plt
 
         warp_k = GPy.kern.RBF(1)
         warp_f = GPy.util.warping_functions.TanhFunction(n_terms=2)
-        #warp_f = GPy.util.warping_functions.LogisticFunction(n_terms=2)
-        #warp_f = GPy.util.warping_functions.LogitFunction(n_terms=1)
         warp_m = GPy.models.WarpedGP(X[:, None], Y[:, None], kernel=warp_k, warping_function=warp_f)
         m = GPy.models.GPRegression(X[:, None], Y[:, None])
diff --git a/GPy/util/warping_functions.py b/GPy/util/warping_functions.py
index 548877e5..f98f0f08 100644
--- a/GPy/util/warping_functions.py
+++ b/GPy/util/warping_functions.py
@@ -234,284 +234,3 @@ class IdentityFunction(WarpingFunction):
 
     def f_inv(self, z, y=None):
         return z
-
-class LogisticFunction(WarpingFunction):
-    """
-    A sum of logistic functions.
-    """
-    def __init__(self, n_terms=3, initial_y=None):
-        """
-        n_terms specifies the number of logistic terms to be used
-        """
-        self.n_terms = n_terms
-        self.num_parameters = 3 * self.n_terms + 1
-        self.psi = np.ones((self.n_terms, 3))
-        super(LogisticFunction, self).__init__(name='warp_logistic')
-        self.psi = Param('psi', self.psi)
-        self.psi[:, :2].constrain_positive()
-        self.d = Param('%s' % ('d'), 1.0, Logexp())
-        self.link_parameter(self.psi)
-        self.link_parameter(self.d)
-        self.initial_y = initial_y
-
-    def _logistic(self, x):
-        return 1. / (1 + np.exp(-x))
-
-    def f(self, y):
-        """
-        Transform y with f using parameter vector psi
-        psi = [[a,b,c]]
-
-        :math:`f = (y * d) + \\sum_{terms} a * logistic(b *(y + c))`
-        """
-        d = self.d
-        mpsi = self.psi
-        z = d * y.copy()
-        for i in xrange(self.n_terms):
-            a, b, c = mpsi[i]
-            z += a * self._logistic(b * (y + c))
-        return z
-
-    def f_inv(self, z, max_iterations=100, y=None):
-        """
-        calculate the numerical inverse of f
-
-        :param max_iterations: maximum number of N.R. iterations
-        """
-        z = z.copy()
-        if y is None:
-            # The idea here is to initialize y with +1 where
-            # z is positive and -1 where it is negative.
-            # For negative z, Newton-Raphson diverges
-            # if we initialize y with a positive value (and vice-versa).
-            y = ((z > 0) * 1.) - (z <= 0)
-            if self.initial_y is not None:
-                y *= self.initial_y
-        it = 0
-        update = np.inf
-        while it == 0 or (np.abs(update).sum() > 1e-10 and it < max_iterations):
-            fy = self.f(y)
-            fgrady = self.fgrad_y(y)
-            update = (fy - z) / fgrady
-            y -= update
-            it += 1
-        if it == max_iterations:
-            print("WARNING!!! Maximum number of iterations reached in f_inv ")
-            print("Sum of updates: %.4f" % np.sum(update))
-        return y
-
-    def fgrad_y(self, y, return_precalc=False):
-        """
-        Gradient of f w.r.t to y ([N x 1])
-        This vectorized version calculates all summation terms
-        at the same time (since the grad of a sum is the sum of the grads).
-
-        :returns: Nx1 vector of derivatives, unless return_precalc is true,
-        then it also returns the precomputed stuff
-        """
-        d = self.d
-        mpsi = self.psi
-
-        # vectorized version
-        # term = b * (y + c)
-        term = (mpsi[:, 1] * (y[:, :, None] + mpsi[:, 2])).T
-        logistic_term = self._logistic(term)
-        logistic_grad = logistic_term - (logistic_term ** 2)
-
-        #S = (mpsi[:,1] * (y[:,:,None] + mpsi[:,2])).T
-        #R = np.tanh(S)
-        #D = 1 - (R ** 2)
-
-        #GRAD = (d + (mpsi[:,0:1][:,:,None] * mpsi[:,1:2][:,:,None] * D).sum(axis=0)).T
-        grad = (d + (mpsi[:, :1][:, : ,None] * mpsi[:, 1:2][:, :, None] *
-                     logistic_grad).sum(axis=0)).T
-
-        if return_precalc:
-            return grad, logistic_term, logistic_grad
-        return grad
-
-    def fgrad_y_psi(self, y, return_covar_chain=False):
-        """
-        gradient of f w.r.t to y and psi
-
-        :returns: NxIx4 tensor of partial derivatives
-        """
-        mpsi = self.psi
-
-        grad, l_term, l_grad = self.fgrad_y(y, return_precalc=True)
-        gradients = np.zeros((y.shape[0], y.shape[1], self.n_terms, 4))
-        for i in xrange(self.n_terms):
-            a, b, c = mpsi[i]
-            gradients[:, :, i, 0] = b * l_grad[i].T
-            b2l_term = b - (2 * l_term[i].T)
-            al_grad = a * l_grad[i].T
-            #gradients[:, :, i, 1] = a * (d[i] - 2.0 * s[i] * r[i] * (1.0/np.cosh(s[i])) ** 2).T
-            gradients[:, :, i, 1] = (1 + ((y + c) * b2l_term)) * al_grad
-            #gradients[:, :, i, 2] = (-2.0 * a * (b ** 2) * r[i] * ((1.0 / np.cosh(s[i])) ** 2)).T
-            gradients[:, :, i, 2] = b * b2l_term * al_grad
-        gradients[:, :, 0, 3] = 1.0
-
-        if return_covar_chain:
-            covar_grad_chain = np.zeros((y.shape[0], y.shape[1], len(mpsi), 4))
-            for i in xrange(self.n_terms):
-                a, b, c = mpsi[i]
-                covar_grad_chain[:, :, i, 0] = l_term[i].T
-                al_grad = a * l_grad[i].T
-                covar_grad_chain[:, :, i, 1] = (y + c) * al_grad
-                covar_grad_chain[:, :, i, 2] = b * al_grad
-            covar_grad_chain[:, :, 0, 3] = y
-            return gradients, covar_grad_chain
-
-        return gradients
-
-    def _get_param_names(self):
-        variables = ['a', 'b', 'c', 'd']
-        names = sum([['warp_logistic_%s_t%i' % (variables[n],q) for n in range(3)]
-                     for q in range(self.n_terms)],[])
-        names.append('warp_logistic')
-        return names
-
-    def update_grads(self, Y_untransformed, Kiy):
-        grad_y = self.fgrad_y(Y_untransformed)
-        grad_y_psi, grad_psi = self.fgrad_y_psi(Y_untransformed,
-                                                return_covar_chain=True)
-        djac_dpsi = ((1.0 / grad_y[:, :, None, None]) * grad_y_psi).sum(axis=0).sum(axis=0)
-        dquad_dpsi = (Kiy[:, None, None, None] * grad_psi).sum(axis=0).sum(axis=0)
-
-        warping_grads = -dquad_dpsi + djac_dpsi
-
-        self.psi.gradient[:] = warping_grads[:, :-1]
-        self.d.gradient[:] = warping_grads[0, -1]
-
-
-class LogitFunction(WarpingFunction):
-    """
-    A sum of logit functions.
-    """
-    def __init__(self, n_terms=1, initial_y=None):
-        """
-        n_terms specifies the number of logistic terms to be used
-        """
-        self.n_terms = n_terms
-        self.num_parameters = 3 * self.n_terms
-        self.psi = np.ones((self.n_terms, 3))
-        super(LogitFunction, self).__init__(name='warp_logit')
-        self.psi = Param('psi', self.psi)
-        self.psi[:, :2].constrain_positive()
-        self.link_parameter(self.psi)
-        self.initial_y = initial_y
-        self.e = 1e-5
-
-    def _logit(self, y):
-        a, b, c = self.psi[0]
-        y += self.e
-        return ((np.log(y) - np.log(a - y)) / b) + c
-
-    def f(self, y):
-        """
-        Transform y with f using parameter vector psi
-        psi = [[a,b,c]]
-
-        :math:`f = (y * d) + \\sum_{terms} a * logistic(b *(y + c))`
-        """
-        z = np.zeros_like(y)
-        for i in xrange(self.n_terms):
-            a, b, c = self.psi[i]
-            z += self._logit(y)
-        return z
-
-    def f_inv(self, z, max_iterations=100, y=None):
-        """
-        calculate the numerical inverse of f
-
-        :param max_iterations: maximum number of N.R. iterations
-        """
-        z = z.copy()
-        if y is None:
-            # The idea here is to initialize y with +1 where
-            # z is positive and -1 where it is negative.
-            # For negative z, Newton-Raphson diverges
-            # if we initialize y with a positive value (and vice-versa).
-            y = ((z > 0) * 1.) - (z <= 0)
-            if self.initial_y is not None:
-                y *= self.initial_y
-        it = 0
-        update = np.inf
-        while it == 0 or (np.abs(update).sum() > 1e-10 and it < max_iterations):
-            fy = self.f(y)
-            fgrady = self.fgrad_y(y)
-            update = (fy - z) / fgrady
-            y -= update
-            it += 1
-        if it == max_iterations:
-            print("WARNING!!! Maximum number of iterations reached in f_inv ")
-            print("Sum of updates: %.4f" % np.sum(update))
-        return y
-
-    def fgrad_y(self, y, return_precalc=False):
-        """
-        Gradient of f w.r.t to y ([N x 1])
-        This vectorized version calculates all summation terms
-        at the same time (since the grad of a sum is the sum of the grads).
-
-        :returns: Nx1 vector of derivatives, unless return_precalc is true,
-        then it also returns the precomputed stuff
-        """
-        a, b, c = self.psi[0]
-
-        # vectorized version
-        # term = b * (y + c)
-        y += self.e
-        yb = y * b
-        ab = a * b
-        grad = (1. / yb) + (1. / (ab - yb))
-
-        if return_precalc:
-            return grad, yb, ab
-        return grad
-
-    def fgrad_y_psi(self, y, return_covar_chain=False):
-        """
-        gradient of f w.r.t to y and psi
-
-        :returns: NxIx4 tensor of partial derivatives
-        """
-        grad, yb, ab = self.fgrad_y(y, return_precalc=True)
-        gradients = np.zeros((y.shape[0], y.shape[1], self.n_terms, 3))
-        for i in xrange(self.n_terms):
-            a, b, c = self.psi[i]
-            gradients[:, :, i, 0] = -b / ((ab - yb) ** 2)
-            yb2 = y * (b ** 2)
-            ab2 = a * (b ** 2)
-            gradients[:, :, i, 1] = - (1. / yb2) - (1. / (ab2 - yb2))
-            #gradients[:, :, i, 2] = 0.0
-
-        if return_covar_chain:
-            covar_grad_chain = np.zeros((y.shape[0], y.shape[1], self.n_terms, 3))
-            for i in xrange(self.n_terms):
-                a, b, c = self.psi[i]
-                covar_grad_chain[:, :, i, 0] = - (1. / (ab - yb))
-                covar_grad_chain[:, :, i, 1] = - (np.log(y) - np.log(a - y)) / (b ** 2)
-                covar_grad_chain[:, :, i, 2] = 1.0
-            return gradients, covar_grad_chain
-
-        return gradients
-
-    def _get_param_names(self):
-        variables = ['a', 'b', 'c']
-        names = sum([['warp_logit_%s_t%i' % (variables[n],q) for n in range(3)]
-                     for q in range(self.n_terms)],[])
-        names.append('warp_logit')
-        return names
-
-    def update_grads(self, Y_untransformed, Kiy):
-        grad_y = self.fgrad_y(Y_untransformed)
-        grad_y_psi, grad_psi = self.fgrad_y_psi(Y_untransformed,
-                                                return_covar_chain=True)
-        djac_dpsi = ((1.0 / grad_y[:, :, None, None]) * grad_y_psi).sum(axis=0).sum(axis=0)
-        dquad_dpsi = (Kiy[:, None, None, None] * grad_psi).sum(axis=0).sum(axis=0)
-
-        warping_grads = -dquad_dpsi + djac_dpsi
-
-        self.psi.gradient[:] = warping_grads[:, :]
-
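Note, for reference: the LogisticFunction deleted above warps targets with f(y) = d*y + sum_i a_i * logistic(b_i * (y + c_i)), and its fgrad_y evaluates the closed-form derivative d + sum_i a_i * b_i * s_i * (1 - s_i) through the logistic_term - logistic_term**2 factor. Below is a minimal standalone sketch of that forward map and derivative in plain NumPy; the names logistic_warp and logistic_warp_grad are illustrative only and are not part of the GPy API.

    import numpy as np

    def logistic(x):
        # Standard logistic sigmoid, as in the deleted _logistic helper.
        return 1.0 / (1.0 + np.exp(-x))

    def logistic_warp(y, psi, d):
        # f(y) = d*y + sum_i a_i * logistic(b_i * (y + c_i)), where psi is an
        # (n_terms, 3) array of [a, b, c] rows; a, b, d > 0 keep f monotonic.
        z = d * np.asarray(y, dtype=float)
        for a, b, c in psi:
            z = z + a * logistic(b * (y + c))
        return z

    def logistic_warp_grad(y, psi, d):
        # df/dy = d + sum_i a_i * b_i * s_i * (1 - s_i), with
        # s_i = logistic(b_i * (y + c_i)); this is the s - s**2 term
        # that the deleted fgrad_y computed in vectorized form.
        g = np.full_like(np.asarray(y, dtype=float), d)
        for a, b, c in psi:
            s = logistic(b * (y + c))
            g = g + a * b * s * (1.0 - s)
        return g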
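Similarly, the LogitFunction deleted above applies the logit warp f(y) = (log(y + e) - log(a - (y + e)))/b + c, defined only for 0 < y + e < a, and its fgrad_y uses the closed-form derivative 1/(b*y) + 1/(a*b - b*y) at the shifted y. A sketch under the same naming caveat:

    def logit_warp(y, a, b, c, e=1e-5):
        # f(y) = (log(y + e) - log(a - (y + e))) / b + c, per the deleted
        # _logit; requires 0 < y + e < a, otherwise the logs are undefined.
        ys = np.asarray(y, dtype=float) + e
        return ((np.log(ys) - np.log(a - ys)) / b) + c

    def logit_warp_grad(y, a, b, c, e=1e-5):
        # df/dy = (1/ys + 1/(a - ys)) / b = 1/(b*ys) + 1/(a*b - b*ys),
        # matching the deleted fgrad_y.
        ys = np.asarray(y, dtype=float) + e
        return (1.0 / (ys * b)) + (1.0 / (a * b - ys * b))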
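Both deleted classes shared the same numerical f_inv: Newton-Raphson on f(y) - z = 0, seeded at +1 where z > 0 and -1 elsewhere because, as the inline comments note, starting on the wrong side of zero can make the iteration diverge. A sketch of that scheme against the logistic helpers above (warp_inv is an illustrative name, not GPy API):

    def warp_inv(z, psi, d, max_iterations=100, tol=1e-10):
        # Newton-Raphson: y <- y - (f(y) - z) / f'(y), with the sign-based
        # initialization used by the deleted f_inv implementations.
        z = np.asarray(z, dtype=float)
        y = np.where(z > 0, 1.0, -1.0)
        for _ in range(max_iterations):
            update = (logistic_warp(y, psi, d) - z) / logistic_warp_grad(y, psi, d)
            y = y - update
            if np.abs(update).sum() <= tol:
                break
        return y

Round-tripping a few points checks the sketch: with psi = np.ones((2, 3)) and d = 1.0, warp_inv(logistic_warp(y, psi, 1.0), psi, 1.0) should recover y to within the tolerance, since f is strictly increasing with f' >= d.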