Multiprocessing problem

M

Matt Chaput

Hi,

I'm having a problem with the multiprocessing package.

I'm trying to use a simple pattern where a supervisor object starts a
bunch of worker processes, instantiating them with two queues (a job
queue for tasks to complete and an results queue for the results). The
supervisor puts all the jobs in the "job" queue, then join()s the
workers, and then pulls all the completed results off the "results" queue.

(I don't think I can just use something like Pool.imap_unordered for
this because the workers need to be objects with state.)

Here's a simplified example:

http://pastie.org/850512

The problem is that seemingly randomly, but almost always, the worker
processes will deadlock at some point and stop working before they
complete. This will leave the whole program stalled forever. This seems
more likely the more work each worker does (to the point where adding
the time.sleep(0.01) as seen in the example code above guarantees it).
The problem seems to occur on both Windows and Mac OS X.

I've tried many random variations of the code (e.g. using JoinableQueue,
calling cancel_join_thread() on one or both queues even though I have no
idea what it does, etc.) but keep having the problem.

Am I just using multiprocessing wrong? Is this a bug? Any advice?

Thanks,

Matt
 
L

larudwer

Hello Matt

I think the problem is here:

for n in xrange(100000):
outqueue.put(str(n)) <-- fill the queue with 100000
elements
try:
r = inqueue.get_nowait() <-- queue is still empty because
processes need some time to start
results.append(r)
except Empty:
pass <-- causing 100000 passes

.....

print "-"
for task in tasks:
outqueue.put(None) <-- put even more data in the queue
....
# in the meantime the processes start to run and are trying to put data
# in to the output queue. However this queue might fill up, and lock
# all processes that try to write data in the already filled up queue

print "joining"
for task in tasks:
task.join() <-- can never succeed because processes
are waiting for someone reading the result queue
print "joined"

This example works:

from Queue import Empty, Full
from multiprocessing import Queue, Process
from base64 import b64encode
import time, random

class Worker(Process):
def __init__(self, inqueue, outqueue):
Process.__init__(self)
self.inqueue = inqueue
self.outqueue = outqueue

def run(self):
inqueue = self.inqueue
outqueue = self.outqueue
c = 0
while True:
arg = inqueue.get()
if arg is None: break
c += 1
b = b64encode(arg)
outqueue.put(b)

# Clean-up code goes here
outqueue.put(c)

class Supervisor(object):
def __init__(self):
pass

def go(self):
outqueue = Queue()
inqueue = Queue()
tasks = [Worker(outqueue, inqueue) for _ in xrange(4)]
for task in tasks:
task.start()

results = []
print "*"
for n in xrange(100000):
outqueue.put(str(n))

print "-"
for task in tasks:
outqueue.put(None)

print "emptying queue"
try:
while True:
r = inqueue.get_nowait()
results.append(r)
except Empty:
pass
print "done"
print len(results)

print "joining"
for task in tasks:
task.join()
print "joined"

if __name__ == "__main__":
s = Supervisor()
s.go()
 

Ask a Question

Want to reply to this thread or ask your own question?

You'll need to choose a username for the site, which only take a couple of moments. After that, you can post your question and our members will help you out.

Ask a Question

Members online

No members online now.

Forum statistics

Threads
473,769
Messages
2,569,581
Members
45,056
Latest member
GlycogenSupporthealth

Latest Threads

Top