From e019b17cf965a7f673326140767dbc48c4332e32 Mon Sep 17 00:00:00 2001
From: Ubuntu
Date: Thu, 4 Aug 2016 12:31:36 +0000
Subject: [PATCH] Added threaded option - but this doesn't work due to the
 global interpreter lock

---
 GPy/util/__init__.py                     |   1 +
 GPy/util/cluster_with_offset.py          |   4 +
 GPy/util/threaded_cluster_with_offset.py | 206 +++++++++++++++++++++++
 3 files changed, 211 insertions(+)
 create mode 100644 GPy/util/threaded_cluster_with_offset.py

diff --git a/GPy/util/__init__.py b/GPy/util/__init__.py
index 685551fd..136e0d3b 100644
--- a/GPy/util/__init__.py
+++ b/GPy/util/__init__.py
@@ -17,3 +17,4 @@ from . import multioutput
 from . import parallel
 from . import functions
 from . import cluster_with_offset
+from . import threaded_cluster_with_offset
diff --git a/GPy/util/cluster_with_offset.py b/GPy/util/cluster_with_offset.py
index fa8f1be8..d1e896f1 100644
--- a/GPy/util/cluster_with_offset.py
+++ b/GPy/util/cluster_with_offset.py
@@ -3,6 +3,7 @@
 
 import GPy
 import numpy as np
+import sys #so I can print progress dots
 
 def get_log_likelihood(inputs,data,clust):
     """Get the LL of a combined set of clusters, ignoring time series offsets.
@@ -126,6 +127,9 @@ def cluster(data,inputs,verbose=False):
 
         #Compute the log-likelihood of each cluster (add them together)
         for clusti in range(len(active)):
+            if verbose:
+                sys.stdout.write('.')
+                sys.stdout.flush()
             if np.isnan(loglikes[clusti]):
                 loglikes[clusti], unused_offset = get_log_likelihood_offset(inputs,data,[clusti])
 
diff --git a/GPy/util/threaded_cluster_with_offset.py b/GPy/util/threaded_cluster_with_offset.py
new file mode 100644
index 00000000..66192c9f
--- /dev/null
+++ b/GPy/util/threaded_cluster_with_offset.py
@@ -0,0 +1,206 @@
+# Copyright (c) 2016, Mike Smith
+# Licensed under the BSD 3-clause license (see LICENSE.txt)
+
+import GPy
+import numpy as np
+import time
+import sys #so I can print progress dots
+from threading import Thread
+
+maxthreads = 40
+
+
+def get_log_likelihood(inputs,data,clust,loglikesarray):
+    """Get the LL of a combined set of clusters, ignoring time series offsets.
+
+    Get the log likelihood of a cluster without worrying about the fact that
+    different time series are offset. We're really using it here for those
+    cases in which we only have one cluster to get the loglikelihood of.
+
+    arguments:
+    inputs -- the 'X's in a list, one item per cluster
+    data -- the 'Y's in a list, one item per cluster
+    clust -- list of clusters to use
+    loglikesarray -- array the result is written into, at index clust[0]
+
+    Nothing is returned: because this runs as a thread target, the log
+    likelihood is written into loglikesarray instead (the offset is always
+    zero for this model).
+    """
+
+    S = data[0].shape[0] #number of time series
+
+    #build a new dataset from the clusters, by combining all clusters together
+    X = np.zeros([0,1])
+    Y = np.zeros([0,S])
+
+    #for each person in the cluster,
+    #add their inputs and data to the new dataset
+    for p in clust:
+        X = np.vstack([X,inputs[p]])
+        Y = np.vstack([Y,data[p].T])
+
+    #find the loglikelihood by fitting one model to all the time series at
+    #once (the commented-out version fits each series separately and adds
+    #the LLs together):
+    #ll=0
+    #for s in range(S):
+    #    m = GPy.models.GPRegression(X,Y[:,s][:,None])
+    #    m.optimize()
+    #    ll+=m.log_likelihood()
+
+    m = GPy.models.GPRegression(X,Y)
+    m.optimize()
+    ll = m.log_likelihood()
+
+    loglikesarray[clust[0]] = ll
+    return
+
+def get_log_likelihood_offset(inputs,data,clust,loglikesarray,offsetarray):
+    """Get the log likelihood of a combined set of clusters, fitting the offsets.
+
+    arguments:
+    inputs -- the 'X's in a list, one item per cluster
+    data -- the 'Y's in a list, one item per cluster
+    clust -- list of clusters to use
+    loglikesarray -- array the log likelihood is written into, at
+                     index [clust[0],clust[1]]
+    offsetarray -- array the fitted offset is written into, at
+                   index [clust[0],clust[1]]
+
+    Nothing is returned: because this runs as a thread target, the results
+    are written into the two arrays instead.
+    """
+
+    assert len(clust)>1, "Use get_log_likelihood if you only have one cluster"
+
+    S = data[0].shape[0] #number of time series
+
+    X = np.zeros([0,2]) #notice the extra column, this is for the cluster index
+    Y = np.zeros([0,S])
+
+    #for each person in the cluster, add their inputs and data to the new
+    #dataset. Note we add an index identifying which person is which data point.
+    #This is for the offset model to use, to allow it to know which data points
+    #to shift.
+    for i,p in enumerate(clust):
+        idx = i*np.ones([inputs[p].shape[0],1])
+        X = np.vstack([X,np.hstack([inputs[p],idx])])
+        Y = np.vstack([Y,data[p].T])
+
+    m = GPy.models.GPOffsetRegression(X,Y)
+    #TODO: How to select a sensible prior?
+    m.offset.set_prior(GPy.priors.Gaussian(0,20))
+    #TODO: Set a sensible start value for the length scale,
+    #make it long to help the offset fit.
+
+    m.optimize()
+
+    ll = m.log_likelihood()
+    offset = m.offset.values[0]
+    #return ll,offset
+    loglikesarray[clust[0],clust[1]] = ll
+    offsetarray[clust[0],clust[1]] = offset
+    return
+
+def cluster(data,inputs,verbose=False):
+    """Clusters data.
+
+    Using the new offset model, this method uses a greedy algorithm to cluster
+    the data. It starts with all the data points in separate clusters and tests
+    whether combining them increases the overall log-likelihood (LL). It then
+    iteratively joins the pair of clusters which causes the greatest increase
+    in the LL, until no join increases the LL.
+
+    arguments:
+    inputs -- the 'X's in a list, one item per cluster
+    data -- the 'Y's in a list, one item per cluster
+
+    returns a list of the clusters.
+    """
+    N = len(data)
+
+    #Define a set of N active clusters
+    active = []
+    for p in range(0,N):
+        active.append([p])
+
+    loglikes = np.zeros(len(active))
+    loglikes[:] = None
+
+    pairloglikes = np.zeros([len(active),len(active)])
+    pairloglikes[:] = None
+    pairoffset = np.zeros([len(active),len(active)])
+
+    it = 0
+    while True:
+
+        if verbose:
+            it += 1
+            print("Iteration %d" % it)
+
+        threads = []
+        #Compute the log-likelihood of each cluster (add them together)
+        for clusti in range(len(active)):
+            if verbose:
+                sys.stdout.write('.')
+                sys.stdout.flush()
+            if np.isnan(loglikes[clusti]):
+                t = Thread(target=get_log_likelihood,args=(inputs,data,[clusti],loglikes))
+                threads.append(t)
+                #loglikes[clusti], unused_offset = get_log_likelihood_offset(inputs,data,[clusti])
+
+            #try combining with each other cluster...
+            for clustj in range(clusti): #count from 0 to clusti-1
+                temp = [clusti,clustj]
+                if np.isnan(pairloglikes[clusti,clustj]):
+                    #pairloglikes[clusti,clustj],pairoffset[clusti,clustj] = get_log_likelihood_offset(inputs,data,temp)
+                    t = Thread(target=get_log_likelihood_offset,args=(inputs,data,temp,pairloglikes,pairoffset))
+                    threads.append(t)
+
+        if len(threads)<=maxthreads:
+            for t in threads:
+                t.start()
+            for t in threads:
+                t.join()
+        else:
+            #this simple throttle keeps about maxthreads threads running, but
+            #a work queue would be better: it assumes all threads take roughly
+            #the same time to finish
+            for i,t in enumerate(threads):
+                t.start()
+                if (i>=maxthreads):
+                    threads[i-maxthreads].join()
+            for i in range(len(threads)-maxthreads,len(threads)):
+                threads[i].join()
+
+        #if everything has merged into one cluster there are no pairs left
+        #to test (pairloglikes is all NaN, which nanargmax rejects), so stop
+        if len(active)<2:
+            break
+
+        seploglikes = np.repeat(loglikes[:,None].T,len(loglikes),0)+np.repeat(loglikes[:,None],len(loglikes),1)
+        loglikeimprovement = pairloglikes - seploglikes #how much the likelihood improves with clustering
+        top = np.unravel_index(np.nanargmax(loglikeimprovement), loglikeimprovement.shape)
+
+        #if loglikeimprovement.shape[0]<3:
+        #    #no more clustering to do - this shouldn't happen really unless
+        #    #we've set the threshold to apply clustering to less than 0
+        #    break
+
+        #if there's further clustering to be done...
+        if loglikeimprovement[top[0],top[1]]>0:
+            active[top[0]].extend(active[top[1]])
+            offset = pairoffset[top[0],top[1]]
+            inputs[top[0]] = np.vstack([inputs[top[0]],inputs[top[1]]-offset])
+            data[top[0]] = np.hstack([data[top[0]],data[top[1]]])
+            del inputs[top[1]]
+            del data[top[1]]
+            del active[top[1]]
+
+            #None (stored as NaN) marks entries that need recalculating
+            pairloglikes[:,top[0]] = None
+            pairloglikes[top[0],:] = None
+            pairloglikes = np.delete(pairloglikes,top[1],0)
+            pairloglikes = np.delete(pairloglikes,top[1],1)
+            loglikes[top[0]] = None
+            loglikes = np.delete(loglikes,top[1])
+        else:
+            break
+
+    #if loglikeimprovement[top[0],top[1]]>0:
+    #    print("joined")
+    #    print(top)
+    #    print(offset)
+    #    print(offsets)
+    #    print(offsets[top[1]]-offsets[top[0]])
+
+    #TODO Add a way to return the offsets applied to all the time series
+    return active
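
For reference, a minimal sketch of how the new module might be driven once the patch is applied (the sketch itself is not part of the patch). The toy data below is made up: the shapes follow the docstrings above, with each data[p] an S x N array holding S time series and each inputs[p] an N x 1 array of sample times, and GPy.models.GPOffsetRegression must be available for the pairwise fits. As the subject line says, the threads will not actually run the fits in parallel because of the global interpreter lock.

    import numpy as np
    from GPy.util import threaded_cluster_with_offset

    #made-up toy problem: five "people" with S=1 time series each, sampled at
    #N=10 times; three follow a sine curve and two follow a cosine curve
    np.random.seed(0)
    inputs = [np.sort(10*np.random.rand(10,1),axis=0) for _ in range(5)]  #each N x 1
    data = [np.sin(x).T + 0.1*np.random.randn(1,10) for x in inputs[:3]] \
         + [np.cos(x).T + 0.1*np.random.randn(1,10) for x in inputs[3:]]  #each S x N

    #cluster() merges entries of data and inputs in place as clusters join,
    #so pass copies if the originals are still needed
    clusters = threaded_cluster_with_offset.cluster(data,inputs,verbose=True)
    print(clusters)  #e.g. [[0,1,2],[3,4]] -- one list of original indices per cluster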