core file

This commit is contained in:
Nicolo Fusi 2012-11-29 16:25:27 +00:00
parent bb2e4c258d
commit e1b766a8dd
4 changed files with 761 additions and 0 deletions

3
GPy/core/__init__.py Normal file
View file

@@ -0,0 +1,3 @@
from model import *
from parameterised import *
import priors

302
GPy/core/model.py Normal file
View file

@@ -0,0 +1,302 @@
import numpy as np
from scipy import optimize
import sys, pdb
#import numdifftools as ndt
from parameterised import parameterised, truncate_pad
import priors
from ..util.linalg import jitchol
from ..inference import optimization, SGD
class model(parameterised):
    """
    Base class for probabilistic models.

    Subclasses implement get_param/set_param, log_likelihood and
    log_likelihood_gradients; on top of those this class supplies priors,
    (restarted) optimisation of the log posterior, a Laplace approximation
    and numerical gradient checking, reusing the constraint/tie machinery
    inherited from `parameterised`.
    """
    def __init__(self):
        parameterised.__init__(self)
        # one prior slot per underlying parameter; None means "no prior"
        self.priors = [None for i in range(self.get_param().size)]
        self.optimization_runs = []
        self.sampling_runs = []
        # round-trip the parameters so any constraints take effect immediately
        self.set_param(self.get_param())
    def get_param(self):
        """Return the model's parameters as a flat numpy array (abstract)."""
        raise NotImplementedError, "this needs to be implemented to utilise the model class"
    def set_param(self,x):
        """Set the model's parameters from a flat numpy array x (abstract)."""
        raise NotImplementedError, "this needs to be implemented to utilise the model class"
    def log_likelihood(self):
        """Return the log-likelihood of the model (abstract)."""
        raise NotImplementedError, "this needs to be implemented to utilise the model class"
    def log_likelihood_gradients(self):
        """Return the gradient of the log-likelihood w.r.t. the parameters (abstract)."""
        raise NotImplementedError, "this needs to be implemented to utilise the model class"
    def set_prior(self,which,what):
        """sets priors on the model parameters.

        Arguments
        ---------
        which -- string, regexp, or integer array
        what -- instance of a prior class

        Notes
        -----
        Asserts that the prior is suitable for the constraint. If the
        wrong constraint is in place, an error is raised. If no
        constraint is in place, one is added (warning printed).

        For tied parameters, the prior will only be "counted" once, thus
        a prior object is only inserted on the first tied index
        """
        which = self.grep_param_names(which)

        #check tied situation: a prior may not cover only part of a tie
        tie_partial_matches = [tie for tie in self.tied_indices if (not set(tie).isdisjoint(set(which))) & (not set(tie)==set(which))]
        if len(tie_partial_matches):
            raise ValueError, "cannot place prior across partial ties"
        tie_matches = [tie for tie in self.tied_indices if set(which)==set(tie) ]
        if len(tie_matches)>1:
            raise ValueError, "cannot place prior across multiple ties"
        elif len(tie_matches)==1:
            which = which[:1]# just place a prior object on the first parameter

        #check constraints are okay
        if isinstance(what, (priors.gamma, priors.log_Gaussian)):
            # these priors have positive support, so negative/bounded constraints clash
            assert not np.any(which[:,None]==self.constrained_negative_indices), "constraint and prior incompatible"
            assert not np.any(which[:,None]==self.constrained_bounded_indices), "constraint and prior incompatible"
            unconst = np.setdiff1d(which, self.constrained_positive_indices)
            if len(unconst):
                print "Warning: constraining parameters to be positive:"
                print '\n'.join([n for i,n in enumerate(self.get_param_names()) if i in unconst])
                print '\n'
                self.constrain_positive(unconst)
        elif isinstance(what,priors.Gaussian):
            # a Gaussian prior has unbounded support: incompatible with any constraint
            assert not np.any(which[:,None]==self.all_constrained_indices()), "constraint and prior incompatible"
        else:
            raise ValueError, "prior not recognised"
        #store the prior in a local list
        for w in which:
            self.priors[w] = what
    def log_prior(self):
        """evaluate the prior (sum of log-densities over parameters with a prior)"""
        return np.sum([p.lnpdf(x) for p, x in zip(self.priors,self.get_param()) if p is not None])
    def log_prior_gradients(self):
        """evaluate the gradients of the priors (zero where no prior is set)"""
        x = self.get_param()
        ret = np.zeros(x.size)
        [np.put(ret,i,p.lnpdf_grad(xx)) for i,(p,xx) in enumerate(zip(self.priors,x)) if not p is None]
        return ret
    def extract_gradients(self):
        """
        Use self.log_likelihood_gradients and self.prior_gradients to get the gradients of the model.
        Adjust the gradient for constraints and ties, return.
        """
        g = self.log_likelihood_gradients() + self.log_prior_gradients()
        x = self.get_param()
        # chain rule for the exp/-exp reparameterisation of +ve/-ve constraints
        g[self.constrained_positive_indices] = g[self.constrained_positive_indices]*x[self.constrained_positive_indices]
        g[self.constrained_negative_indices] = g[self.constrained_negative_indices]*x[self.constrained_negative_indices]
        # chain rule for the sigmoid reparameterisation of bounded constraints
        [np.put(g,i,g[i]*(x[i]-l)*(h-x[i])/(h-l)) for i,l,h in zip(self.constrained_bounded_indices, self.constrained_bounded_lowers, self.constrained_bounded_uppers)]
        # tied parameters: accumulate the gradient on the first index of each tie
        [np.put(g,i,v) for i,v in [(t[0],np.sum(g[t])) for t in self.tied_indices]]
        if len(self.tied_indices) or len(self.constrained_fixed_indices):
            # drop entries for fixed parameters and duplicate (tied) parameters
            to_remove = np.hstack((self.constrained_fixed_indices+[t[1:] for t in self.tied_indices]))
            return np.delete(g,to_remove)
        else:
            return g
    def randomize(self):
        """
        Randomize the model.
        Make this draw from the prior if one exists, else draw from N(0,1)
        """
        #first take care of all parameters (from N(0,1))
        x = self.extract_param()
        x = np.random.randn(x.size)
        self.expand_param(x)
        #now draw from prior where possible
        x = self.get_param()
        [np.put(x,i,p.rvs(1)) for i,p in enumerate(self.priors) if not p is None]
        self.set_param(x)
        self.expand_param(self.extract_param())#makes sure all of the tied parameters get the same init (since there's only one prior object...)
    def optimize_restarts(self, Nrestarts=10, robust=False, verbose=True, **kwargs):
        """
        Perform random restarts of the model, and set the model to the best
        seen solution.

        If the robust flag is set, exceptions raised during optimizations will
        be handled silently. If _all_ runs fail, the model is reset to the
        existing parameter values.

        Notes
        -----
        **kwargs are passed to the optimizer. They can be:
        :max_f_eval: maximum number of function evaluations
        :messages: whether to display during optimisation
        :verbose: whether to show informations about the current restart
        """
        initial_parameters = self.extract_param()
        for i in range(Nrestarts):
            try:
                self.randomize()
                self.optimize(**kwargs)
                if verbose:
                    print("Optimization restart {0}/{1}, f = {2}".format(i+1,
                                                                         Nrestarts,
                                                                         self.optimization_runs[-1].f_opt))
            except Exception as e:
                if robust:
                    print("Warning - optimization restart {0}/{1} failed".format(i+1, Nrestarts))
                else:
                    raise e
        if len(self.optimization_runs):
            # pick the run with the best objective value seen
            i = np.argmax([o.f_opt for o in self.optimization_runs])
            self.expand_param(self.optimization_runs[i].x_opt)
        else:
            # every restart failed: restore the parameters we started with
            self.expand_param(initial_parameters)
    def optimize(self, optimizer = 'tnc', start = None, **kwargs):
        """
        Optimize the model using self.log_likelihood and self.log_likelihood_gradient, as well as self.priors.
        kwargs are passed to the optimizer. They can be:

        :max_f_eval: maximum number of function evaluations
        :messages: whether to display during optimisation
        """
        # the optimizers minimise, so work with the negative log posterior
        def f(x):
            self.expand_param(x)
            return -self.log_likelihood()-self.log_prior()
        def fp(x):
            self.expand_param(x)
            return -self.extract_gradients()
        def f_fp(x):
            self.expand_param(x)
            return -self.log_likelihood()-self.log_prior(),-self.extract_gradients()
        if start == None:
            start = self.extract_param()
        optimizer = optimization.get_optimizer(optimizer)
        opt = optimizer(start, f_fp=f_fp,f=f,fp=fp, **kwargs)
        opt.run()
        self.optimization_runs.append(opt)
        # leave the model at the optimum found
        self.expand_param(opt.x_opt)
    def optimize_SGD(self, momentum = 0.1, learning_rate = 0.01, iterations = 20, **kwargs):
        """Optimize with stochastic gradient descent; the run is appended to optimization_runs."""
        # assert self.Y.shape[1] > 1, "SGD only works with D > 1"
        sgd = SGD.StochasticGD(self, iterations, learning_rate, momentum, **kwargs)
        sgd.run()
        self.optimization_runs.append(sgd)
    def Laplace_covariance(self):
        """return the covariance matrix of a Laplace approximation at the current (stationary) point"""
        #TODO add in the prior contributions for MAP estimation
        #TODO fix the hessian for tied, constrained and fixed components
        if hasattr(self, 'log_likelihood_hessian'):
            A = -self.log_likelihood_hessian()
        else:
            print "numerically calculating hessian. please be patient!"
            x = self.get_param()
            def f(x):
                self.set_param(x)
                return self.log_likelihood()
            # NOTE(review): ndt (numdifftools) is never imported -- its import is
            # commented out at the top of the file, so this branch raises NameError.
            h = ndt.Hessian(f)
            A = -h(x)
            self.set_param(x)
        # check for almost zero components on the diagonal which screw up the cholesky
        aa = np.nonzero((np.diag(A)<1e-6) & (np.diag(A)>0.))[0]
        A[aa,aa] = 0.
        return A
    def Laplace_evidence(self):
        """Returns an estimate of the model evidence based on the Laplace approximation.
        Uses a numerical estimate of the hessian if none is available analytically"""
        A = self.Laplace_covariance()
        try:
            # half log-determinant via the (jittered) Cholesky factor
            hld = np.sum(np.log(np.diag(jitchol(A)[0])))
        except:
            return np.nan
        return 0.5*self.get_param().size*np.log(2*np.pi) + self.log_likelihood() - hld
    def __str__(self):
        """Tabular description of the model: parameter table plus a Prior column."""
        s = parameterised.__str__(self).split('\n')
        #add priors to the string
        strs = [str(p) if p is not None else '' for p in self.priors]
        width = np.array(max([len(p) for p in strs] + [5])) + 4

        s[0] = 'Marginal log-likelihood: {0:.3e}\n'.format(self.log_likelihood()) + s[0]
        s[0] += "|{h:^{col}}".format(h = 'Prior', col = width)
        s[1] += '-'*(width + 1)

        for p in range(2, len(strs)+2):
            s[p] += '|{prior:^{width}}'.format(prior = strs[p-2], width = width)

        return '\n'.join(s)
    def checkgrad(self, verbose = True, include_priors=False, step=1e-6, tolerance = 1e-3, *args):
        """
        Check the gradient of the model by comparing to a numerical estimate.
        If the overall gradient fails, individual components are tested.
        """
        x = self.extract_param().copy()

        #choose a random direction to step in:
        dx = step*np.sign(np.random.uniform(-1,1,x.size))

        #evaluate around the point x
        self.expand_param(x+dx)
        f1,g1 = self.log_likelihood() + self.log_prior(), self.extract_gradients()
        self.expand_param(x-dx)
        f2,g2 = self.log_likelihood() + self.log_prior(), self.extract_gradients()
        self.expand_param(x)
        gradient = self.extract_gradients()

        # directional derivative: central difference vs analytic (ratio should be ~1)
        numerical_gradient = (f1-f2)/(2*dx)
        ratio = (f1-f2)/(2*np.dot(dx,gradient))
        if verbose:
            #print "gradient = ",gradient
            #print "numerical gradient = ",numerical_gradient
            print " Gradient ratio = ", ratio, '\n'
            sys.stdout.flush()

        if not (np.abs(1.-ratio)>tolerance):
            if verbose:
                print 'Gradcheck passed'
        else:
            if verbose:
                print "Global ratio far from unity. Testing individual gradients\n"
            try:
                names = self.extract_param_names()
            except NotImplementedError:
                names = ['Variable %i'%i for i in range(len(x))]
            # per-parameter central differences to localise the failure
            for i in range(len(x)):
                xx = x.copy()
                xx[i] += step
                self.expand_param(xx)
                f1,g1 = self.log_likelihood() + self.log_prior(), self.extract_gradients()[i]
                xx[i] -= 2.*step
                self.expand_param(xx)
                f2,g2 = self.log_likelihood() + self.log_prior(), self.extract_gradients()[i]
                self.expand_param(x)
                gradient = self.extract_gradients()[i]

                numerical_gradient = (f1-f2)/(2*step)
                ratio = (f1-f2)/(2*step*gradient)
                difference = np.abs((f1-f2)/2/step - gradient)
                if verbose:
                    print "{0:10s} ratio: {1:15f} difference: {2:15f} analytical: {3:15f} numerical: {4:15f}".format(names[i], float(ratio), float(difference), gradient, float(numerical_gradient)),
                    if (np.abs(ratio-1)<tolerance):
                        print " "+'\033[92m' + u"\u2713" + '\033[0m' # green checkmark
                    else:
                        print " "+'\033[91m' + u"\u2717" + '\033[0m' # red crossmark
                        return False
        return True

