multiprocessing.Queue deadlock

Felix

Hello,

I keep running into a deadlock in a fairly simple parallel script
using multiprocessing.Queue for sending tasks and receiving results.
From the documentation I cannot figure out what is happening, and none
of the examples seem to cover quite what I am doing. The main code is

import multiprocessing as mp
from queue import Empty  # Python 2: from Queue import Empty

results = mp.Queue()
tasks = mp.JoinableQueue()
tasks.put((0, 0))
procs = [mp.Process(target=work, args=(tasks, results))
         for i in range(nprocs)]
for p in procs:
    p.daemon = True
    p.start()

tasks.join()
for i in range(nprocs): tasks.put('STOP')
for p in procs: p.join()
res = []
while 1:
    try:
        res.append(results.get(False))
    except Empty:
        break


The function 'work' consumes tasks, adds the results to the output
queue, and puts new tasks onto the input queue based on each result.

def work(tasks, results):
    for task in iter(tasks.get, 'STOP'):
        res = calc(*task)
        if res:
            results.put(res)
            tasks.put((task[0], res[1]))
            tasks.put((res[0], task[1]))
        tasks.task_done()

This program will hang while the main process joins the workers (after
all results are computed, i.e. after tasks.join()). The workers have
finished the function 'work', but have not terminated yet.

Calling results.cancel_join_thread() as the last line in 'work'
prevents the deadlock, as does terminating the workers directly.
However, I am not sure why that should be needed, or whether it might
make me lose results.

It seems the workers cannot finish pushing buffered results into
the output queue when 'results.join_thread' is called during
termination, but why is that? I tried calling 'results.close()' before
joining the workers in the main process, but it does not make a
difference.

Is there something I am understanding wrong about the interface? Is
there a much better way to do what I am trying to do above?

Thanks
Felix
 
MRAB

Felix said:
[snip]
Is there something I am understanding wrong about the interface? Is
there a much better way to do what I am trying to do above?

I think it's down to the difference between multithreading and
multiprocessing.

When multithreading, the threads share the same address space, so items
can be passed between the threads directly.

However, when multiprocessing, the processes don't share the same
address space, so items need to be passed from process to process via a
pipe. Unfortunately, the pipe has a limited capacity, so if a process
doesn't read from one end then the pipe will eventually fill up and the
sender will block. Also, a process won't terminate until it has finished
writing to the pipe, and it can't be joined until it has terminated.

You can therefore get into a deadlock where:

* Process A won't read from the queue until it has joined process B.
* The join won't succeed until process B has terminated.
* Process B won't terminate until it has finished writing to the queue.
* Process B can't finish writing to the queue because it's full.
* The queue is full because process A isn't reading from it.
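
A minimal way to see this concretely (a made-up toy, not code from
this thread; the item count is arbitrary, and the actual pipe capacity
is platform-dependent):

import multiprocessing as mp

def writer(q):
    # Queue far more data than the underlying pipe can hold. Each put
    # returns immediately (it only appends to an internal buffer), but
    # a background feeder thread must still push every item through
    # the pipe before the process can terminate.
    for i in range(100000):
        q.put(i)

if __name__ == '__main__':
    q = mp.Queue()
    p = mp.Process(target=writer, args=(q,))
    p.start()
    p.join()  # deadlocks: nobody reads, the pipe fills up, the feeder
              # thread blocks, and the writer never terminates

Reading the items out of q before calling p.join() breaks the cycle.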
 
Felix Schlesinger

MRAB wrote:
You can therefore get into a deadlock where:

* Process A won't read from the queue until it has joined process B.
* The join won't succeed until process B has terminated.
* Process B won't terminate until it has finished writing to the queue.
* Process B can't finish writing to the queue because it's full.
* The queue is full because process A isn't reading from it.

I thought about that, but it seemed unlikely since I am not generating
too many results (a few thousand small tuples of int). Also, I tried
to deal with it by reading as many results from the queue as were
available, then joining the workers, then reading again. This did not
work reliably, maybe because the queue would fill up again while I was
joining the individual workers.

In any case the core of the problem is the following:

A bunch of workers push an unknown number of results into a queue. The
main process needs to collect all those results.

What is the right way to implement that with multiprocessing? I tried
joining the workers and then reading everything available, but
obviously (see above) that does not seem to work.

A dirty trick that works would be reading all results slowly and
assuming no more results are coming after the queue is empty, but
this is obviously unstable:

while 1:
    try:
        res.append(results.get(True, LONG_TIMEOUT))
    except Empty:
        break

It could be made somewhat better by joining the workers afterwards and
reading again, but another deadlock might happen.

What I am doing now is having each worker push a "DONE" flag onto the
result queue when it ends, and having the main process read results
until all DONE flags have arrived:


def work(tasks, results):
    for task in iter(tasks.get, 'STOP'):
        res = calc(*task)
        if res:
            results.put(res)
            tasks.put((task[0], res[1]))
            tasks.put((res[0], task[1]))
        tasks.task_done()
    results.put('DONE')

And in main:

res = []
for i in range(opts.nprocs):
    res += list(iter(results.get, 'DONE'))

for p in procs:
    p.join()

This seems to work, and as long as each worker's items arrive on the
results queue in the same order in which that worker put them (is this
guaranteed?), it should be stable. But is it the best/easiest way to
do this?
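
For reference, a self-contained version of this pattern might look
like the following; calc() here is a made-up stand-in that stops
spawning subtasks once the values reach a small bound, just so the
script terminates:

import multiprocessing as mp

def calc(a, b):
    # Stand-in computation: returns a new pair, or None when this
    # branch of the task tree should stop growing.
    if a + b < 3:
        return (a + 1, b + 1)
    return None

def work(tasks, results):
    for task in iter(tasks.get, 'STOP'):
        res = calc(*task)
        if res:
            results.put(res)
            tasks.put((task[0], res[1]))
            tasks.put((res[0], task[1]))
        tasks.task_done()
    results.put('DONE')  # this worker's final put, after its last result

if __name__ == '__main__':
    nprocs = 4
    results = mp.Queue()
    tasks = mp.JoinableQueue()
    tasks.put((0, 0))
    procs = [mp.Process(target=work, args=(tasks, results))
             for _ in range(nprocs)]
    for p in procs:
        p.daemon = True
        p.start()

    tasks.join()               # all tasks, including spawned subtasks, done
    for _ in range(nprocs):
        tasks.put('STOP')

    res = []
    for _ in range(nprocs):    # read until every worker's DONE arrives
        res += list(iter(results.get, 'DONE'))

    for p in procs:            # safe now: every result pipe is drained
        p.join()
    print(len(res), 'results')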
 
Dennis Lee Bieber

On Wed, 7 Oct 2009 10:24:08 -0700 (PDT), Felix Schlesinger
<[email protected]> declaimed the following in
gmane.comp.python.general:


I've not studied the multiprocessing module, but if join works
similarly to the one from threading...
[snip]
In any case the core of the problem is the following:

A bunch of workers push an unknown number of results into a queue. The
main process needs to collect all those results.

What is the right way to implement that with multiprocessing? I tried
joining the workers and then reading everything available, but
obviously (see above) that does not seem to work.
... join is a blocking call -- it won't return until /that/ "object"
terminates. But if you are joining them in some order OTHER than that
in which they are terminating, it may be that the queue is still being
filled, resulting in the one you wish to join becoming blocked.


The cleanest solution that I can think of is to have the processes
return a special token which identifies WHICH process is terminating, so
you can join just that one, and go back and continue looking for data
from the others.
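
A rough sketch of that idea (the ('DONE', name) token and the dict
bookkeeping are illustrative choices; a plain mp.Queue with the STOP
sentinels queued up front keeps the example short):

import multiprocessing as mp

def calc(a, b):
    return a + b  # stand-in for the real computation

def work(name, tasks, results):
    for task in iter(tasks.get, 'STOP'):
        results.put(calc(*task))
    # Final put: tells the main process exactly which worker is ending.
    results.put(('DONE', name))

if __name__ == '__main__':
    nprocs = 4
    tasks, results = mp.Queue(), mp.Queue()
    for i in range(20):
        tasks.put((i, i))
    for _ in range(nprocs):
        tasks.put('STOP')
    procs = {i: mp.Process(target=work, args=(i, tasks, results))
             for i in range(nprocs)}
    for p in procs.values():
        p.start()

    res = []
    while procs:
        item = results.get()
        if isinstance(item, tuple) and item[0] == 'DONE':
            procs.pop(item[1]).join()  # join exactly the worker that ended
        else:
            res.append(item)
    print(sorted(res))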
 
Felix Schlesinger

Dennis Lee Bieber wrote:
The cleanest solution that I can think of is to have the processes
return a special token which identifies WHICH process is terminating, so
you can join just that one, and go back and continue looking for data
from the others.

I implemented the lazy version of this, namely waiting until all
workers signal that they are done (reading results until I encounter
the right number of 'DONE' tokens, and only after that joining all
workers). I think this is stable, but I am not an expert on the
issue.
Putting 'DONE' is always the last call to queue.put a worker makes.
Does that guarantee that it will not block after 'DONE' is read by the
main process?

Felix
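
For what it's worth: in CPython each process feeds a given queue
through a single background feeder thread, which writes buffered items
to the pipe in the order they were put. So once the main process has
received a worker's final 'DONE', every earlier put from that worker
has already cleared the buffer, and the implicit flush at exit has
nothing left to block on. Spelling that out at the end of 'work'
(close() and join_thread() are existing multiprocessing.Queue methods;
calling them explicitly is optional):

def work(tasks, results):
    # ... task loop as above ...
    results.put('DONE')    # last put; everything else was buffered first
    results.close()        # no more puts from this process
    results.join_thread()  # returns once the feeder thread has written
                           # the whole buffer, ending in 'DONE', to the pipe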
 
