Processes not exiting

Discussion in 'Python' started by ma3mju, Jul 31, 2009.

  1. ma3mju

    ma3mju Guest

    Hi all,

I'm having trouble with multiprocessing. I'm using it to speed up some
simulations, and I find that for large queues, when a process reaches the
poison pill it does not exit, whereas for smaller queues it works
without any problems. Has anyone else had this trouble? Can anyone
tell me a way around it? The code is in the two files below.

    Thanks

    Matt

    parallel.py
    ===================================================
import GaussianProcessRegression as GP
import numpy as np
import networkx as nx
import pickle
import multiprocessing
############################################################################################
# Things You Can Change
############################################################################################
#savefiles
savefile = "wattsdata2"
graphfile = "wattsgraphs2"
#sample sizes
num_graphs = 5
num_sets_of_data = 10
#other things...
intervals = np.ceil(np.logspace(-2,1,50)*500)
noise = [np.sqrt(0.1),np.sqrt(0.01),np.sqrt(0.001),np.sqrt(0.0001)]

############################################################################################
#generate graphs
graphs = []
for i in range(0,num_graphs):
    graphs.append(nx.watts_strogatz_graph(500,5,0.01))
#save them for later reference
filehandler = open(graphfile,'w')
pickle.dump(graphs,filehandler,-1)
filehandler.close()

#queues
easy_work_queue = multiprocessing.Queue()
hard_work_queue = multiprocessing.Queue()
result_queue = multiprocessing.Queue()
#construct the items in the hard queue
l=0
for j in range(0,len(intervals)):
    for i in range(0,len(noise)):
        for k in range(0,num_graphs):
            if int(intervals[j]) <= 4000:
                easy_work_queue.put({'datapt': l,'graph': graphs[k],
                    'noise': noise,'number_of_sets_of_data': num_sets_of_data,
                    'number_of_data_points': int(intervals[j])})
            else:
                hard_work_queue.put({'datapt': l,'graph': graphs[k],
                    'noise': noise,'number_of_sets_of_data': num_sets_of_data,
                    'number_of_data_points': int(intervals[j])})
            l+=1

#get number of cores and set the number on concurrent processes
num_hard_workers = 2
num_workers = multiprocessing.cpu_count()*1.5
easy_workers = []
hard_workers = []
#add poison pill for each worker and create the worker
for i in range(0,num_workers-num_hard_workers):
    easy_work_queue.put(None)
    easy_workers.append(multiprocessing.Process(
        target=GP.RandomWalkGeneralizationErrorParallel,
        args=(easy_work_queue,result_queue,)))
for i in range(0,num_hard_workers):
    hard_work_queue.put(None)
    hard_workers.append(multiprocessing.Process(
        target=GP.RandomWalkGeneralizationErrorParallel,
        args=(hard_work_queue,result_queue,)))

#run all workers
for worker in hard_workers:
    worker.start()
for worker in easy_workers:
    worker.start()
#wait for easy workers to finish
for worker in easy_workers:
    worker.join()
    print('worker joined')

#set off some of the easy workers on the hard work (maybe double number of hard)
for i in range(0,num_hard_workers):
    hard_work_queue.put(None)
    hard_workers.append(multiprocessing.Process(
        target=GP.RandomWalkGeneralizationErrorParallel,
        args=(hard_work_queue,result_queue,)))
#wait for all hard workers to finish
for worker in hard_workers:
    worker.join()

#construct data from the mess in the result queue
tempdata = np.zeros(l)
while not result_queue.empty():
    data = result_queue.get()
    tempdata[data[0]] = data[1]

finaldata = tempdata.reshape((len(intervals),len(noise),num_graphs))

np.save(savefile,finaldata)

    =======================================================
    GaussianProcessRegression.py
    =======================================================
import CovarianceFunction as CF
import networkx as nx
import numpy as np
import scipy.linalg as sp
#fortran code from lapack-blas (hopefully when scipy updated this wont be needed)
import dtrsv
#to use more than one core
import multiprocessing

#Currently we assume Gaussian noise TODO change to general noise
#Assume 0 mean TODO change to general mean Gaussian Process
class GaussianProcessRegression:
    def __init__(self,covariance_function,sigma):
        #a covariance function object defined in CovarianceFunction class
        #note this uses the parent class but any children can be used
        self.C = covariance_function
        #a list of pts that are known and their values
        self.pts = []
        self.vals = []
        #the inverse of K as defined in
        #@book{coolen05:theoryofneural,
        #ISBN = {0-19-853024-2},
        #publisher = {Oxford University Press, USA},
        #author = {Coolen, A. C. C. and K{\"{u}}hn, R. and Sollich, P.},
        #title = {Theory of neural information processing systems},
        #year = {2005},
        #}
        self.K = np.array([])
        #gaussian noise variable
        self.sigma = float(sigma)
        self.cholL = np.array([])


    def add_data_points(self,points,vals):
        #add all points to list
        self.pts += points
        self.vals += vals
        arraysize = len(self.pts)
        #construct K
        K = np.zeros((arraysize,arraysize))
        #for speed
        pts = self.pts
        between_points = self.C.between_points
        if len(self.K):
            K[:-1,:-1] = self.K
        for i in xrange(0,arraysize):
            for j in xrange(arraysize-len(points),arraysize):
                K[i,j] = between_points(pts,pts[j])
                K[j,i] = K[i,j]
        K[arraysize-len(points):arraysize,arraysize-len(points):arraysize] = \
            K[arraysize-len(points):arraysize,arraysize-len(points):arraysize] \
            + self.sigma**2 * np.eye(len(points))
        self.K = K

    #calculate the prediction of a point based on data previously given
    def point_prediction(self,points):
        mean = []
        variance =[]
        arraysize = len(self.pts)
        #cholesky
        #if self.cholL.shape[0] < arraysize:
        L=np.linalg.cholesky(self.K)
        #    self.cholL = L
        #else:
        #    L = self.cholL

        alpha = sp.cho_solve((L,1),self.vals)
        #create L in banded form
        k=np.zeros((arraysize,len(points)))

        ##################################################################
        #for speed get ref to functions im going to use and save them
        between_points = self.C.between_points
        pts = self.pts
        dot = np.dot
        ##################################################################
        for j in xrange(0,len(points)):
            #create k
            for i in xrange(0,arraysize):
                k[i,j] = between_points(pts,points[j])

        #calculate mean and variance
        #call the command for forward substitution
        ###############fortran call#######################################
        v = dtrsv.dtrsv('L','N',arraysize,L,k)
        ##################################################################

        #result
        mean=dot(alpha,k)
        for i in xrange(0,len(points)):
            variance.append(between_points(points,points) - dot(v[:,i],v[:,i]))
        #return it in dictionary form
        return {'mean':mean,'variance':variance}


    # calculate the error for data given, where function is a vector
    # of the function evaluated at a sufficiently large number of points
    # that the GPregression has been trying to learn
    def error(self,function):
        total = 0
        #sum up variances
        result = self.point_prediction(function[::2])
        total = np.sum(result['variance'])
        total = (1/float(len(function)/2))*total
        return total

    #clear what has been learnt so far
    def clear(self):
        self.pts = []
        self.vals = []
        self.K = np.array([])

    #calculate the average error for a function defined in function when given
    #number_of_examples examples
    def average_error_over_samples(self,function, sample_size, number_of_examples):
        avg = 0
        numberofpoints = len(function)/2
        for i in range(0,sample_size):
            self.clear()
            #generate points of the function
            permpts = np.random.randint(0,numberofpoints,number_of_examples)
            #create the vectors
            pts = []
            vals = []
            for j in range(0,number_of_examples):
                pts.append(function[permpts[j]*2])
                vals.append(function[permpts[j]*2+1])

            #learn these points
            self.add_data_points(pts,vals)
            #print("points added")
            avg = avg + self.error(function)
        avg = avg/sample_size
        return avg

    #calculate the average error over functions over data of size number_of_data_points
    #for MOST cases this is also the generalization error
    #a summary of which and approximations to can be found in:
    #@inproceedings{Sollich99learningcurves,
    #booktitle = {Neural Computation},
    #author = {Sollich, P.},
    #title = {Learning curves for Gaussian process regression: Approximations and bounds},
    #pages = {200-2},
    #year = {1999},
    #}
    def emprical_average_error_over_functions(self,number_of_functions,
            number_of_sets_of_data,number_of_data_points,function_detail=0,progress=0):
        avg = 0
        step = float(100)/number_of_functions
        for i in range(0,number_of_functions):
            if progress:
                print step*float(i),"%"
            if function_detail:
                fx = self.C.generate_function(self.sigma,function_detail)
            else:
                fx = self.C.generate_function(self.sigma)
            avg = self.average_error_over_samples(fx,number_of_sets_of_data,number_of_data_points)+avg
        avg = avg / number_of_functions
        return avg

    def average_error_over_functions(self,number_of_sets_of_data,
            number_of_data_points,function_detail=0):
        if function_detail:
            fx = self.C.generate_function(self.sigma,function_detail)
        else:
            fx = self.C.generate_function(self.sigma)
        avg = self.average_error_over_samples(fx,number_of_sets_of_data,number_of_data_points)
        return(avg)


    def function_prediction(self,pts):
        temp = self.point_prediction(pts)
        return {'func':temp['mean'],'varpos':temp['variance'],'varneg':-temp['variance']}


#########################################################################################################################################################
#Functions not contained in a class
#########################################################################################################################################################

#function to calculate the generalization error for a RandomWalk kernel averaging over graphs
def RandomWalkGeneralizationError(noise,graphs,number_of_sets_of_data,number_of_data_points,a=2,p=10):
    graph_specific = np.zeros(len(graphs))
    avg = 0
    for i in range(0,len(graphs)):
        rw = CF.RandomWalk(a,p,graphs)
        GP = GaussianProcessRegression(rw,noise)
        graph_specific = GP.average_error_over_functions(number_of_sets_of_data,number_of_data_points)
    avg = np.sum(graph_specific)/len(graphs)
    return avg, graph_specific

#as above but using queues to create parallel architechture
def RandomWalkGeneralizationErrorParallel(work_queue,result_queue,a=2,p=10):
    while True:
        input = work_queue.get()
        if input is None:
            print "poison"
            break
            print 'this should not appear'
        print input['datapt'], ' ', input['number_of_data_points']
        rw=CF.RandomWalk(a,p,input['graph'])
        GP = GaussianProcessRegression(rw,input['noise'])
        err = GP.average_error_over_functions(input['number_of_sets_of_data'],input['number_of_data_points'])
        result_queue.put([input['datapt'],err])
    print 'here'
    return
     
    ma3mju, Jul 31, 2009
    #1

  2. ma3mju

    ma3mju Guest

    Sorry

    ###############fortran
    call#######################################

    is meant to be

    ###############fortran call#######################################

    Matt
     
    ma3mju, Jul 31, 2009
    #2

  3. >>>>> ma3mju <> (m) wrote:

    >m> Hi all,
    >m> I'm having trouble with multiprocessing I'm using it to speed up some
    >m> simulations, I find for large queues when the process reaches the
    >m> poison pill it does not exit whereas for smaller queues it works
    >m> without any problems. Has anyone else had this trouble? Can anyone
    >m> tell me a way around it? The code is in two files below.


How do you know it doesn't exit? You haven't shown any of your output.

    I did discover a problem in your code, but it should cause an exception:

>m> #set off some of the easy workers on the hard work (maybe double number of hard)
>m> for i in range(0,num_hard_workers):
>m>     hard_work_queue.put(None)
>m>     hard_workers.append(multiprocessing.Process(
>m>         target=GP.RandomWalkGeneralizationErrorParallel,
>m>         args=(hard_work_queue,result_queue,)))
>m> #wait for all hard workers to finish
>m> for worker in hard_workers:
>m>     worker.join()


    Here you create new hard workers, but you never start them. The join
    should then give an exception when it reaches these.
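
    For reference, a minimal sketch of that second loop with the missing
    start() call added; this assumes the extra processes are simply meant to
    join the existing pool of hard workers, and keeps the original names:

    for i in range(0,num_hard_workers):
        hard_work_queue.put(None)
        p = multiprocessing.Process(
            target=GP.RandomWalkGeneralizationErrorParallel,
            args=(hard_work_queue,result_queue))
        p.start()   # this call is missing in the posted code
        hard_workers.append(p)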
    --
    Piet van Oostrum <>
    URL: http://pietvanoostrum.com [PGP 8DAE142BE17999C4]
    Private email:
     
    Piet van Oostrum, Jul 31, 2009
    #3
  4. MRAB

    MRAB Guest

    ma3mju wrote:
    > Hi all,
    >
    > I'm having trouble with multiprocessing I'm using it to speed up some
    > simulations, I find for large queues when the process reaches the
    > poison pill it does not exit whereas for smaller queues it works
    > without any problems. Has anyone else had this trouble? Can anyone
    > tell me a way around it? The code is in two files below.
    >

    [snip]
    >
    > #get number of cores and set the number on concurrent processes
    > num_hard_workers = 2
    > num_workers = multiprocessing.cpu_count()*1.5
    > easy_workers = []
    > hard_workers = []
    > #add poison pill for each worker and create the worker
    > for i in range(0,num_workers-num_hard_workers):
    >     easy_work_queue.put(None)
    >     easy_workers.append(multiprocessing.Process(
    >         target=GP.RandomWalkGeneralizationErrorParallel,
    >         args=(easy_work_queue,result_queue,)))
    > for i in range(0,num_hard_workers):
    >     hard_work_queue.put(None)
    >     hard_workers.append(multiprocessing.Process(
    >         target=GP.RandomWalkGeneralizationErrorParallel,
    >         args=(hard_work_queue,result_queue,)))
    >

    You have 2 hard workers and ceil(CPU_count * 1.5) - 2 easy workers.
    What if the number of CPUs was 1? That would give 2 hard and 0 easy!

    Also, I recommend that you put only 1 'poison pill' in each queue and
    have the workers put it back when they see it.
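
    A minimal sketch of that single-pill pattern, assuming a worker with the
    same queue-based signature as RandomWalkGeneralizationErrorParallel;
    do_job here is just a placeholder for the real computation:

    def worker(work_queue, result_queue):
        while True:
            job = work_queue.get()
            if job is None:
                # put the single poison pill back so the other workers see it too
                work_queue.put(None)
                break
            result_queue.put(do_job(job))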
     
    MRAB, Jul 31, 2009
    #4
  5. ma3mju

    ma3mju Guest

    On 31 July, 11:27, Piet van Oostrum <> wrote:
    > >>>>> ma3mju <> (m) wrote:

    > >m> Hi all,
    > >m> I'm having trouble with multiprocessing I'm using it to speed up some
    > >m> simulations, I find for large queues when the process reaches the
    > >m> poison pill it does not exit whereas for smaller queues it works
    > >m> without any problems. Has anyone else had this trouble? Can anyone
    > >m> tell me a way around it? The code is in two files below.

    >
    > How do you know it doesn't exit. You haven't shown any of your output.
    >
    > I did discover a problem in your code, but it should cause an exception:
    >
    > >m> #set off some of the easy workers on the hard work (maybe double
    > >m> number of hard)
    > >m> for i in range(0,num_hard_workers):
    > >m>     hard_work_queue.put(None)
    > >m>     hard_workers.append(multiprocessing.Process
    > >m> (target=GP.RandomWalkGeneralizationErrorParallel,args=
    > >m> (hard_work_queue,result_queue,)))
    > >m> #wait for all hard workers to finish
    > >m> for worker in hard_workers:
    > >m>     worker.join()

    >
    > Here you create new hard workers, but you never start them. The join
    > should then give an exception when it reaches these.
    > --
    > Piet van Oostrum <>
    > URL:http://pietvanoostrum.com[PGP 8DAE142BE17999C4]
    > Private email:


    OK, thanks, I'll change that in a sec. It never reaches that bit of code
    because the easy_workers don't exit, so it never gets past the join().

    As far as running it goes, I get the datapt and number of points
    printed for everything in both queues. When it reaches the end of
    either queue I get "poison" on the screen and then "here" for each
    process, but I don't get "worker joined" and, as expected, don't get
    "this should not appear". If I have a look at the processes running
    after all the queues are supposed to have finished, I see all of them
    still running, taking little or no resources. This is on Ubuntu
    Jaunty at home, and the same happens on the Debian machine at uni.

    The weird thing is that if I run them with fewer points the processes
    do manage to exit.

    Thanks

    Matt
     
    ma3mju, Aug 2, 2009
    #5
  6. ma3mju

    ma3mju Guest

    On 31 July, 11:34, MRAB <> wrote:
    > ma3mju wrote:
    > > Hi all,

    >
    > > I'm having trouble with multiprocessing I'm using it to speed up some
    > > simulations, I find for large queues when the process reaches the
    > > poison pill it does not exit whereas for smaller queues it works
    > > without any problems. Has anyone else had this trouble? Can anyone
    > > tell me a way around it? The code is in two files below.

    >
    > [snip]
    >
    > > #get number of cores and set the number on concurrent processes
    > > num_hard_workers = 2
    > > num_workers = multiprocessing.cpu_count()*1.5
    > > easy_workers = []
    > > hard_workers = []
    > > #add poison pill for each worker and create the worker
    > > for i in range(0,num_workers-num_hard_workers):
    > >     easy_work_queue.put(None)
    > >     easy_workers.append(multiprocessing.Process
    > > (target=GP.RandomWalkGeneralizationErrorParallel,args=
    > > (easy_work_queue,result_queue,)))
    > > for i in range(0,num_hard_workers):
    > >     hard_work_queue.put(None)
    > >     hard_workers.append(multiprocessing.Process
    > > (target=GP.RandomWalkGeneralizationErrorParallel,args=
    > > (hard_work_queue,result_queue,)))

    >
    > You have 2 hard workers and ceil(CPU_count * 1.5) - 2 easy workers.
    > What if the number of CPUs was 1? That would give 2 hard and 0 easy!
    >
    > Also, I recommend that you put only 1 'poison pill' in each queue and
    > have the workers put it back when they see it.


    I'll give that a go in a sec and see if it helps. The processes do quit
    for smaller queues though, so it should in theory be alright. I'm
    not too fussed about the CPU count; it's only there because I change
    between a uni PC and a home one with a different number of cores in
    each, but both greater than one.
     
    ma3mju, Aug 2, 2009
    #6
  7. ma3mju

    ma3mju Guest

    On 2 Aug, 15:48, ma3mju <> wrote:
    > On 31 July, 11:34, MRAB <> wrote:
    >
    >
    >
    > > ma3mju wrote:
    > > > Hi all,

    >
    > > > I'm having trouble with multiprocessing I'm using it to speed up some
    > > > simulations, I find for large queues when the process reaches the
    > > > poison pill it does not exit whereas for smaller queues it works
    > > > without any problems. Has anyone else had this trouble? Can anyone
    > > > tell me a way around it? The code is in two files below.

    >
    > > [snip]

    >
    > > > #get number of cores and set the number on concurrent processes
    > > > num_hard_workers = 2
    > > > num_workers = multiprocessing.cpu_count()*1.5
    > > > easy_workers = []
    > > > hard_workers = []
    > > > #add poison pill for each worker and create the worker
    > > > for i in range(0,num_workers-num_hard_workers):
    > > >     easy_work_queue.put(None)
    > > >     easy_workers.append(multiprocessing.Process
    > > > (target=GP.RandomWalkGeneralizationErrorParallel,args=
    > > > (easy_work_queue,result_queue,)))
    > > > for i in range(0,num_hard_workers):
    > > >     hard_work_queue.put(None)
    > > >     hard_workers.append(multiprocessing.Process
    > > > (target=GP.RandomWalkGeneralizationErrorParallel,args=
    > > > (hard_work_queue,result_queue,)))

    >
    > > You have 2 hard workers and ceil(CPU_count * 1.5) - 2 easy workers.
    > > What if the number of CPUs was 1? That would give 2 hard and 0 easy!

    >
    > > Also, I recommend that you put only 1 'poison pill' in each queue and
    > > have the workers put it back when they see it.

    >
    > I'll give that a go in a sec and see if it helps. The processes quit
    > out for smaller queues though so it should in theory be alright. I'm
    > not too fussed about the CPU's it's only there because I change
    > between a uni PC and home one with a different number of cores in each
    > but both greater than one.


    Just tried changing the poison pill part, sadly to no avail.
     
    ma3mju, Aug 2, 2009
    #7
  8. MRAB

    MRAB Guest

    ma3mju wrote:
    > On 2 Aug, 15:48, ma3mju <> wrote:
    >> On 31 July, 11:34, MRAB <> wrote:
    >>
    >>
    >>
    >>> ma3mju wrote:
    >>>> Hi all,
    >>>> I'm having trouble with multiprocessing I'm using it to speed up some
    >>>> simulations, I find for large queues when the process reaches the
    >>>> poison pill it does not exit whereas for smaller queues it works
    >>>> without any problems. Has anyone else had this trouble? Can anyone
    >>>> tell me a way around it? The code is in two files below.
    >>> [snip]
    >>>> #get number of cores and set the number on concurrent processes
    >>>> num_hard_workers = 2
    >>>> num_workers = multiprocessing.cpu_count()*1.5
    >>>> easy_workers = []
    >>>> hard_workers = []
    >>>> #add poison pill for each worker and create the worker
    >>>> for i in range(0,num_workers-num_hard_workers):
    >>>> easy_work_queue.put(None)
    >>>> easy_workers.append(multiprocessing.Process
    >>>> (target=GP.RandomWalkGeneralizationErrorParallel,args=
    >>>> (easy_work_queue,result_queue,)))
    >>>> for i in range(0,num_hard_workers):
    >>>> hard_work_queue.put(None)
    >>>> hard_workers.append(multiprocessing.Process
    >>>> (target=GP.RandomWalkGeneralizationErrorParallel,args=
    >>>> (hard_work_queue,result_queue,)))
    >>> You have 2 hard workers and ceil(CPU_count * 1.5) - 2 easy workers.
    >>> What if the number of CPUs was 1? That would give 2 hard and 0 easy!
    >>> Also, I recommend that you put only 1 'poison pill' in each queue and
    >>> have the workers put it back when they see it.

    >> I'll give that a go in a sec and see if it helps. The processes quit
    >> out for smaller queues though so it should in theory be alright. I'm
    >> not too fussed about the CPU's it's only there because I change
    >> between a uni PC and home one with a different number of cores in each
    >> but both greater than one.

    >
    > Just tried changing the poison pill part to no avail sadly


    I wonder whether one of the workers is raising an exception, perhaps due
    to lack of memory, when there is a large number of jobs to process.

    Another question: why are you distinguishing between easy and hard jobs?
    Do you actually get a measurable improvement in performance from doing
    it this way instead of having just a single queue of jobs and a single
    pool of workers?
     
    MRAB, Aug 2, 2009
    #8
  9. >>>>> MRAB <> (M) wrote:

    >M> I wonder whether one of the workers is raising an exception, perhaps due
    >M> to lack of memory, when there are large number of jobs to process.


    But that wouldn't prevent the join. And you would probably get an
    exception traceback printed.

    I wonder if something fishy is happening in the multiprocessing
    infrastructure. Or maybe the Fortran code goes wrong because it has no
    protection against buffer overruns and similar problems, I think.
    --
    Piet van Oostrum <>
    URL: http://pietvanoostrum.com [PGP 8DAE142BE17999C4]
    Private email:
     
    Piet van Oostrum, Aug 2, 2009
    #9
  10. ma3mju

    ma3mju Guest

    On 2 Aug, 21:49, Piet van Oostrum <> wrote:
    > >>>>> MRAB <> (M) wrote:

    > >M> I wonder whether one of the workers is raising an exception, perhaps due
    > >M> to lack of memory, when there are large number of jobs to process.

    >
    > But that wouldn't prevent the join. And you would probably get an
    > exception traceback printed.
    >
    > I wonder if something fishy is happening in the multiprocessing
    > infrastructure. Or maybe the Fortran code goes wrong because it has no
    > protection against buffer overruns and similar problems, I think.
    > --
    > Piet van Oostrum <>
    > URL:http://pietvanoostrum.com[PGP 8DAE142BE17999C4]
    > Private email:


    I don't think it's a memory problem. The reason for the hard and easy
    queues is that the larger examples use far more RAM. If I run all of
    the workers on harder problems I do begin to run out of RAM and end
    up spending all my time switching in and out of swap, so I limit the
    number of harder problems I run at the same time. I've watched it
    run to the end (a very boring couple of hours) and it stays out of my
    swap space; everything appears to be staying in RAM. It just hangs
    after "poison" has been printed for each process.

    The other thing is that I get the message "here", telling me I broke
    out of the loop after seeing the poison pill, in each process, and I
    get all the queued items listed as output. Surely if I were to run out
    of memory I wouldn't expect all of the jobs to be listed as output.

    I have a serial script that works fine, so I know the Fortran code
    works for each example individually.

    Thanks

    Matt
     
    ma3mju, Aug 3, 2009
    #10
  11. ma3mju

    ma3mju Guest

    On 3 Aug, 09:36, ma3mju <> wrote:
    > On 2 Aug, 21:49, Piet van Oostrum <> wrote:
    >
    > > >>>>> MRAB <> (M) wrote:
    > > >M> I wonder whether one of the workers is raising an exception, perhaps due
    > > >M> to lack of memory, when there are large number of jobs to process.

    >
    > > But that wouldn't prevent the join. And you would probably get an
    > > exception traceback printed.

    >
    > > I wonder if something fishy is happening in the multiprocessing
    > > infrastructure. Or maybe the Fortran code goes wrong because it has no
    > > protection against buffer overruns and similar problems, I think.
    > > --
    > > Piet van Oostrum <>
    > > URL:http://pietvanoostrum.com[PGP8DAE142BE17999C4]
    > > Private email:

    >
    > I don't think it's a memory problem, the reason for the hard and easy
    > queue is because for larger examples it uses far more RAM. If I run
    > all of workers with harder problems I do begin to run out of RAM and
    > end up spending all my time switching in and out of swap so I limit
    > the number of harder problems I run at the same time. I've watched it
    > run to the end (a very boring couple of hours) and it stays out of my
    > swap space and everything appears to be staying in RAM. Just hangs
    > after all "poison" has been printed for each process.
    >
    > The other thing is that I get the message "here" telling me I broke
    > out of the loop after seeing the poison pill in the process and I get
    > all the things queued listed as output surely if I were to run out of
    > memory I wouldn't expect all of the jobs to be listed as output.
    >
    > I have a serial script that works fine so I know individually for each
    > example the fortran code works.
    >
    > Thanks
    >
    > Matt


    Any ideas for a solution?
     
    ma3mju, Aug 7, 2009
    #11
  12. ma3mju

    ma3mju Guest

    On 3 Aug, 09:36, ma3mju <> wrote:
    > On 2 Aug, 21:49, Piet van Oostrum <> wrote:
    >
    > > >>>>> MRAB <> (M) wrote:
    > > >M> I wonder whether one of the workers is raising an exception, perhaps due
    > > >M> to lack of memory, when there are large number of jobs to process.

    >
    > > But that wouldn't prevent the join. And you would probably get an
    > > exception traceback printed.

    >
    > > I wonder if something fishy is happening in the multiprocessing
    > > infrastructure. Or maybe the Fortran code goes wrong because it has no
    > > protection against buffer overruns and similar problems, I think.
    > > --
    > > Piet van Oostrum <>
    > > URL:http://pietvanoostrum.com[PGP8DAE142BE17999C4]
    > > Private email:

    >
    > I don't think it's a memory problem, the reason for the hard and easy
    > queue is because for larger examples it uses far more RAM. If I run
    > all of workers with harder problems I do begin to run out of RAM and
    > end up spending all my time switching in and out of swap so I limit
    > the number of harder problems I run at the same time. I've watched it
    > run to the end (a very boring couple of hours) and it stays out of my
    > swap space and everything appears to be staying in RAM. Just hangs
    > after all "poison" has been printed for each process.
    >
    > The other thing is that I get the message "here" telling me I broke
    > out of the loop after seeing the poison pill in the process and I get
    > all the things queued listed as output surely if I were to run out of
    > memory I wouldn't expect all of the jobs to be listed as output.
    >
    > I have a serial script that works fine so I know individually for each
    > example the fortran code works.
    >
    > Thanks
    >
    > Matt


    Any ideas?
     
    ma3mju, Aug 7, 2009
    #12
  13. MRAB

    MRAB Guest

    ma3mju wrote:
    > On 3 Aug, 09:36, ma3mju <> wrote:
    >> On 2 Aug, 21:49, Piet van Oostrum <> wrote:
    >>
    >>>>>>>> MRAB <> (M) wrote:
    >>>> M> I wonder whether one of the workers is raising an exception, perhaps due
    >>>> M> to lack of memory, when there are large number of jobs to process.
    >>> But that wouldn't prevent the join. And you would probably get an
    >>> exception traceback printed.
    >>> I wonder if something fishy is happening in the multiprocessing
    >>> infrastructure. Or maybe the Fortran code goes wrong because it has no
    >>> protection against buffer overruns and similar problems, I think.
    >>> --
    >>> Piet van Oostrum <>
    >>> URL:http://pietvanoostrum.com[PGP8DAE142BE17999C4]
    >>> Private email:

    >> I don't think it's a memory problem, the reason for the hard and easy
    >> queue is because for larger examples it uses far more RAM. If I run
    >> all of workers with harder problems I do begin to run out of RAM and
    >> end up spending all my time switching in and out of swap so I limit
    >> the number of harder problems I run at the same time. I've watched it
    >> run to the end (a very boring couple of hours) and it stays out of my
    >> swap space and everything appears to be staying in RAM. Just hangs
    >> after all "poison" has been printed for each process.
    >>
    >> The other thing is that I get the message "here" telling me I broke
    >> out of the loop after seeing the poison pill in the process and I get
    >> all the things queued listed as output surely if I were to run out of
    >> memory I wouldn't expect all of the jobs to be listed as output.
    >>
    >> I have a serial script that works fine so I know individually for each
    >> example the fortran code works.
    >>
    >> Thanks
    >>
    >> Matt

    >
    > Any ideas for a solution?


    A workaround is to do them in small batches.

    You could put each job in a queue with a flag to say whether it's hard
    or easy, then:

    while have more jobs:
        move up to BATCH_SIZE jobs into worker queues
        create and start workers
        wait for workers to finish
        discard workers
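
    A rough Python sketch of that batching loop, assuming the easy/hard
    distinction is dropped and all job dicts go into one list, and that the
    worker keeps its original queue-based signature; BATCH_SIZE, jobs,
    easy_work_items and hard_work_items are illustrative names:

    import multiprocessing
    import GaussianProcessRegression as GP

    BATCH_SIZE = 8                                  # illustrative value
    jobs = easy_work_items + hard_work_items        # assumed: one combined list of job dicts
    all_results = []

    while jobs:
        batch, jobs = jobs[:BATCH_SIZE], jobs[BATCH_SIZE:]
        work_queue = multiprocessing.Queue()
        result_queue = multiprocessing.Queue()
        for job in batch:
            work_queue.put(job)
        workers = [multiprocessing.Process(
                       target=GP.RandomWalkGeneralizationErrorParallel,
                       args=(work_queue, result_queue))
                   for _ in batch]
        for w in workers:
            work_queue.put(None)                    # one poison pill per worker
            w.start()
        all_results.extend(result_queue.get() for _ in batch)   # drain before joining
        for w in workers:
            w.join()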
     
    MRAB, Aug 7, 2009
    #13
  14. ma3mju

    ma3mju Guest

    On 7 Aug, 16:02, MRAB <> wrote:
    > ma3mju wrote:
    > > On 3 Aug, 09:36, ma3mju <> wrote:
    > >> On 2 Aug, 21:49, Piet van Oostrum <> wrote:

    >
    > >>>>>>>> MRAB <> (M) wrote:
    > >>>> M> I wonder whether one of the workers is raising an exception, perhaps due
    > >>>> M> to lack of memory, when there are large number of jobs to process..
    > >>> But that wouldn't prevent the join. And you would probably get an
    > >>> exception traceback printed.
    > >>> I wonder if something fishy is happening in the multiprocessing
    > >>> infrastructure. Or maybe the Fortran code goes wrong because it has no
    > >>> protection against buffer overruns and similar problems, I think.
    > >>> --
    > >>> Piet van Oostrum <>
    > >>> URL:http://pietvanoostrum.com[PGP8DAE142BE17999C4]
    > >>> Private email:
    > >> I don't think it's a memory problem, the reason for the hard and easy
    > >> queue is because for larger examples it uses far more RAM. If I run
    > >> all of workers with harder problems I do begin to run out of RAM and
    > >> end up spending all my time switching in and out of swap so I limit
    > >> the number of harder problems I run at the same time. I've watched it
    > >> run to the end (a very boring couple of hours) and it stays out of my
    > >> swap space and everything appears to be staying in RAM. Just hangs
    > >> after all "poison" has been printed for each process.

    >
    > >> The other thing is that I get the message "here" telling me I broke
    > >> out of the loop after seeing the poison pill in the process and I get
    > >> all the things queued listed as output surely if I were to run out of
    > >> memory I wouldn't expect all of the jobs to be listed as output.

    >
    > >> I have a serial script that works fine so I know individually for each
    > >> example the fortran code works.

    >
    > >> Thanks

    >
    > >> Matt

    >
    > > Any ideas for a solution?

    >
    > A workaround is to do them in small batches.
    >
    > You could put each job in a queue with a flag to say whether it's hard
    > or easy, then:
    >
    >      while have more jobs:
    >          move up to BATCH_SIZE jobs into worker queues
    >          create and start workers
    >          wait for workers to finish
    >          discard workers


    Yeah, I was hoping for something with a bit more finesse. In the end I
    used Pool instead, with a callback function, and that has solved the
    problem. I did find this snippet today:

    Joining processes that use queues

    Bear in mind that a process that has put items in a queue will
    wait before terminating until all the buffered items are fed by the
    “feeder” thread to the underlying pipe. (The child process can call
    the Queue.cancel_join_thread() method of the queue to avoid this
    behaviour.)

    This means that whenever you use a queue you need to make sure
    that all items which have been put on the queue will eventually be
    removed before the process is joined. Otherwise you cannot be sure
    that processes which have put items on the queue will terminate.
    Remember also that non-daemonic processes will automatically be
    joined.


    I don't know (not a computer scientist) but could it have been the
    pipe getting full?
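
    A minimal sketch of the cancel_join_thread() escape hatch mentioned in
    that snippet, with worker and do_job as illustrative names; it lets the
    process exit without flushing its output buffer, so buffered results may
    be lost, and draining the result queue before joining is usually the
    safer route:

    def worker(work_queue, result_queue):
        while True:
            job = work_queue.get()
            if job is None:
                break
            result_queue.put(do_job(job))         # do_job stands in for the real computation
        # let the process exit even if its buffered results have not yet been
        # flushed to the underlying pipe (those items may be lost)
        result_queue.cancel_join_thread()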

    In case anyone else is affected by this, I've attached the new code so
    you can see the changes I made to fix it.

    Thanks for all your help

    Matt

    ============================================================================================================================
    parallel.py
    ============================================================================================================================
import GaussianProcessRegression as GP
import numpy as np
import networkx as nx
import pickle
from multiprocessing import Pool

global result

def cb(r):
    global result
    print r
    result[r[0]] = r[1]


############################################################################################
# Things You Can Change
############################################################################################
#savefiles
savefile = "powerlaw"
graphfile = "powerlawgraph"
#sample sizes
num_graphs = 5
num_sets_of_data = 10
#other things...
intervals = np.ceil(np.logspace(-2,1,50)*500)
noise = [np.sqrt(0.1),np.sqrt(0.01),np.sqrt(0.001),np.sqrt(0.0001)]
num_hard_workers = 5
hard_work_threshold = 4000
############################################################################################
#generate graphs
graphs = []
for i in range(0,num_graphs):
    graphs.append(nx.powerlaw_cluster_graph(500,0.1,0.05))
#save them for later reference
filehandler = open(graphfile,'w')
pickle.dump(graphs,filehandler,-1)
filehandler.close()

#work lists
easy_work = []
hard_work = []

#construct the easy and hard work lists
l=0
for j in range(0,len(intervals)):
    for i in range(0,len(noise)):
        for k in range(0,num_graphs):
            if int(intervals[j]) <= hard_work_threshold:
                easy_work.append({'datapt': l,'graph': graphs[k],
                    'noise': noise,'number_of_sets_of_data': num_sets_of_data,
                    'number_of_data_points': int(intervals[j])})
            else:
                hard_work.append({'datapt': l,'graph': graphs[k],
                    'noise': noise,'number_of_sets_of_data': num_sets_of_data,
                    'number_of_data_points': int(intervals[j])})
            l+=1

result = np.zeros(l)

#create pool with all cores possible
worker_pool = Pool()

for i in xrange(0,len(easy_work)):
    worker_pool.apply_async(GP.RandomWalkGeneralizationErrorParallel,
        (easy_work[i],),callback=cb)

worker_pool.close()
worker_pool.join()

#create the pool for the hard work
worker_pool = Pool(processes = num_hard_workers)

for i in xrange(0,len(hard_work)):
    worker_pool.apply_async(GP.RandomWalkGeneralizationErrorParallel,
        (hard_work[i],),callback=cb)

worker_pool.close()
worker_pool.join()


finaldata = result.reshape((len(intervals),len(noise),num_graphs))
np.save(savefile,finaldata)

    ================================================================================================================================================
    GaussianProcessRegression.py
    ================================================================================================================================================
import CovarianceFunction as CF
import networkx as nx
import numpy as np
import scipy.linalg as sp
#fortran code from lapack-blas (hopefully when scipy updated this wont be needed)
import dtrsv

#Currently we assume Gaussian noise TODO change to general noise
#Assume 0 mean TODO change to general mean Gaussian Process
class GaussianProcessRegression:
    def __init__(self,covariance_function,sigma):
        #a covariance function object defined in CovarianceFunction class
        #note this uses the parent class but any children can be used
        self.C = covariance_function
        #a list of pts that are known and their values
        self.pts = []
        self.vals = []
        #the inverse of K as defined in
        #@book{coolen05:theoryofneural,
        #ISBN = {0-19-853024-2},
        #publisher = {Oxford University Press, USA},
        #author = {Coolen, A. C. C. and K{\"{u}}hn, R. and Sollich, P.},
        #title = {Theory of neural information processing systems},
        #year = {2005},
        #}
        self.K = np.array([])
        #gaussian noise variable
        self.sigma = float(sigma)
        self.cholL = np.array([])


    def add_data_points(self,points,vals):
        #add all points to list
        self.pts += points
        self.vals += vals
        arraysize = len(self.pts)
        #construct K
        K = np.zeros((arraysize,arraysize))
        #for speed
        pts = self.pts
        between_points = self.C.between_points
        if len(self.K):
            K[:-1,:-1] = self.K
        for i in xrange(0,arraysize):
            for j in xrange(arraysize-len(points),arraysize):
                K[i,j] = between_points(pts,pts[j])
                K[j,i] = K[i,j]
        K[arraysize-len(points):arraysize,arraysize-len(points):arraysize] = \
            K[arraysize-len(points):arraysize,arraysize-len(points):arraysize] \
            + self.sigma**2 * np.eye(len(points))
        self.K = K

    #calculate the prediction of a point based on data previously given
    def point_prediction(self,points):
        mean = []
        variance =[]
        arraysize = len(self.pts)
        #cholesky
        #if self.cholL.shape[0] < arraysize:
        L=np.linalg.cholesky(self.K)
        #    self.cholL = L
        #else:
        #    L = self.cholL

        alpha = sp.cho_solve((L,1),self.vals)
        #create L in banded form
        k=np.zeros((arraysize,len(points)))

        ##################################################################
        #for speed get ref to functions im going to use and save them
        between_points = self.C.between_points
        pts = self.pts
        dot = np.dot
        ##################################################################
        for j in xrange(0,len(points)):
            #create k
            for i in xrange(0,arraysize):
                k[i,j] = between_points(pts,points[j])

        #calculate mean and variance
        #call the command for forward substitution
        ###############fortran call#######################################
        v = dtrsv.dtrsv('L','N',arraysize,L,k)
        ##################################################################

        #result
        mean=dot(alpha,k)
        for i in xrange(0,len(points)):
            variance.append(between_points(points,points) - dot(v[:,i],v[:,i]))
        #return it in dictionary form
        return {'mean':mean,'variance':variance}


    # calculate the error for data given, where function is a vector
    # of the function evaluated at a sufficiently large number of points
    # that the GPregression has been trying to learn
    def error(self,function):
        total = 0
        #sum up variances
        result = self.point_prediction(function[::2])
        total = np.sum(result['variance'])
        total = (1/float(len(function)/2))*total
        return total

    #clear what has been learnt so far
    def clear(self):
        self.pts = []
        self.vals = []
        self.K = np.array([])

    #calculate the average error for a function defined in function when given
    #number_of_examples examples
    def average_error_over_samples(self,function, sample_size, number_of_examples):
        avg = 0
        numberofpoints = len(function)/2
        for i in range(0,sample_size):
            self.clear()
            #generate points of the function
            permpts = np.random.randint(0,numberofpoints,number_of_examples)
            #create the vectors
            pts = []
            vals = []
            for j in range(0,number_of_examples):
                pts.append(function[permpts[j]*2])
                vals.append(function[permpts[j]*2+1])

            #learn these points
            self.add_data_points(pts,vals)
            #print("points added")
            avg = avg + self.error(function)
        avg = avg/sample_size
        return avg
     
    ma3mju, Aug 10, 2009
    #14
  15. MRAB

    MRAB Guest

    ma3mju wrote:
    > On 7 Aug, 16:02, MRAB <> wrote:
    >> ma3mju wrote:
    >>> On 3 Aug, 09:36, ma3mju <> wrote:
    >>>> On 2 Aug, 21:49, Piet van Oostrum <> wrote:
    >>>>>>>>>> MRAB <> (M) wrote:
    >>>>>> M> I wonder whether one of the workers is raising an exception, perhaps due
    >>>>>> M> to lack of memory, when there are large number of jobs to process.
    >>>>> But that wouldn't prevent the join. And you would probably get an
    >>>>> exception traceback printed.
    >>>>> I wonder if something fishy is happening in the multiprocessing
    >>>>> infrastructure. Or maybe the Fortran code goes wrong because it has no
    >>>>> protection against buffer overruns and similar problems, I think.
    >>>>> --
    >>>>> Piet van Oostrum <>
    >>>>> URL:http://pietvanoostrum.com[PGP8DAE142BE17999C4]
    >>>>> Private email:
    >>>> I don't think it's a memory problem, the reason for the hard and easy
    >>>> queue is because for larger examples it uses far more RAM. If I run
    >>>> all of workers with harder problems I do begin to run out of RAM and
    >>>> end up spending all my time switching in and out of swap so I limit
    >>>> the number of harder problems I run at the same time. I've watched it
    >>>> run to the end (a very boring couple of hours) and it stays out of my
    >>>> swap space and everything appears to be staying in RAM. Just hangs
    >>>> after all "poison" has been printed for each process.
    >>>> The other thing is that I get the message "here" telling me I broke
    >>>> out of the loop after seeing the poison pill in the process and I get
    >>>> all the things queued listed as output surely if I were to run out of
    >>>> memory I wouldn't expect all of the jobs to be listed as output.
    >>>> I have a serial script that works fine so I know individually for each
    >>>> example the fortran code works.
    >>>> Thanks
    >>>> Matt
    >>> Any ideas for a solution?

    >> A workaround is to do them in small batches.
    >>
    >> You could put each job in a queue with a flag to say whether it's hard
    >> or easy, then:
    >>
    >> while have more jobs:
    >> move up to BATCH_SIZE jobs into worker queues
    >> create and start workers
    >> wait for workers to finish
    >> discard workers

    >
    > Yeah, I was hoping for something with a bit more finesse. In the end I
    > used pool instead with a callback function and that has solved the
    > problem. I did today find this snippet;
    >
    > Joining processes that use queues
    >
    > Bear in mind that a process that has put items in a queue will
    > wait before terminating until all the buffered items are fed by the
    > “feeder” thread to the underlying pipe. (The child process can call
    > the Queue.cancel_join_thread() method of the queue to avoid this
    > behaviour.)
    >
    > This means that whenever you use a queue you need to make sure
    > that all items which have been put on the queue will eventually be
    > removed before the process is joined. Otherwise you cannot be sure
    > that processes which have put items on the queue will terminate.
    > Remember also that non-daemonic processes will be automatically be
    > joined.
    >
    >
    > I don't know (not a computer scientist) but could it have been the
    > pipe getting full?
    >
    > In case anyway else is effected by this I've attached the new code to
    > see the changes I made to fix it.
    >

    [snip]
    Maybe the reason is this:

    Threads share an address space, so putting data into a queue simply
    involves putting a reference there, but processes don't share an address
    space, so a sender must continue to exist until the data itself has been
    copied into the pipe that connects the processes. This pipe has a
    limited capacity.

    In your code you were waiting for the easy workers to terminate while you
    weren't reading from the result queue, and therefore perhaps not from the
    pipe either, so with a large number of jobs the pipe was becoming full.

    In summary: the worker didn't terminate because the pipe was full; the
    pipe was full because you weren't reading the results; you weren't
    reading the results because the worker hadn't terminated.
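
    In the original queue-based version, that cycle can be broken by reading
    the results before joining. A sketch, keeping the original names, where
    num_easy_jobs is a placeholder for however many items were put on
    easy_work_queue:

    tempdata = np.zeros(l)
    for _ in range(num_easy_jobs):                # assumed: one result per job
        datapt, err = result_queue.get()          # drain while the workers are still alive
        tempdata[datapt] = err
    for worker in easy_workers:
        worker.join()                             # the feeder threads can now flush and exit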
     
    MRAB, Aug 10, 2009
    #15