333
GPy/core/parameterised.py Normal file
View file

@@ -0,0 +1,333 @@
import numpy as np
import re
import copy
import cPickle
import os
from ..util.squashers import sigmoid
def truncate_pad(string,width,align='m'):
    """
    A helper function to make aligned strings for parameterised.__str__

    Returns *string* adjusted to exactly max(width, 4) characters:
    longer strings are truncated with a trailing '...', shorter ones
    are padded with spaces according to *align* ('m' = centred,
    'l' = left-aligned, 'r' = right-aligned).

    Raises ValueError for an unrecognised *align*.
    """
    width = max(width, 4)
    n = len(string)
    if n > width:
        # truncate, leaving room for the three-character ellipsis
        return string[:width-3] + '...'
    if n == width:
        return string
    diff = width - n
    if align == 'm':
        # centre the string; the extra space for an odd diff goes on the right.
        # Integer arithmetic here -- the original multiplied the pad string by
        # np.floor/np.ceil results (floats), which raises TypeError.
        left = diff // 2
        return ' '*left + string + ' '*(diff - left)
    elif align == 'l':
        return string + ' '*diff
    elif align == 'r':
        return ' '*diff + string
    else:
        raise ValueError("align must be one of 'm', 'l' or 'r', got %r" % (align,))
