Multiprocessing.Queue deadlock

Discussion in 'Python' started by Felix, Oct 7, 2009.

  1. Felix

    Felix Guest

    Hello,

    I keep running into a deadlock in a fairly simple parallel script
    using multiprocessing.Queue for sending tasks and receiving results.
    From the documentation I cannot figure out what is happening, and none
    of the examples seem to cover quite what I am doing. The main code is:

    import multiprocessing as mp
    from Queue import Empty  # on Python 3: from queue import Empty

    results = mp.Queue()
    tasks = mp.JoinableQueue()
    tasks.put((0, 0))
    procs = [mp.Process(target=work, args=(tasks, results))
             for i in range(nprocs)]
    for p in procs:
        p.daemon = True
        p.start()

    tasks.join()
    for i in range(nprocs): tasks.put('STOP')
    for p in procs: p.join()
    res = []
    while 1:
        try:
            res.append(results.get(False))
        except Empty:
            break


    The function 'work' both consumes tasks (adding their results to the
    output queue) and adds new tasks to the input queue based on each
    result.

    def work(tasks, results):
        for task in iter(tasks.get, 'STOP'):
            res = calc(*task)
            if res:
                results.put(res)
                tasks.put((task[0], res[1]))
                tasks.put((res[0], task[1]))
            tasks.task_done()

    This program hangs while the main process joins the workers (after
    all results are computed, i.e. after tasks.join()). The workers have
    finished the function 'work' but have not terminated yet.

    Calling results.cancel_join_thread() as the last line in 'work'
    prevents the deadlock, as does terminating the workers directly.
    However, I am not sure why that would be needed, or whether it might
    make me lose results.

    It seems the workers cannot finish pushing buffered results into
    the output queue when they call 'results.join_thread' while
    terminating, but why is that? I tried calling 'results.close()'
    before joining the workers in the main process, but it does not make
    a difference.

    Is there something I am misunderstanding about the interface? Is
    there a much better way to do what I am trying to do above?

    Thanks
    Felix
    Felix, Oct 7, 2009
    #1

  2. MRAB

    MRAB Guest

    Felix wrote:
    > Hello,
    >
    > I keep running into a deadlock in a fairly simple parallel script
    > using multiprocessing.Queue for sending tasks and receiving results.
    > From the documentation I cannot figure out what is happening, and
    > none of the examples seem to cover quite what I am doing.
    [snip]
    > It seems the workers cannot finish pushing buffered results into
    > the output queue when they call 'results.join_thread' while
    > terminating, but why is that? I tried calling 'results.close()'
    > before joining the workers in the main process, but it does not make
    > a difference.
    >
    > Is there something I am misunderstanding about the interface? Is
    > there a much better way to do what I am trying to do above?
    >

    I think it's down to the difference between multithreading and
    multiprocessing.

    When multithreading, the threads share the same address space, so items
    can be passed between the threads directly.

    However, when multiprocessing, the processes don't share the same
    address space, so items need to be passed from process to process via a
    pipe. Unfortunately, the pipe has a limited capacity, so if a process
    doesn't read from one end then the pipe will eventually fill up and the
    sender will block. Also, a process won't terminate until it has finished
    writing to the pipe, and it can't be joined until it has terminated.

    You can therefore get into a deadlock where:

    * Process A won't read from the queue until it has joined process B.
    * The join won't succeed until process B has terminated.
    * Process B won't terminate until it has finished writing to the queue.
    * Process B can't finish writing to the queue because it's full.
    * The queue is full because process A isn't reading from it.
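
    For what it's worth, here is a tiny standalone script (a hypothetical
    demo of my own, not Felix's code) that reproduces exactly that chain
    with a single child process: the parent joins before reading, so the
    child's feeder thread can never flush the full pipe and the child
    never exits.

    import multiprocessing as mp

    def producer(q):
        # put far more data than the underlying pipe can buffer
        for i in range(100000):
            q.put(i)

    if __name__ == '__main__':
        q = mp.Queue()
        p = mp.Process(target=producer, args=(q,))
        p.start()
        p.join()   # hangs: the parent never drains q
        # Draining q before the join (or joining only after reading
        # everything) avoids the hang.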
    MRAB, Oct 7, 2009
    #2

  3. On Oct 7, 12:16 pm, MRAB <> wrote:
    > Felix wrote:
    > > I keep running into a deadlock in a fairly simple parallel script
    > > using multiprocessing.Queue for sending tasks and receiving results.
    [snip]
    > You can therefore get into a deadlock where:
    >
    > * Process A won't read from the queue until it has joined process B.
    > * The join won't succeed until process B has terminated.
    > * Process B won't terminate until it has finished writing to the queue.
    > * Process B can't finish writing to the queue because it's full.
    > * The queue is full because process A isn't reading from it.

    I thought about that, but it seemed unlikely since I am not generating
    too many results (a few thousand small tuples of ints). Also, I tried
    to deal with it by reading as many results from the queue as were
    available, then joining the workers, then reading again. This did not
    work reliably, maybe because the queue would fill up again while I was
    joining the individual workers.

    In any case the core of the problem is the following:

    A bunch of workers push an unknown number of results into a queue. The
    main process needs to collect all those results.

    What is the right way to implement that with multiprocessing? I tried
    joining the workers and then reading everything available, but
    obviously (see above) that does not seem to work.

    A dirty trick that works would be to read all results slowly and
    assume that no more results are coming once the queue is empty, but
    this is obviously unstable:

    while 1:
        try:
            res.append(results.get(True, LONG_TIMEOUT))
        except Empty:
            break

    It could be made somewhat better by joining the workers afterwards and
    reading again, but another deadlock might happen.

    What I am doing now is having the workers push a "DONE" flag onto the
    result queue when they finish, and reading results until all DONE
    flags have arrived:


    def work(tasks, results):
        for task in iter(tasks.get, 'STOP'):
            res = calc(*task)
            if res:
                results.put(res)
                tasks.put((task[0], res[1]))
                tasks.put((res[0], task[1]))
            tasks.task_done()
        results.put('DONE')

    And in main:

    res = []
    for i in range(opts.nprocs):
        res += list(iter(results.get, 'DONE'))

    for p in procs:
        p.join()

    This seems to work, and as long as each worker's items arrive on the
    results queue in the same order as that worker's puts (is this
    guaranteed?), it should be stable. But is it the best/easiest way to
    do this?
    Felix Schlesinger, Oct 7, 2009
    #3
  4. On Wed, 7 Oct 2009 10:24:08 -0700 (PDT), Felix Schlesinger
    <> declaimed the following in
    gmane.comp.python.general:


    I've not studied the multiprocessing module, but if join works
    similarly to the one from threading...

    >
    > I thought about that, but it seemed unlikely since I am not generating
    > too many results (a few thousand small tuples of ints). Also, I tried
    > to deal with it by reading as many results from the queue as were
    > available, then joining the workers, then reading again. This did not
    > work reliably, maybe because the queue would fill up again while I was
    > joining the individual workers.
    >
    > In any case the core of the problem is the following:
    >
    > A bunch of workers push an unknown number of results into a queue. The
    > main process needs to collect all those results.
    >
    > What is the right way to implement that with multiprocessing? I tried
    > joining the workers and then reading everything available, but
    > obviously (see above) that does not seem to work.
    >

    ... join is a blocking call -- it won't return until /that/ "object"
    terminates. But if you are joining the workers in some order OTHER than
    the one in which they terminate, the queue may still be filling up, so
    the one you are trying to join stays blocked.


    The cleanest solution that I can think of is to have the processes
    return a special token which identifies WHICH process is terminating, so
    you can join just that one, and go back and continue looking for data
    from the others.
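
    A rough sketch of that idea (the variable names are illustrative, not
    taken from Felix's code): each worker puts a ('DONE', index) token as
    its very last item, and the parent keeps draining results, joining only
    the worker named in each token as it arrives.

    def work(idx, tasks, results):
        for task in iter(tasks.get, 'STOP'):
            res = calc(*task)
            if res:
                results.put(res)
                tasks.put((task[0], res[1]))
                tasks.put((res[0], task[1]))
            tasks.task_done()
        results.put(('DONE', idx))    # last put: says which worker is done

    # in the parent, after tasks.join() and putting the 'STOP' sentinels
    # (workers created with args=(i, tasks, results)):
    res = []
    remaining = dict(enumerate(procs))
    while remaining:
        item = results.get()
        if item[0] == 'DONE':
            remaining.pop(item[1]).join()   # join just that worker
        else:
            res.append(item)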
    --
    Wulfraed Dennis Lee Bieber KD6MOG
    HTTP://wlfraed.home.netcom.com/
    Dennis Lee Bieber, Oct 8, 2009
    #4
  5. On Oct 8, 3:21 am, Dennis Lee Bieber <> wrote:
    > On Wed, 7 Oct 2009 10:24:08 -0700 (PDT), Felix Schlesinger
    > > A bunch of workers push an unknown number of results into a queue. The
    > > main process needs to collect all those results.

    >
    > > What is the right way to implement that with multiprocessing? I tried
    > > joining the workers and then reading everything available, but
    > > obviously (see above) that does not seem to work.

    >
    >         The cleanest solution that I can think of is to have the processes
    > return a special token which identifies WHICH process is terminating, so
    > you can join just that one, and go back and continue looking for data
    > from the others.


    I implemented the lazy version of this, namely waiting until all
    workers signal that they are done (reading results until I encounter
    the right number of 'DONE' tokens, and only after that joining all
    workers). I think this is stable, but I am not an expert on the issue.
    Putting 'DONE' is always the last call to queue.put that a worker
    makes. Does that guarantee that the worker will not block after 'DONE'
    is read by the main process?
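
    For reference, a consolidated sketch of that pattern (a paraphrase,
    not the exact production code): count the 'DONE' tokens regardless of
    which worker they come from, and only join once every worker's final
    put has been read.

    ndone = 0
    res = []
    while ndone < nprocs:
        item = results.get()
        if item == 'DONE':
            ndone += 1
        else:
            res.append(item)
    # every worker's last put has been read, so nothing is left to flush
    for p in procs:
        p.join()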

    Felix
    Felix Schlesinger, Oct 8, 2009
    #5
