the discriminative prior

This commit is contained in:
Fariba 2014-10-16 17:18:57 +01:00
parent 33fcd06ccc
commit 66755099cd

View file

@ -9,6 +9,7 @@ from domains import _REAL, _POSITIVE
import warnings
import weakref
class Prior(object):
domain = None
@ -17,13 +18,16 @@ class Prior(object):
def plot(self):
import sys
assert "matplotlib" in sys.modules, "matplotlib package has not been imported."
from ...plotting.matplot_dep import priors_plots
priors_plots.univariate_plot(self)
def __repr__(self, *args, **kwargs):
return self.__str__()
class Gaussian(Prior):
"""
Implementation of the univariate Gaussian probability function, coupled with random variables.
@ -36,6 +40,7 @@ class Gaussian(Prior):
"""
domain = _REAL
_instances = []
def __new__(cls, mu, sigma): # Singleton:
if cls._instances:
cls._instances[:] = [instance for instance in cls._instances if instance()]
@ -45,6 +50,7 @@ class Gaussian(Prior):
o = super(Prior, cls).__new__(cls, mu, sigma)
cls._instances.append(weakref.ref(o))
return cls._instances[-1]()
def __init__(self, mu, sigma):
self.mu = float(mu)
self.sigma = float(sigma)
@ -67,6 +73,7 @@ class Gaussian(Prior):
class Uniform(Prior):
domain = _REAL
_instances = []
def __new__(cls, lower, upper): # Singleton:
if cls._instances:
cls._instances[:] = [instance for instance in cls._instances if instance()]
@ -94,6 +101,7 @@ class Uniform(Prior):
def rvs(self, n):
return np.random.uniform(self.lower, self.upper, size=n)
class LogGaussian(Prior):
"""
Implementation of the univariate *log*-Gaussian probability function, coupled with random variables.
@ -106,6 +114,7 @@ class LogGaussian(Prior):
"""
domain = _POSITIVE
_instances = []
def __new__(cls, mu, sigma): # Singleton:
if cls._instances:
cls._instances[:] = [instance for instance in cls._instances if instance()]
@ -115,6 +124,7 @@ class LogGaussian(Prior):
o = super(Prior, cls).__new__(cls, mu, sigma)
cls._instances.append(weakref.ref(o))
return cls._instances[-1]()
def __init__(self, mu, sigma):
self.mu = float(mu)
self.sigma = float(sigma)
@ -146,6 +156,7 @@ class MultivariateGaussian:
"""
domain = _REAL
_instances = []
def __new__(cls, mu, var): # Singleton:
if cls._instances:
cls._instances[:] = [instance for instance in cls._instances if instance()]
@ -155,6 +166,7 @@ class MultivariateGaussian:
o = super(Prior, cls).__new__(cls, mu, var)
cls._instances.append(weakref.ref(o))
return cls._instances[-1]()
def __init__(self, mu, var):
self.mu = np.array(mu).flatten()
self.var = np.array(var)
@ -184,10 +196,13 @@ class MultivariateGaussian:
def plot(self):
import sys
assert "matplotlib" in sys.modules, "matplotlib package has not been imported."
from ..plotting.matplot_dep import priors_plots
priors_plots.multivariate_plot(self)
def gamma_from_EV(E, V):
warnings.warn("use Gamma.from_EV to create Gamma Prior", FutureWarning)
return Gamma.from_EV(E, V)
@ -205,6 +220,7 @@ class Gamma(Prior):
"""
domain = _POSITIVE
_instances = []
def __new__(cls, a, b): # Singleton:
if cls._instances:
cls._instances[:] = [instance for instance in cls._instances if instance()]
@ -214,6 +230,7 @@ class Gamma(Prior):
o = super(Prior, cls).__new__(cls, a, b)
cls._instances.append(weakref.ref(o))
return cls._instances[-1]()
def __init__(self, a, b):
self.a = float(a)
self.b = float(b)
@ -241,6 +258,7 @@ class Gamma(Prior):
def rvs(self, n):
return np.random.gamma(scale=1. / self.b, shape=self.a, size=n)
@staticmethod
def from_EV(E, V):
"""
@ -254,6 +272,7 @@ class Gamma(Prior):
b = E / V
return Gamma(a, b)
class inverse_gamma(Prior):
"""
Implementation of the inverse-Gamma probability function, coupled with random variables.
@ -265,6 +284,7 @@ class inverse_gamma(Prior):
"""
domain = _POSITIVE
def __new__(cls, a, b): # Singleton:
if cls._instances:
cls._instances[:] = [instance for instance in cls._instances if instance()]
@ -274,6 +294,7 @@ class inverse_gamma(Prior):
o = super(Prior, cls).__new__(cls, a, b)
cls._instances.append(weakref.ref(o))
return cls._instances[-1]()
def __init__(self, a, b):
self.a = float(a)
self.b = float(b)
@ -290,3 +311,348 @@ class inverse_gamma(Prior):
def rvs(self, n):
return 1. / np.random.gamma(scale=1. / self.b, shape=self.a, size=n)
class DGPLVM_KFDA(Prior):
"""
Implementation of the Discriminative Gaussian Process Latent Variable function using
Kernel Fisher Discriminant Analysis by Seung-Jean Kim for implementing Face paper
by Chaochao Lu.
:param lambdaa: constant
:param sigma2: constant
.. Note:: Surpassing Human-Level Face paper dgplvm implementation
"""
domain = _REAL
# _instances = []
# def __new__(cls, lambdaa, sigma2): # Singleton:
# if cls._instances:
# cls._instances[:] = [instance for instance in cls._instances if instance()]
# for instance in cls._instances:
# if instance().mu == mu and instance().sigma == sigma:
# return instance()
# o = super(Prior, cls).__new__(cls, mu, sigma)
# cls._instances.append(weakref.ref(o))
# return cls._instances[-1]()
def __init__(self, lambdaa, sigma2, lbl, kern, x_shape):
"""A description for init"""
self.datanum = lbl.shape[0]
self.classnum = lbl.shape[1]
self.lambdaa = lambdaa
self.sigma2 = sigma2
self.lbl = lbl
self.kern = kern
lst_ni = self.compute_lst_ni()
self.a = self.compute_a(lst_ni)
self.A = self.compute_A(lst_ni)
self.x_shape = x_shape
def get_class_label(self, y):
for idx, v in enumerate(y):
if v == 1:
return idx
return -1
# This function assigns each data point to its own class
# and returns the dictionary which contains the class name and parameters.
def compute_cls(self, x):
cls = {}
# Appending each data point to its proper class
for j in xrange(self.datanum):
class_label = self.get_class_label(self.lbl[j])
if class_label not in cls:
cls[class_label] = []
cls[class_label].append(x[j])
if len(cls) > 2:
for i in range(2, self.classnum):
del cls[i]
return cls
def x_reduced(self, cls):
x1 = cls[0]
x2 = cls[1]
x = np.concatenate((x1, x2), axis=0)
return x
def compute_lst_ni(self):
lst_ni = []
lst_ni1 = []
lst_ni2 = []
f1 = (np.where(self.lbl[:, 0] == 1)[0])
f2 = (np.where(self.lbl[:, 1] == 1)[0])
for idx in f1:
lst_ni1.append(idx)
for idx in f2:
lst_ni2.append(idx)
lst_ni.append(len(lst_ni1))
lst_ni.append(len(lst_ni2))
return lst_ni
def compute_a(self, lst_ni):
a = np.ones((self.datanum, 1))
count = 0
for N_i in lst_ni:
if N_i == lst_ni[0]:
a[count:count + N_i] = (float(1) / N_i) * a[count]
count += N_i
else:
if N_i == lst_ni[1]:
a[count: count + N_i] = -(float(1) / N_i) * a[count]
count += N_i
return a
def compute_A(self, lst_ni):
A = np.zeros((self.datanum, self.datanum))
idx = 0
for N_i in lst_ni:
B = float(1) / np.sqrt(N_i) * (np.eye(N_i) - ((float(1) / N_i) * np.ones((N_i, N_i))))
A[idx:idx + N_i, idx:idx + N_i] = B
idx += N_i
return A
# Here log function
def lnpdf(self, x):
x = x.reshape(self.x_shape)
K = self.kern.K(x)
a_trans = np.transpose(self.a)
paran = self.lambdaa * np.eye(x.shape[0]) + self.A.dot(K).dot(self.A)
inv_part = pdinv(paran)[0]
J = a_trans.dot(K).dot(self.a) - a_trans.dot(K).dot(self.A).dot(inv_part).dot(self.A).dot(K).dot(self.a)
J_star = (1. / self.lambdaa) * J
return (-1. / self.sigma2) * J_star
# Here gradient function
def lnpdf_grad(self, x):
x = x.reshape(self.x_shape)
K = self.kern.K(x)
paran = self.lambdaa * np.eye(x.shape[0]) + self.A.dot(K).dot(self.A)
inv_part = pdinv(paran)[0]
b = self.A.dot(inv_part).dot(self.A).dot(K).dot(self.a)
a_Minus_b = self.a - b
a_b_trans = np.transpose(a_Minus_b)
DJ_star_DK = (1. / self.lambdaa) * (a_Minus_b.dot(a_b_trans))
DJ_star_DX = self.kern.gradients_X(DJ_star_DK, x)
return (-1. / self.sigma2) * DJ_star_DX
def rvs(self, n):
return np.random.rand(n) # A WRONG implementation
def __str__(self):
return 'DGPLVM_prior'
class DGPLVM(Prior):
"""
Implementation of the Discriminative Gaussian Process Latent Variable model paper, by Raquel.
:param sigma2: constant
.. Note:: DGPLVM for Classification paper implementation
"""
domain = _REAL
# _instances = []
# def __new__(cls, mu, sigma): # Singleton:
# if cls._instances:
# cls._instances[:] = [instance for instance in cls._instances if instance()]
# for instance in cls._instances:
# if instance().mu == mu and instance().sigma == sigma:
# return instance()
# o = super(Prior, cls).__new__(cls, mu, sigma)
# cls._instances.append(weakref.ref(o))
# return cls._instances[-1]()
def __init__(self, sigma2, lbl, x_shape):
self.sigma2 = sigma2
# self.x = x
self.lbl = lbl
self.classnum = lbl.shape[1]
self.datanum = lbl.shape[0]
self.x_shape = x_shape
self.dim = x_shape[1]
def get_class_label(self, y):
for idx, v in enumerate(y):
if v == 1:
return idx
return -1
# This function assigns each data point to its own class
# and returns the dictionary which contains the class name and parameters.
def compute_cls(self, x):
cls = {}
# Appending each data point to its proper class
for j in xrange(self.datanum):
class_label = self.get_class_label(self.lbl[j])
if class_label not in cls:
cls[class_label] = []
cls[class_label].append(x[j])
return cls
# This function computes mean of each class. The mean is calculated through each dimension
def compute_Mi(self, cls):
M_i = np.zeros((self.classnum, self.dim))
for i in cls:
# Mean of each class
M_i[i] = np.mean(cls[i], axis=0)
return M_i
# Adding data points as tuple to the dictionary so that we can access indices
def compute_indices(self, x):
data_idx = {}
for j in xrange(self.datanum):
class_label = self.get_class_label(self.lbl[j])
if class_label not in data_idx:
data_idx[class_label] = []
t = (j, x[j])
data_idx[class_label].append(t)
return data_idx
# Adding indices to the list so we can access whole the indices
def compute_listIndices(self, data_idx):
lst_idx = []
lst_idx_all = []
for i in data_idx:
if len(lst_idx) == 0:
pass
#Do nothing, because it is the first time list is created so is empty
else:
lst_idx = []
# Here we put indices of each class in to the list called lst_idx_all
for m in xrange(len(data_idx[i])):
lst_idx.append(data_idx[i][m][0])
lst_idx_all.append(lst_idx)
return lst_idx_all
# This function calculates between classes variances
def compute_Sb(self, cls, M_i, M_0):
Sb = np.zeros((self.dim, self.dim))
for i in cls:
B = (M_i[i] - M_0).reshape(self.dim, 1)
B_trans = B.transpose()
Sb += (float(len(cls[i])) / self.datanum) * B.dot(B_trans)
return Sb
# This function calculates within classes variances
def compute_Sw(self, cls, M_i):
Sw = np.zeros((self.dim, self.dim))
for i in cls:
N_i = float(len(cls[i]))
W_WT = np.zeros((self.dim, self.dim))
for xk in cls[i]:
W = (xk - M_i[i])
W_WT += np.outer(W, W)
Sw += (N_i / self.datanum) * ((1. / N_i) * W_WT)
return Sw
# Calculating beta and Bi for Sb
def compute_sig_beta_Bi(self, data_idx, M_i, M_0, lst_idx_all):
# import pdb
# pdb.set_trace()
B_i = np.zeros((self.classnum, self.dim))
Sig_beta_B_i_all = np.zeros((self.datanum, self.dim))
for i in data_idx:
# pdb.set_trace()
# Calculating Bi
B_i[i] = (M_i[i] - M_0).reshape(1, self.dim)
for k in xrange(self.datanum):
for i in data_idx:
N_i = float(len(data_idx[i]))
if k in lst_idx_all[i]:
beta = (float(1) / N_i) - (float(1) / self.datanum)
Sig_beta_B_i_all[k] += float(N_i) / self.datanum * (beta * B_i[i])
else:
beta = -(float(1) / self.datanum)
Sig_beta_B_i_all[k] += float(N_i) / self.datanum * (beta * B_i[i])
Sig_beta_B_i_all = Sig_beta_B_i_all.transpose()
return Sig_beta_B_i_all
# Calculating W_j s separately so we can access all the W_j s anytime
def compute_wj(self, data_idx, M_i):
W_i = np.zeros((self.datanum, self.dim))
for i in data_idx:
N_i = float(len(data_idx[i]))
for tpl in data_idx[i]:
xj = tpl[1]
j = tpl[0]
W_i[j] = (xj - M_i[i])
return W_i
# Calculating alpha and Wj for Sw
def compute_sig_alpha_W(self, data_idx, lst_idx_all, W_i):
Sig_alpha_W_i = np.zeros((self.datanum, self.dim))
for i in data_idx:
N_i = float(len(data_idx[i]))
for tpl in data_idx[i]:
k = tpl[0]
for j in lst_idx_all[i]:
if k == j:
alpha = 1 - (float(1) / N_i)
Sig_alpha_W_i[k] += (alpha * W_i[j])
else:
alpha = 0 - (float(1) / N_i)
Sig_alpha_W_i[k] += (alpha * W_i[j])
Sig_alpha_W_i = (1. / self.datanum) * np.transpose(Sig_alpha_W_i)
return Sig_alpha_W_i
# This function calculates log of our prior
def lnpdf(self, x):
x = x.reshape(self.x_shape)
cls = self.compute_cls(x)
M_0 = np.mean(x, axis=0)
M_i = self.compute_Mi(cls)
Sb = self.compute_Sb(cls, M_i, M_0)
Sw = self.compute_Sw(cls, M_i)
# Sb_inv_N = np.linalg.inv(Sb + np.eye(Sb.shape[0]) * (np.diag(Sb).min() * 0.1))
#Sb_inv_N = np.linalg.inv(Sb+np.eye(Sb.shape[0])*0.1)
Sb_inv_N = pdinv(Sb+ np.eye(Sb.shape[0]) * (np.diag(Sb).min() * 0.1))[0]
return (-1 / self.sigma2) * np.trace(Sb_inv_N.dot(Sw))
# This function calculates derivative of the log of prior function
def lnpdf_grad(self, x):
x = x.reshape(self.x_shape)
cls = self.compute_cls(x)
M_0 = np.mean(x, axis=0)
M_i = self.compute_Mi(cls)
Sb = self.compute_Sb(cls, M_i, M_0)
Sw = self.compute_Sw(cls, M_i)
data_idx = self.compute_indices(x)
lst_idx_all = self.compute_listIndices(data_idx)
Sig_beta_B_i_all = self.compute_sig_beta_Bi(data_idx, M_i, M_0, lst_idx_all)
W_i = self.compute_wj(data_idx, M_i)
Sig_alpha_W_i = self.compute_sig_alpha_W(data_idx, lst_idx_all, W_i)
# Calculating inverse of Sb and its transpose and minus
# Sb_inv_N = np.linalg.inv(Sb + np.eye(Sb.shape[0]) * (np.diag(Sb).min() * 0.1))
# Sb_inv_N = np.linalg.inv(Sb+np.eye(Sb.shape[0])*0.1)
Sb_inv_N = pdinv(Sb+ np.eye(Sb.shape[0]) * (np.diag(Sb).min() * 0.1))[0]
Sb_inv_N_trans = np.transpose(Sb_inv_N)
Sb_inv_N_trans_minus = -1 * Sb_inv_N_trans
Sw_trans = np.transpose(Sw)
# Calculating DJ/DXk
DJ_Dxk = 2 * (
Sb_inv_N_trans_minus.dot(Sw_trans).dot(Sb_inv_N_trans).dot(Sig_beta_B_i_all) + Sb_inv_N_trans.dot(
Sig_alpha_W_i))
# Calculating derivative of the log of the prior
DPx_Dx = ((-1 / self.sigma2) * DJ_Dxk)
return DPx_Dx.T
# def frb(self, x):
# from functools import partial
# from GPy.models import GradientChecker
# f = partial(self.lnpdf)
# df = partial(self.lnpdf_grad)
# grad = GradientChecker(f, df, x, 'X')
# grad.checkgrad(verbose=1)
def rvs(self, n):
return np.random.rand(n) # A WRONG implementation
def __str__(self):
return 'DGPLVM_prior'