class parameterised:
    """
    Base class for model and kernel: bookkeeping for tying, fixing and
    constraining (positive / negative / bounded) parameters, and for
    mapping between the 'true' parameter vector (get_param/set_param)
    and the unconstrained optimisation vector (extract_param/expand_param).
    """
    def __init__(self):
        """
        This is the base class for model and kernel. Mostly just handles tying and constraining of parameters
        """
        # list of index arrays; each array holds indices tied to one value
        self.tied_indices = []
        # parallel lists: index arrays of fixed parameters and their values
        self.constrained_fixed_indices = []
        self.constrained_fixed_values = []
        self.constrained_positive_indices = np.empty(shape=(0,),dtype=np.int64)
        self.constrained_negative_indices = np.empty(shape=(0,),dtype=np.int64)
        # parallel lists: index arrays with their upper/lower bounds
        self.constrained_bounded_indices = []
        self.constrained_bounded_uppers = []
        self.constrained_bounded_lowers = []
    def pickle(self,filename,protocol=-1):
        """Pickle this object to filename (protocol -1 = highest available)."""
        f = file(filename,'w')
        cPickle.dump(self,f,protocol)
        f.close()
    def copy(self):
        """
        Returns a (deep) copy of the current model
        """
        return copy.deepcopy(self)
    def tie_param(self, which):
        """Tie the matched parameters together so they share a single value."""
        matches = self.grep_param_names(which)
        assert matches.size > 0, "need at least something to tie together"
        if len(self.tied_indices):
            assert not np.any(matches[:,None]==np.hstack(self.tied_indices)), "Some indices are already tied!"
        self.tied_indices.append(matches)
        #TODO only one of the priors will be evaluated. Give a warning message if the priors are not identical
        if hasattr(self,'prior'):
            pass
        self.expand_param(self.extract_param())# sets tied parameters to single value
    def untie_everything(self):
        """Unties all parameters by setting tied_indices to an empty list."""
        self.tied_indices = []
    def all_constrained_indices(self):
        """Return a np array of all the constrained indices"""
        ret = [np.hstack(i) for i in [self.constrained_bounded_indices, self.constrained_positive_indices, self.constrained_negative_indices, self.constrained_fixed_indices] if len(i)]
        if len(ret):
            return np.hstack(ret)
        else:
            return []
    def grep_param_names(self, expr):
        """
        Arguments
        ---------
        expr -- can be a regular expression object or a string to be turned into regular expression object.

        Returns
        -------
        the indices of self.get_param_names which match the regular expression.

        Notes
        -----
        Other objects are passed through - i.e. integers which weren't meant for grepping
        """
        if type(expr) is str:
            expr = re.compile(expr)
            return np.nonzero([expr.search(name) for name in self.get_param_names()])[0]
        elif type(expr) is re._pattern_type:
            return np.nonzero([expr.search(name) for name in self.get_param_names()])[0]
        else:
            # not a string/regexp: assume it is already an index array
            return expr
    def constrain_positive(self, which):
        """
        Set positive constraints.

        Arguments
        ---------
        which -- np.array(dtype=int), or regular expression object or string
        """
        matches = self.grep_param_names(which)
        assert not np.any(matches[:,None]==self.all_constrained_indices()), "Some indices are already constrained"
        self.constrained_positive_indices = np.hstack((self.constrained_positive_indices, matches))
        #check to ensure constraint is in place: flip any negative values
        x = self.get_param()
        for i,xx in enumerate(x):
            if (xx<0) & (i in matches):
                x[i] = -xx
        self.set_param(x)
    def unconstrain(self,which):
        """Unconstrain matching parameters. does not untie parameters"""
        matches = self.grep_param_names(which)

        #positive/negative
        self.constrained_positive_indices = np.delete(self.constrained_positive_indices,np.nonzero(np.sum(self.constrained_positive_indices[:,None]==matches[None,:],1))[0])
        self.constrained_negative_indices = np.delete(self.constrained_negative_indices,np.nonzero(np.sum(self.constrained_negative_indices[:,None]==matches[None,:],1))[0])

        #bounded: remove matched indices, then drop empty groups (keeping
        #the uppers/lowers lists aligned with the index list)
        if len(self.constrained_bounded_indices):
            self.constrained_bounded_indices = [np.delete(a,np.nonzero(np.sum(a[:,None]==matches[None,:],1))[0]) for a in self.constrained_bounded_indices]
            if np.hstack(self.constrained_bounded_indices).size:
                self.constrained_bounded_uppers, self.constrained_bounded_lowers, self.constrained_bounded_indices = zip(*[(u,l,i) for u,l,i in zip(self.constrained_bounded_uppers, self.constrained_bounded_lowers, self.constrained_bounded_indices) if i.size])
                self.constrained_bounded_uppers, self.constrained_bounded_lowers, self.constrained_bounded_indices = list(self.constrained_bounded_uppers), list(self.constrained_bounded_lowers), list(self.constrained_bounded_indices)
            else:
                self.constrained_bounded_uppers, self.constrained_bounded_lowers, self.constrained_bounded_indices = [],[],[]

        #fixed:
        for i,indices in enumerate(self.constrained_fixed_indices):
            self.constrained_fixed_indices[i] = np.delete(indices,np.nonzero(np.sum(indices[:,None]==matches[None,:],1))[0])
        #remove empty elements
        tmp = [(i,v) for i,v in zip(self.constrained_fixed_indices, self.constrained_fixed_values) if len(i)]
        if tmp:
            self.constrained_fixed_indices, self.constrained_fixed_values = zip(*tmp)
            self.constrained_fixed_indices, self.constrained_fixed_values = list(self.constrained_fixed_indices), list(self.constrained_fixed_values)
        else:
            self.constrained_fixed_indices, self.constrained_fixed_values = [],[]
    def constrain_negative(self,which):
        """
        Set negative constraints.

        :param which: which variables to constrain
        :type which: regular expression string
        """
        matches = self.grep_param_names(which)
        assert not np.any(matches[:,None]==self.all_constrained_indices()), "Some indices are already constrained"
        self.constrained_negative_indices = np.hstack((self.constrained_negative_indices, matches))
        #check to ensure constraint is in place: flip any positive values
        x = self.get_param()
        for i,xx in enumerate(x):
            if (xx>0.) and (i in matches):
                x[i] = -xx
        self.set_param(x)
    def constrain_bounded(self, which, lower, upper):
        """Set bounded constraints.

        Arguments
        ---------
        which -- np.array(dtype=int), or regular expression object or string
        upper -- (float) the upper bound on the constraint
        lower -- (float) the lower bound on the constraint
        """
        matches = self.grep_param_names(which)
        assert not np.any(matches[:,None]==self.all_constrained_indices()), "Some indices are already constrained"
        assert lower < upper, "lower bound must be smaller than upper bound!"
        self.constrained_bounded_indices.append(matches)
        self.constrained_bounded_uppers.append(upper)
        self.constrained_bounded_lowers.append(lower)
        #check to ensure constraint is in place: squash out-of-range values into (lower, upper)
        x = self.get_param()
        for i,xx in enumerate(x):
            if ((xx<=lower)|(xx>=upper)) & (i in matches):
                x[i] = sigmoid(xx)*(upper-lower) + lower
        self.set_param(x)
    def constrain_fixed(self, which, value = None):
        """
        Arguments
        ---------
        :param which: np.array(dtype=int), or regular expression object or string
        :param value: a float to fix the matched values to. If the value is not specified,
                 the parameter is fixed to the current value

        Notes
        -----
        Fixing a parameter which is tied to another, or constrained in some way will result in an error.
        To fix multiple parameters to the same value, simply pass a regular expression which matches both parameter names, or pass both of the indexes
        """
        matches = self.grep_param_names(which)
        assert not np.any(matches[:,None]==self.all_constrained_indices()), "Some indices are already constrained"
        self.constrained_fixed_indices.append(matches)
        if value != None:
            self.constrained_fixed_values.append(value)
        else:
            self.constrained_fixed_values.append(self.get_param()[self.constrained_fixed_indices[-1]])
            # NOTE(review): this second append pushes a stray None onto
            # constrained_fixed_values, leaving it longer than
            # constrained_fixed_indices -- looks like a bug; confirm.
            self.constrained_fixed_values.append(value)

        self.expand_param(self.extract_param())
    def extract_param(self):
        """use self.get_param to get the 'true' parameters of the model, which are then tied, constrained and fixed"""
        x = self.get_param()
        # reparameterise: log for +ve/-ve constraints, logit for bounded ones
        x[self.constrained_positive_indices] = np.log(x[self.constrained_positive_indices])
        x[self.constrained_negative_indices] = np.log(-x[self.constrained_negative_indices])
        [np.put(x,i,np.log(np.clip(x[i]-l,1e-10,np.inf)/np.clip(h-x[i],1e-10,np.inf))) for i,l,h in zip(self.constrained_bounded_indices, self.constrained_bounded_lowers, self.constrained_bounded_uppers)]
        # drop fixed parameters and all-but-the-first member of each tie
        to_remove = self.constrained_fixed_indices+[t[1:] for t in self.tied_indices]
        if len(to_remove):
            return np.delete(x,np.hstack(to_remove))
        else:
            return x
    def expand_param(self,x):
        """ takes the vector x, which is then modified (by untying, reparameterising or inserting fixed values), and then call self.set_param"""
        #work out how many places are fixed, and where they are. tricky logic!
        Nfix_places = 0.
        if len(self.tied_indices):
            Nfix_places += np.hstack(self.tied_indices).size-len(self.tied_indices)
        if len(self.constrained_fixed_indices):
            Nfix_places += np.hstack(self.constrained_fixed_indices).size
        if Nfix_places:
            fix_places = np.hstack(self.constrained_fixed_indices+[t[1:] for t in self.tied_indices])
        else:
            fix_places = []
        # NOTE(review): np.int is removed in modern numpy -- would need `int`
        free_places = np.setdiff1d(np.arange(Nfix_places+x.size,dtype=np.int),fix_places)

        #put the models values in the vector xx
        xx = np.zeros(Nfix_places+free_places.size,dtype=np.float64)
        xx[free_places] = x
        # insert fixed values and copy each tie's leading value to its members
        [np.put(xx,i,v) for i,v in zip(self.constrained_fixed_indices, self.constrained_fixed_values)]
        [np.put(xx,i,v) for i,v in [(t[1:],xx[t[0]]) for t in self.tied_indices] ]
        # undo the reparameterisation applied in extract_param
        xx[self.constrained_positive_indices] = np.exp(xx[self.constrained_positive_indices])
        xx[self.constrained_negative_indices] = -np.exp(xx[self.constrained_negative_indices])
        [np.put(xx,i,low+sigmoid(xx[i])*(high-low)) for i,low,high in zip(self.constrained_bounded_indices, self.constrained_bounded_lowers, self.constrained_bounded_uppers)]
        self.set_param(xx)
    def extract_param_names(self):
        """
        Returns the parameter names as propagated after constraining,
        tying or fixing, i.e. a list of the same length as extract_param()
        """
        n = self.get_param_names()
        #remove/concatenate the tied parameter names
        if len(self.tied_indices):
            for t in self.tied_indices:
                n[t[0]] = "<tie>".join([n[tt] for tt in t])
            remove = np.hstack([t[1:] for t in self.tied_indices])
        else:
            remove=np.empty(shape=(0,),dtype=np.int)
        #also remove the fixed params
        if len(self.constrained_fixed_indices):
            remove = np.hstack((remove, np.hstack(self.constrained_fixed_indices)))
        #add markers to show that some variables are constrained
        for i in self.constrained_positive_indices:
            n[i] = n[i]+'(+ve)'
        for i in self.constrained_negative_indices:
            n[i] = n[i]+'(-ve)'
        for i,l,h in zip(self.constrained_bounded_indices, self.constrained_bounded_lowers, self.constrained_bounded_uppers):
            for ii in i:
                n[ii] = n[ii]+'(bounded)'
        n = [nn for i,nn in enumerate(n) if not i in remove]
        return n
    def __str__(self,nw=30):
        """
        Return a string describing the parameter names and their ties and constraints
        """
        names = self.get_param_names()
        N = len(names)

        if not N:
            return "This object has no free parameters."
        header = ['Name','Value','Constraints','Ties']
        values = self.get_param() #map(str,self.get_param())
        #sort out the constraints
        constraints = ['']*len(names)
        for i in self.constrained_positive_indices:
            constraints[i] = '(+ve)'
        for i in self.constrained_negative_indices:
            constraints[i] = '(-ve)'
        for i in self.constrained_fixed_indices:
            for ii in i:
                constraints[ii] = 'Fixed'
        for i,u,l in zip(self.constrained_bounded_indices, self.constrained_bounded_uppers, self.constrained_bounded_lowers):
            for ii in i:
                constraints[ii] = '('+str(l)+', '+str(u)+')'
        #sort out the ties
        ties = ['']*len(names)
        for i,tie in enumerate(self.tied_indices):
            for j in tie:
                ties[j] = '('+str(i)+')'

        values = ['%.4f' % float(v) for v in values]
        #column widths: widest entry (or header) plus padding
        max_names = max([len(names[i]) for i in range(len(names))] + [len(header[0])])
        max_values = max([len(values[i]) for i in range(len(values))] + [len(header[1])])
        max_constraint = max([len(constraints[i]) for i in range(len(constraints))] + [len(header[2])])
        max_ties = max([len(ties[i]) for i in range(len(ties))] + [len(header[3])])
        cols = np.array([max_names, max_values, max_constraint, max_ties])+4
        columns = cols.sum()

        header_string = ["{h:^{col}}".format(h = header[i], col = cols[i]) for i in range(len(cols))]
        header_string = map(lambda x: '|'.join(x), [header_string])
        separator = '-'*len(header_string[0])
        param_string = ["{n:^{c0}}|{v:^{c1}}|{c:^{c2}}|{t:^{c3}}".format(n = names[i], v = values[i], c = constraints[i], t = ties[i],
                        c0 = cols[0], c1 = cols[1], c2 = cols[2], c3 = cols[3]) for i in range(len(values))]

        return ('\n'.join([header_string[0], separator]+param_string)) + '\n'

