Processes not exiting


ma3mju

Hi all,

I'm having trouble with multiprocessing. I'm using it to speed up some
simulations, and I find that for large queues, when a process reaches the
poison pill it does not exit, whereas for smaller queues it works
without any problems. Has anyone else had this trouble? Can anyone
tell me a way around it? The code is in the two files below.

Thanks

Matt

parallel.py
===================================================
import GaussianProcessRegression as GP
import numpy as np
import networkx as nx
import pickle
import multiprocessing
############################################################################################
# Things You Can Change
############################################################################################
#savefiles
savefile = "wattsdata2"
graphfile = "wattsgraphs2"
#sample sizes
num_graphs = 5
num_sets_of_data = 10
#other things...
intervals = np.ceil(np.logspace(-2,1,50)*500)
noise = [np.sqrt(0.1),np.sqrt(0.01),np.sqrt(0.001),np.sqrt(0.0001)]

############################################################################################
#generate graphs
graphs = []
for i in range(0,num_graphs):
    graphs.append(nx.watts_strogatz_graph(500,5,0.01))
#save them for later reference
filehandler = open(graphfile,'w')
pickle.dump(graphs,filehandler,-1)
filehandler.close()

#queues
easy_work_queue = multiprocessing.Queue()
hard_work_queue = multiprocessing.Queue()
result_queue = multiprocessing.Queue()
#construct the items in the two queues
l=0
for j in range(0,len(intervals)):
    for i in range(0,len(noise)):
        for k in range(0,num_graphs):
            if int(intervals[j]) <= 4000:
                easy_work_queue.put({'datapt': l,
                                     'graph': graphs[k],
                                     'noise': noise[i],
                                     'number_of_sets_of_data': num_sets_of_data,
                                     'number_of_data_points': int(intervals[j])})
            else:
                hard_work_queue.put({'datapt': l,
                                     'graph': graphs[k],
                                     'noise': noise[i],
                                     'number_of_sets_of_data': num_sets_of_data,
                                     'number_of_data_points': int(intervals[j])})
            l+=1

#get number of cores and set the number of concurrent processes
num_hard_workers = 2
num_workers = multiprocessing.cpu_count()*1.5
easy_workers = []
hard_workers = []
#add poison pill for each worker and create the worker
for i in range(0,num_workers-num_hard_workers):
    easy_work_queue.put(None)
    easy_workers.append(multiprocessing.Process(
        target=GP.RandomWalkGeneralizationErrorParallel,
        args=(easy_work_queue,result_queue,)))
for i in range(0,num_hard_workers):
    hard_work_queue.put(None)
    hard_workers.append(multiprocessing.Process(
        target=GP.RandomWalkGeneralizationErrorParallel,
        args=(hard_work_queue,result_queue,)))

#run all workers
for worker in hard_workers:
    worker.start()
for worker in easy_workers:
    worker.start()
#wait for easy workers to finish
for worker in easy_workers:
    worker.join()
    print('worker joined')

#set off some of the easy workers on the hard work (maybe double number of hard)
for i in range(0,num_hard_workers):
    hard_work_queue.put(None)
    hard_workers.append(multiprocessing.Process(
        target=GP.RandomWalkGeneralizationErrorParallel,
        args=(hard_work_queue,result_queue,)))
#wait for all hard workers to finish
for worker in hard_workers:
    worker.join()

#construct data from the mess in the result queue
tempdata = np.zeros(l)
while not result_queue.empty():
    data = result_queue.get()
    tempdata[data[0]] = data[1]

finaldata = tempdata.reshape((len(intervals),len(noise),num_graphs))

np.save(savefile,finaldata)

=======================================================
GaussianProcessRegression.py
=======================================================
import CovarianceFunction as CF
import networkx as nx
import numpy as np
import scipy.linalg as sp
#fortran code from lapack-blas (hopefully when scipy is updated this wont be needed)
import dtrsv
#to use more than one core
import multiprocessing

#Currently we assume Gaussian noise TODO change to general noise
#Assume 0 mean TODO change to general mean Gaussian Process
class GaussianProcessRegression:
    def __init__(self,covariance_function,sigma):
        #a covariance function object defined in the CovarianceFunction class
        #note this uses the parent class but any children can be used
        self.C = covariance_function
        #a list of pts that are known and their values
        self.pts = []
        self.vals = []
        #the inverse of K as defined in
        #@book{coolen05:theoryofneural,
        #ISBN = {0-19-853024-2},
        #publisher = {Oxford University Press, USA},
        #author = {Coolen, A. C. C. and K{\"{u}}hn, R. and Sollich, P.},
        #title = {Theory of neural information processing systems},
        #year = {2005},
        #}
        self.K = np.array([])
        #gaussian noise variable
        self.sigma = float(sigma)
        self.cholL = np.array([])


    def add_data_points(self,points,vals):
        #add all points to list
        self.pts += points
        self.vals += vals
        arraysize = len(self.pts)
        #construct K
        K = np.zeros((arraysize,arraysize))
        #for speed
        pts = self.pts
        between_points = self.C.between_points
        if len(self.K):
            K[:-1,:-1] = self.K
        for i in xrange(0,arraysize):
            for j in xrange(arraysize-len(points),arraysize):
                K[i,j] = between_points(pts[i],pts[j])
                K[j,i] = K[i,j]
        K[arraysize-len(points):arraysize,arraysize-len(points):arraysize] = \
            K[arraysize-len(points):arraysize,arraysize-len(points):arraysize] \
            + self.sigma**2 * np.eye(len(points))
        self.K = K

    #calculate the prediction of a point based on data previously given
    def point_prediction(self,points):
        mean = []
        variance = []
        arraysize = len(self.pts)
        #cholesky
        #if self.cholL.shape[0] < arraysize:
        L = np.linalg.cholesky(self.K)
        #    self.cholL = L
        #else:
        #    L = self.cholL

        alpha = sp.cho_solve((L,1),self.vals)
        #create L in banded form
        k = np.zeros((arraysize,len(points)))

        ##################################################################
        #for speed get ref to functions im going to use and save them
        between_points = self.C.between_points
        pts = self.pts
        dot = np.dot
        ##################################################################
        for j in xrange(0,len(points)):
            #create k
            for i in xrange(0,arraysize):
                k[i,j] = between_points(pts[i],points[j])

        #calculate mean and variance
        #call the command for forward substitution
        ###############fortran call#######################################
        v = dtrsv.dtrsv('L','N',arraysize,L,k)
        ##################################################################

        #result
        mean = dot(alpha,k)
        for i in xrange(0,len(points)):
            variance.append(between_points(points[i],points[i]) - dot(v[:,i],v[:,i]))
        #return it in dictionary form
        return {'mean':mean,'variance':variance}


    # calculate the error for data given, where function is a vector
    # of the function evaluated at a sufficiently large number of points
    # that the GPregression has been trying to learn
    def error(self,function):
        total = 0
        #sum up variances
        result = self.point_prediction(function[::2])
        total = np.sum(result['variance'])
        total = (1/float(len(function)/2))*total
        return total

    #clear what has been learnt so far
    def clear(self):
        self.pts = []
        self.vals = []
        self.K = np.array([])

    #calculate the average error for a function defined in function when given
    #number_of_examples examples
    def average_error_over_samples(self,function,sample_size,number_of_examples):
        avg = 0
        numberofpoints = len(function)/2
        for i in range(0,sample_size):
            self.clear()
            #generate points of the function
            permpts = np.random.randint(0,numberofpoints,number_of_examples)
            #create the vectors
            pts = []
            vals = []
            for j in range(0,number_of_examples):
                pts.append(function[permpts[j]*2])
                vals.append(function[permpts[j]*2+1])

            #learn these points
            self.add_data_points(pts,vals)
            #print("points added")
            avg = avg + self.error(function)
        avg = avg/sample_size
        return avg

    #calculate the average error over functions over data of size number_of_data_points
    #for MOST cases this is also the generalization error, a summary of which and
    #approximations to it can be found in:
    #@inproceedings{Sollich99learningcurves,
    #booktitle = {Neural Computation},
    #author = {Sollich, P.},
    #title = {Learning curves for Gaussian process regression: Approximations and bounds},
    #pages = {200-2},
    #year = {1999},
    #}
    def emprical_average_error_over_functions(self,number_of_functions,
            number_of_sets_of_data,number_of_data_points,function_detail=0,progress=0):
        avg = 0
        step = float(100)/number_of_functions
        for i in range(0,number_of_functions):
            if progress:
                print step*float(i),"%"
            if function_detail:
                fx = self.C.generate_function(self.sigma,function_detail)
            else:
                fx = self.C.generate_function(self.sigma)
            avg = self.average_error_over_samples(fx,number_of_sets_of_data,
                                                  number_of_data_points)+avg
        avg = avg / number_of_functions
        return avg

    def average_error_over_functions(self,number_of_sets_of_data,
            number_of_data_points,function_detail=0):
        if function_detail:
            fx = self.C.generate_function(self.sigma,function_detail)
        else:
            fx = self.C.generate_function(self.sigma)
        avg = self.average_error_over_samples(fx,number_of_sets_of_data,
                                              number_of_data_points)
        return(avg)


    def function_prediction(self,pts):
        temp = self.point_prediction(pts)
        return {'func':temp['mean'],'varpos':temp['variance'],
                'varneg':-temp['variance']}


#########################################################################################################################################################
#Functions not contained in a class
#########################################################################################################################################################

#function to calculate the generalization error for a RandomWalk kernel, averaging over graphs
def RandomWalkGeneralizationError(noise,graphs,
        number_of_sets_of_data,number_of_data_points,a=2,p=10):
    graph_specific = np.zeros(len(graphs))
    avg = 0
    for i in range(0,len(graphs)):
        rw = CF.RandomWalk(a,p,graphs[i])
        GP = GaussianProcessRegression(rw,noise)
        graph_specific[i] = GP.average_error_over_functions(number_of_sets_of_data,
                                                            number_of_data_points)
    avg = np.sum(graph_specific)/len(graphs)
    return avg, graph_specific

#as above but using queues to create a parallel architecture
def RandomWalkGeneralizationErrorParallel(work_queue,result_queue,a=2,p=10):
    while True:
        input = work_queue.get()
        if input is None:
            print "poison"
            break
            print 'this should not appear'
        print input['datapt'], ' ', input['number_of_data_points']
        rw = CF.RandomWalk(a,p,input['graph'])
        GP = GaussianProcessRegression(rw,input['noise'])
        err = GP.average_error_over_functions(input['number_of_sets_of_data'],
                                              input['number_of_data_points'])
        result_queue.put([input['datapt'],err])
    print 'here'
    return
 

ma3mju

Sorry

###############fortran
call#######################################

is meant to be

###############fortran call#######################################

Matt
 

Piet van Oostrum

ma3mju said:
m> Hi all,
m> I'm having trouble with multiprocessing. I'm using it to speed up some
m> simulations, and I find that for large queues, when a process reaches the
m> poison pill it does not exit, whereas for smaller queues it works
m> without any problems. Has anyone else had this trouble? Can anyone
m> tell me a way around it? The code is in the two files below.

How do you know it doesn't exit? You haven't shown any of your output.

I did discover a problem in your code, but it should cause an exception:
m> #set off some of the easy workers on the hard work (maybe double number of hard)
m> for i in range(0,num_hard_workers):
m>     hard_work_queue.put(None)
m>     hard_workers.append(multiprocessing.Process(
m>         target=GP.RandomWalkGeneralizationErrorParallel,
m>         args=(hard_work_queue,result_queue,)))
m> #wait for all hard workers to finish
m> for worker in hard_workers:
m>     worker.join()

Here you create new hard workers, but you never start them. The join
should then raise an exception when it reaches them.
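
For illustration, a minimal sketch of the fix being pointed out here: each newly created process needs a start() call before anything joins it (names reused from the parallel.py above, otherwise unchanged):

#set off some of the easy workers on the hard work
for i in range(0,num_hard_workers):
    hard_work_queue.put(None)                 #one extra poison pill per new worker
    p = multiprocessing.Process(
        target=GP.RandomWalkGeneralizationErrorParallel,
        args=(hard_work_queue,result_queue,))
    p.start()                                 #this call was missing
    hard_workers.append(p)
#wait for all hard workers to finish
for worker in hard_workers:
    worker.join()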
 

MRAB

ma3mju said:
Hi all,

I'm having trouble with multiprocessing. I'm using it to speed up some
simulations, and I find that for large queues, when a process reaches the
poison pill it does not exit, whereas for smaller queues it works
without any problems. Has anyone else had this trouble? Can anyone
tell me a way around it? The code is in the two files below.
[snip]

#get number of cores and set the number of concurrent processes
num_hard_workers = 2
num_workers = multiprocessing.cpu_count()*1.5
easy_workers = []
hard_workers = []
#add poison pill for each worker and create the worker
for i in range(0,num_workers-num_hard_workers):
    easy_work_queue.put(None)
    easy_workers.append(multiprocessing.Process(
        target=GP.RandomWalkGeneralizationErrorParallel,
        args=(easy_work_queue,result_queue,)))
for i in range(0,num_hard_workers):
    hard_work_queue.put(None)
    hard_workers.append(multiprocessing.Process(
        target=GP.RandomWalkGeneralizationErrorParallel,
        args=(hard_work_queue,result_queue,)))
You have 2 hard workers and ceil(CPU_count * 1.5) - 2 easy workers.
What if the number of CPUs was 1? That would give 2 hard and 0 easy!

Also, I recommend that you put only 1 'poison pill' in each queue and
have the workers put it back when they see it.
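
For illustration, a rough self-contained sketch of that single-pill pattern; the worker body and the squared-number jobs are placeholders rather than the GP code, and the results are read back before the processes are joined:

import multiprocessing

def worker(work_queue, result_queue):
    while True:
        job = work_queue.get()
        if job is None:
            #single shared poison pill: put it back so the next worker sees it too
            work_queue.put(None)
            break
        result_queue.put(job * job)   #stand-in for the real calculation

if __name__ == '__main__':
    work_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()
    for job in range(20):
        work_queue.put(job)
    work_queue.put(None)              #one pill for the whole set of workers
    workers = [multiprocessing.Process(target=worker,
                                       args=(work_queue, result_queue))
               for _ in range(4)]
    for w in workers:
        w.start()
    results = [result_queue.get() for _ in range(20)]   #drain before joining
    for w in workers:
        w.join()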
 

ma3mju

How do you know it doesn't exit? You haven't shown any of your output.

I did discover a problem in your code, but it should cause an exception:


Here you create new hard workers, but you never start them. The join
should then raise an exception when it reaches them.

OK, thanks, I'll change that in a sec. It never reaches that bit of code
because the easy_workers don't exit, so it never gets past the join().

As far as running it goes, I get the datapt and number of points
printed for everything in both queues. When it reaches the
end of either queue I get "Poison" on the screen, then "here" for each
process, but I don't get "worker joined" and, as expected, don't get
"this should not appear". If I have a look at the processes running
after all the queues are supposed to have finished, I see all of them
still running but taking little or no resources. This is running on Ubuntu
Jaunty at home and the same happens on the Debian machine at uni.

The weird thing is that if I run them with fewer points the processes
do manage to exit.

Thanks

Matt
 

ma3mju

ma3mju said:
I'm having trouble with multiprocessing. I'm using it to speed up some
simulations, and I find that for large queues, when a process reaches the
poison pill it does not exit, whereas for smaller queues it works
without any problems. Has anyone else had this trouble? Can anyone
tell me a way around it? The code is in the two files below.
[snip]

#get number of cores and set the number of concurrent processes
num_hard_workers = 2
num_workers = multiprocessing.cpu_count()*1.5
easy_workers = []
hard_workers = []
#add poison pill for each worker and create the worker
for i in range(0,num_workers-num_hard_workers):
    easy_work_queue.put(None)
    easy_workers.append(multiprocessing.Process(
        target=GP.RandomWalkGeneralizationErrorParallel,
        args=(easy_work_queue,result_queue,)))
for i in range(0,num_hard_workers):
    hard_work_queue.put(None)
    hard_workers.append(multiprocessing.Process(
        target=GP.RandomWalkGeneralizationErrorParallel,
        args=(hard_work_queue,result_queue,)))

You have 2 hard workers and ceil(CPU_count * 1.5) - 2 easy workers.
What if the number of CPUs was 1? That would give 2 hard and 0 easy!

Also, I recommend that you put only 1 'poison pill' in each queue and
have the workers put it back when they see it.

I'll give that a go in a sec and see if it helps. The processes do quit
for smaller queues, though, so it should in theory be alright. I'm
not too fussed about the CPUs; that line is only there because I switch
between a uni PC and a home one with a different number of cores in each,
but both greater than one.
 

ma3mju

ma3mju said:
[snip]
You have 2 hard workers and ceil(CPU_count * 1.5) - 2 easy workers.
What if the number of CPUs was 1? That would give 2 hard and 0 easy!
Also, I recommend that you put only 1 'poison pill' in each queue and
have the workers put it back when they see it.

I'll give that a go in a sec and see if it helps. The processes do quit
for smaller queues, though, so it should in theory be alright. I'm
not too fussed about the CPUs; that line is only there because I switch
between a uni PC and a home one with a different number of cores in each,
but both greater than one.

Just tried changing the poison pill part; sadly, to no avail.
 

MRAB

ma3mju said:
[snip]
Just tried changing the poison pill part; sadly, to no avail.

I wonder whether one of the workers is raising an exception, perhaps due
to lack of memory, when there are a large number of jobs to process.

Another question: why are you distinguishing between easy and hard jobs?
Do you actually get a measurable improvement in performance from doing
it this way instead of having just a single queue of jobs and a single
pool of workers?
 

Piet van Oostrum

MRAB said:
M> I wonder whether one of the workers is raising an exception, perhaps due
M> to lack of memory, when there are a large number of jobs to process.

But that wouldn't prevent the join. And you would probably get an
exception traceback printed.

I wonder if something fishy is happening in the multiprocessing
infrastructure. Or maybe the Fortran code goes wrong because, I think, it
has no protection against buffer overruns and similar problems.
 

ma3mju

But that wouldn't prevent the join. And you would probably get an
exception traceback printed.

I wonder if something fishy is happening in the multiprocessing
infrastructure. Or maybe the Fortran code goes wrong because, I think, it
has no protection against buffer overruns and similar problems.

I don't think it's a memory problem. The reason for the hard and easy
queues is that the larger examples use far more RAM. If I run
all of the workers on harder problems I do begin to run out of RAM and
end up spending all my time switching in and out of swap, so I limit
the number of harder problems I run at the same time. I've watched it
run to the end (a very boring couple of hours) and it stays out of my
swap space; everything appears to stay in RAM. It just hangs
after "poison" has been printed for each process.

The other thing is that I get the message "here", telling me each process
broke out of the loop after seeing the poison pill, and I get
all the things that were queued listed as output. Surely if I were running
out of memory I wouldn't expect all of the jobs to be listed as output.

I have a serial script that works fine, so I know the Fortran code works
for each example individually.

Thanks

Matt
 

ma3mju

[snip]

Any ideas for a solution?
 

ma3mju

[snip]

Any ideas?
 

MRAB

ma3mju said:
Any ideas for a solution?

A workaround is to do them in small batches.

You could put each job in a queue with a flag to say whether it's hard
or easy, then:

while have more jobs:
    move up to BATCH_SIZE jobs into worker queues
    create and start workers
    wait for workers to finish
    discard workers
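
For illustration, a rough self-contained sketch of that batching loop; BATCH_SIZE, the worker body and the hard/easy rule are placeholders rather than the GP code:

import multiprocessing

BATCH_SIZE = 8

def worker(work_queue, result_queue):
    while True:
        job = work_queue.get()
        if job is None:
            break
        result_queue.put((job, job * job))   #stand-in for the real calculation

if __name__ == '__main__':
    #each job carries a flag saying whether it is hard or easy
    jobs = [(n, 'hard' if n % 5 == 0 else 'easy') for n in range(50)]
    results = {}
    while jobs:
        batch, jobs = jobs[:BATCH_SIZE], jobs[BATCH_SIZE:]
        work_queue = multiprocessing.Queue()
        result_queue = multiprocessing.Queue()
        #fewer workers when the batch contains hard jobs, more when it is all easy
        num_workers = 2 if any(flag == 'hard' for _, flag in batch) else 4
        for job, _ in batch:
            work_queue.put(job)
        for _ in range(num_workers):
            work_queue.put(None)             #one pill per worker in this sketch
        workers = [multiprocessing.Process(target=worker,
                                           args=(work_queue, result_queue))
                   for _ in range(num_workers)]
        for w in workers:
            w.start()
        for _ in batch:                      #read every result...
            job, value = result_queue.get()
            results[job] = value
        for w in workers:                    #...before joining the workers
            w.join()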
 

ma3mju

ma3mju said:
[snip]

A workaround is to do them in small batches.

You could put each job in a queue with a flag to say whether it's hard
or easy, then:

     while have more jobs:
         move up to BATCH_SIZE jobs into worker queues
         create and start workers
         wait for workers to finish
         discard workers

Yeah, I was hoping for something with a bit more finesse. In the end I
used Pool instead, with a callback function, and that has solved the
problem. I did find this snippet today:

Joining processes that use queues

Bear in mind that a process that has put items in a queue will
wait before terminating until all the buffered items are fed by the
“feeder” thread to the underlying pipe. (The child process can call
the Queue.cancel_join_thread() method of the queue to avoid this
behaviour.)

This means that whenever you use a queue you need to make sure
that all items which have been put on the queue will eventually be
removed before the process is joined. Otherwise you cannot be sure
that processes which have put items on the queue will terminate.
Remember also that non-daemonic processes will be joined
automatically.


I don't know (not a computer scientist) but could it have been the
pipe getting full?
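
For what it's worth, a small self-contained demonstration of a queue's pipe filling up; the item count is arbitrary, just large enough to exceed the pipe's buffer:

import multiprocessing

def producer(q):
    for i in range(100000):        #plenty of small items, more than the pipe can buffer
        q.put(i)

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=producer, args=(q,))
    p.start()

    #joining here, before reading, can hang: the child's feeder thread blocks
    #writing to the full pipe and the child never finishes terminating

    items = [q.get() for _ in range(100000)]   #drain the queue first...
    p.join()                                   #...and the join returns promptly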

In case anyone else is affected by this, I've attached the new code so
you can see the changes I made to fix it.

Thanks for all your help

Matt

============================================================================================================================
parallel.py
============================================================================================================================
import GaussianProcessRegression as GP
import numpy as np
import networkx as nx
import pickle
from multiprocessing import Pool

global result

def cb(r):
    global result
    print r
    result[r[0]] = r[1]


############################################################################################
# Things You Can Change
############################################################################################
#savefiles
savefile = "powerlaw"
graphfile = "powerlawgraph"
#sample sizes
num_graphs = 5
num_sets_of_data = 10
#other things...
intervals = np.ceil(np.logspace(-2,1,50)*500)
noise = [np.sqrt(0.1),np.sqrt(0.01),np.sqrt(0.001),np.sqrt(0.0001)]
num_hard_workers = 5
hard_work_threshold = 4000
############################################################################################
#generate graphs
graphs = []
for i in range(0,num_graphs):
    graphs.append(nx.powerlaw_cluster_graph(500,0.1,0.05))
#save them for later reference
filehandler = open(graphfile,'w')
pickle.dump(graphs,filehandler,-1)
filehandler.close()

#work lists
easy_work = []
hard_work = []

#construct the items in the two lists
l=0
for j in range(0,len(intervals)):
    for i in range(0,len(noise)):
        for k in range(0,num_graphs):
            if int(intervals[j]) <= hard_work_threshold:
                easy_work.append({'datapt': l,
                                  'graph': graphs[k],
                                  'noise': noise[i],
                                  'number_of_sets_of_data': num_sets_of_data,
                                  'number_of_data_points': int(intervals[j])})
            else:
                hard_work.append({'datapt': l,
                                  'graph': graphs[k],
                                  'noise': noise[i],
                                  'number_of_sets_of_data': num_sets_of_data,
                                  'number_of_data_points': int(intervals[j])})
            l+=1

result = np.zeros(l)

#create pool with all cores possible
worker_pool = Pool()

for i in xrange(0,len(easy_work)):
    worker_pool.apply_async(GP.RandomWalkGeneralizationErrorParallel,
                            (easy_work[i],),callback=cb)

worker_pool.close()
worker_pool.join()

#create hard work pool
worker_pool = Pool(processes = num_hard_workers)

for i in xrange(0,len(hard_work)):
    worker_pool.apply_async(GP.RandomWalkGeneralizationErrorParallel,
                            (hard_work[i],),callback=cb)

worker_pool.close()
worker_pool.join()


finaldata = result.reshape((len(intervals),len(noise),num_graphs))
np.save(savefile,finaldata)

================================================================================================================================================
GaussianProcessRegression.py
================================================================================================================================================
import CovarianceFunction as CF
import networkx as nx
import numpy as np
import scipy.linalg as sp
#fortran code from lapack-blas (hopefully when scipy is updated this wont be needed)
import dtrsv

#Currently we assume Gaussian noise TODO change to general noise
#Assume 0 mean TODO change to general mean Gaussian Process
class GaussianProcessRegression:
    def __init__(self,covariance_function,sigma):
        #a covariance function object defined in the CovarianceFunction class
        #note this uses the parent class but any children can be used
        self.C = covariance_function
        #a list of pts that are known and their values
        self.pts = []
        self.vals = []
        #the inverse of K as defined in
        #@book{coolen05:theoryofneural,
        #ISBN = {0-19-853024-2},
        #publisher = {Oxford University Press, USA},
        #author = {Coolen, A. C. C. and K{\"{u}}hn, R. and Sollich, P.},
        #title = {Theory of neural information processing systems},
        #year = {2005},
        #}
        self.K = np.array([])
        #gaussian noise variable
        self.sigma = float(sigma)
        self.cholL = np.array([])


    def add_data_points(self,points,vals):
        #add all points to list
        self.pts += points
        self.vals += vals
        arraysize = len(self.pts)
        #construct K
        K = np.zeros((arraysize,arraysize))
        #for speed
        pts = self.pts
        between_points = self.C.between_points
        if len(self.K):
            K[:-1,:-1] = self.K
        for i in xrange(0,arraysize):
            for j in xrange(arraysize-len(points),arraysize):
                K[i,j] = between_points(pts[i],pts[j])
                K[j,i] = K[i,j]
        K[arraysize-len(points):arraysize,arraysize-len(points):arraysize] = \
            K[arraysize-len(points):arraysize,arraysize-len(points):arraysize] \
            + self.sigma**2 * np.eye(len(points))
        self.K = K

    #calculate the prediction of a point based on data previously given
    def point_prediction(self,points):
        mean = []
        variance = []
        arraysize = len(self.pts)
        #cholesky
        #if self.cholL.shape[0] < arraysize:
        L = np.linalg.cholesky(self.K)
        #    self.cholL = L
        #else:
        #    L = self.cholL

        alpha = sp.cho_solve((L,1),self.vals)
        #create L in banded form
        k = np.zeros((arraysize,len(points)))

        ##################################################################
        #for speed get ref to functions im going to use and save them
        between_points = self.C.between_points
        pts = self.pts
        dot = np.dot
        ##################################################################
        for j in xrange(0,len(points)):
            #create k
            for i in xrange(0,arraysize):
                k[i,j] = between_points(pts[i],points[j])

        #calculate mean and variance
        #call the command for forward substitution
        ###############fortran call#######################################
        v = dtrsv.dtrsv('L','N',arraysize,L,k)
        ##################################################################

        #result
        mean = dot(alpha,k)
        for i in xrange(0,len(points)):
            variance.append(between_points(points[i],points[i]) - dot(v[:,i],v[:,i]))
        #return it in dictionary form
        return {'mean':mean,'variance':variance}


    # calculate the error for data given, where function is a vector
    # of the function evaluated at a sufficiently large number of points
    # that the GPregression has been trying to learn
    def error(self,function):
        total = 0
        #sum up variances
        result = self.point_prediction(function[::2])
        total = np.sum(result['variance'])
        total = (1/float(len(function)/2))*total
        return total

    #clear what has been learnt so far
    def clear(self):
        self.pts = []
        self.vals = []
        self.K = np.array([])

    #calculate the average error for a function defined in function when given
    #number_of_examples examples
    def average_error_over_samples(self,function,sample_size,number_of_examples):
        avg = 0
        numberofpoints = len(function)/2
        for i in range(0,sample_size):
            self.clear()
            #generate points of the function
            permpts = np.random.randint(0,numberofpoints,number_of_examples)
            #create the vectors
            pts = []
            vals = []
            for j in range(0,number_of_examples):
                pts.append(function[permpts[j]*2])
                vals.append(function[permpts[j]*2+1])

            #learn these points
            self.add_data_points(pts,vals)
            #print("points added")
            avg = avg + self.error(function)
        avg = avg/sample_size
        return avg
 

MRAB

ma3mju said:
[snip]
I don't know (not a computer scientist) but could it have been the
pipe getting full?

In case anyone else is affected by this, I've attached the new code so
you can see the changes I made to fix it.
[snip]
Maybe the reason is this:

Threads share an address space, so putting data into a queue simply
involves putting a reference there, but processes don't share an address
space, so a sender must continue to exist until the data itself has been
copied into the pipe that connects the processes. This pipe has a
limited capacity.

In your code you were waiting for the easy workers to terminate while not
reading from the result queue, and therefore not from the pipe either, so
with a large number of jobs the pipe was becoming full.

In summary: the worker didn't terminate because the pipe was full; the
pipe was full because you weren't reading the results; you weren't
reading the results because the worker hadn't terminated.
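
In that light, a hedged sketch of how the original parallel.py could probably have been untangled without switching to Pool: read the expected number of results before joining anything (names reused from the first posting; the second wave of hard workers is ignored for simplicity):

#read the results as they arrive; l jobs were queued in total, so expect l results
tempdata = np.zeros(l)
for _ in range(l):
    data = result_queue.get()      #keeps the pipe drained so the workers can exit
    tempdata[data[0]] = data[1]

#the result pipe is now empty, so the joins no longer deadlock
for worker in easy_workers + hard_workers:
    worker.join()

finaldata = tempdata.reshape((len(intervals),len(noise),num_graphs))
np.save(savefile,finaldata)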
 