123
GPy/core/priors.py Normal file
View file

@@ -0,0 +1,123 @@
import numpy as np
import pylab as pb
from scipy.special import gammaln, digamma
from ..util.linalg import pdinv
class prior:
    """
    Abstract base for the prior distributions in this module.

    Subclasses provide lnpdf (log-density) and rvs (sampling); this base
    derives the density itself and a diagnostic plot from those two.
    """
    def pdf(self,x):
        """Probability density at x: exponentiate the subclass log-density."""
        log_density = self.lnpdf(x)
        return np.exp(log_density)
    def plot(self):
        """Overlay the analytic density on a histogram of 1000 samples."""
        samples = self.rvs(1000)
        pb.hist(samples,100,normed=True)
        lo, hi = pb.xlim()
        grid = np.linspace(lo,hi,1000)
        pb.plot(grid,self.pdf(grid),'r',linewidth=2)
class Gaussian(prior):
    """
    Implementation of the univariate Gaussian probability function, coupled with random variables, since scipy.stats sucks.
    Using Bishop 2006 notation
    """
    def __init__(self,mu,sigma):
        """Store mean mu and standard deviation sigma; precompute the log-normaliser."""
        self.mu = float(mu)
        self.sigma = float(sigma)
        self.sigma2 = np.square(self.sigma)
        # log of 1/sqrt(2*pi*sigma^2), computed once
        self.constant = -0.5*np.log(2*np.pi*self.sigma2)
    def __str__(self):
        return "N("+str(np.round(self.mu))+', '+str(np.round(self.sigma2))+')'
    def lnpdf(self,x):
        """Log-density at x."""
        residual = x - self.mu
        return self.constant - 0.5*np.square(residual)/self.sigma2
    def lnpdf_grad(self,x):
        """Gradient of the log-density with respect to x."""
        return (self.mu - x)/self.sigma2
    def rvs(self,n):
        """Draw n samples."""
        return self.mu + self.sigma*np.random.randn(n)
class log_Gaussian(prior):
    """
    Log-Gaussian (log-normal) prior: ln(x) is N(mu, sigma^2) distributed.
    """
    def __init__(self,mu,sigma):
        """Store the mean/std of ln(x); precompute the log-normaliser."""
        self.mu = float(mu)
        self.sigma = float(sigma)
        self.sigma2 = np.square(self.sigma)
        # log-normalising constant of the underlying Gaussian
        self.constant = -0.5*np.log(2*np.pi*self.sigma2)
    def __str__(self):
        return "lnN("+str(np.round(self.mu))+', '+str(np.round(self.sigma2))+')'
    def lnpdf(self,x):
        """Log-density at x (> 0); includes the -ln(x) Jacobian term."""
        z = np.log(x) - self.mu
        return self.constant - 0.5*np.square(z)/self.sigma2 - np.log(x)
    def lnpdf_grad(self,x):
        """Gradient of the log-density with respect to x."""
        return -((np.log(x)-self.mu)/self.sigma2 + 1.)/x
    def rvs(self,n):
        """Draw n samples: exponentiate Gaussian draws."""
        return np.exp(np.random.randn(n)*self.sigma + self.mu)
class multivariate_Gaussian:
    """
    Implementation of the multivariate Gaussian probability function, coupled with random variables, since scipy.stats sucks.
    Using Bishop 2006 notation
    """
    def __init__(self,mu,var):
        """Store mean vector mu and covariance var; precompute inverse and log-normaliser."""
        self.mu = np.array(mu).flatten()
        self.var = np.array(var)
        # covariance must be a square matrix matching the mean's dimension
        assert len(self.var.shape)==2
        assert self.var.shape[0]==self.var.shape[1]
        assert self.var.shape[0]==self.mu.size
        self.D = self.mu.size
        # pdinv returns the inverse and half-log-determinant of the covariance
        self.inv, self.hld = pdinv(self.var)
        self.constant = -0.5*self.D*np.log(2*np.pi) - self.hld
    def summary(self):
        pass #TODO
    def pdf(self,x):
        """Density at x (rows are points)."""
        return np.exp(self.lnpdf(x))
    def lnpdf(self,x):
        """Log-density at x: constant minus half the Mahalanobis quadratic form."""
        centred = x - self.mu
        quadratic = np.sum(centred*np.dot(centred,self.inv),1)
        return self.constant - 0.5*quadratic
    def lnpdf_grad(self,x):
        """Gradient of the log-density with respect to x."""
        centred = x - self.mu
        return -np.dot(self.inv,centred)
    def rvs(self,n):
        """Draw n samples."""
        return np.random.multivariate_normal(self.mu, self.var,n)
    def plot(self):
        """2D only: scatter samples with density contours over the visible range."""
        if self.D==2:
            samples = self.rvs(200)
            pb.plot(samples[:,0],samples[:,1], 'kx', mew=1.5)
            xmin,xmax = pb.xlim()
            ymin,ymax = pb.ylim()
            gx, gy = np.mgrid[xmin:xmax:100j, ymin:ymax:100j]
            flat = np.vstack((gx.flatten(),gy.flatten())).T
            density = self.pdf(flat).reshape(100,100)
            pb.contour(gx,gy,density,linewidths=2)
def gamma_from_EV(E,V):
    """create an instance of a gamma prior by specifying the Expected value(s) and Variance(s) of the distribution

    For shape a and rate b, E = a/b and V = a/b**2, which inverts to
    a = E**2/V and b = E/V.
    """
    shape = np.square(E)/V
    rate = E/V
    return gamma(shape, rate)
class gamma(prior):
    """
    Implementation of the Gamma probability function, coupled with random variables, since scipy.stats sucks.
    Using Bishop 2006 notation (shape a, rate b).
    """
    def __init__(self,a,b):
        """
        :param a: shape parameter
        :param b: rate parameter (1/scale)
        """
        self.a = float(a)
        self.b = float(b)
        # log-normalising constant: -ln Gamma(a) + a ln(b)
        self.constant = -gammaln(self.a) + self.a*np.log(self.b)
    def __str__(self):
        return "Ga("+str(np.round(self.a))+', '+str(np.round(self.b))+')'
    def summary(self):
        """Return a dict of summary statistics of the distribution."""
        ret = {"E[x]": self.a/self.b,\
            "E[ln x]": digamma(self.a) - np.log(self.b),\
            "var[x]": self.a/self.b/self.b,\
            "Entropy": gammaln(self.a) - (self.a-1.)*digamma(self.a) - np.log(self.b) + self.a}
        # The mode (a-1)/b only exists for a > 1; report NaN otherwise.
        # Bug fix: the a <= 1 branch previously used the inconsistent key 'mode'.
        if self.a > 1:
            ret['Mode'] = (self.a-1.)/self.b
        else:
            ret['Mode'] = np.nan
        return ret
    def lnpdf(self,x):
        """Log-density at x (> 0)."""
        return self.constant + (self.a-1)*np.log(x) - self.b*x
    def lnpdf_grad(self,x):
        """Gradient of the log-density with respect to x."""
        return (self.a-1.)/x - self.b
    def rvs(self,n):
        """Draw n samples (numpy uses scale = 1/rate)."""
        return np.random.gamma(scale=1./self.b,shape=self.a,size=n)