Multiprocessing problem with producer/consumer


Wu Zhe

I am writing a server program with one producer and multiple consumers.
What confuses me is that only the first task the producer puts into the
queue gets consumed; tasks enqueued after that are never consumed and
remain in the queue forever.

from multiprocessing import Process, Pool, Queue, cpu_count
from http import httpserv

def work(queue):
    while True:
        task = queue.get()
        if task is None:
            break
        time.sleep(5)
        print "task done:", task
    queue.put(None)

class Manager:
    def __init__(self):
        self.queue = Queue()
        self.NUMBER_OF_PROCESSES = cpu_count()

    def start(self):
        self.workers = [Process(target=work, args=(self.queue,))
                        for i in xrange(self.NUMBER_OF_PROCESSES)]
        for w in self.workers
            w.start()

        httpserv(self.queue)

    def reload(self):
        print "RELOAD"

    def stop(self):
        self.queue.put(None)
        for i in range(self.NUMBER_OF_PROCESS):
            self.workers.join()
        queue.close()

Manager().start()

The producer is an HTTP server which puts a task in the queue whenever
it receives a request from a user. It seems that the consumer processes
are still blocked even when there are new tasks in the queue, which is
weird.

P.S. Two more questions, unrelated to the above. I am not sure whether
it's better to put the HTTP server in its own process rather than in the
main process; if so, how can I make the main process keep running until
all child processes end? Second question: what's the best way to stop
the HTTP server gracefully?
 

MRAB

Wu said:
I am writing a server program with one producer and multiple consumers.
What confuses me is that only the first task the producer puts into the
queue gets consumed; tasks enqueued after that are never consumed and
remain in the queue forever.

from multiprocessing import Process, Pool, Queue, cpu_count
from http import httpserv

def work(queue):
    while True:
        task = queue.get()
        if task is None:
            break
        time.sleep(5)

The 'time' module hasn't been imported, so the worker raises an
exception when it gets to this line and then terminates.
print "task done:", task
queue.put(None)
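
Since a crashed worker just disappears silently, it's worth wrapping
the loop so the exception is at least printed. A minimal sketch, using
only the standard library:

    import time
    import traceback

    def work(queue):
        try:
            while True:
                task = queue.get()
                if task is None:
                    break
                time.sleep(5)
                print "task done:", task
            # Pass the sentinel on so the other workers also stop.
            queue.put(None)
        except Exception:
            # Without this, the worker dies with no output at all.
            traceback.print_exc()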

class Manager:
    def __init__(self):
        self.queue = Queue()
        self.NUMBER_OF_PROCESSES = cpu_count()

    def start(self):
        self.workers = [Process(target=work, args=(self.queue,))
                        for i in xrange(self.NUMBER_OF_PROCESSES)]
        for w in self.workers

Missing ":" on the end of the line.
            w.start()

        httpserv(self.queue)

    def reload(self):
        print "RELOAD"

    def stop(self):
        self.queue.put(None)
        for i in range(self.NUMBER_OF_PROCESS):

Should be "self.NUMBER_OF_PROCESSES".
            self.workers.join()
        queue.close()
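
There are two more problems in stop(): self.workers is a list, which
has no .join() method, and 'queue' isn't defined inside the method (it
should be self.queue). A corrected sketch, which also puts one sentinel
per worker instead of relying on the workers re-queueing it:

    def stop(self):
        # One None per worker, so every process sees a sentinel
        # even if another worker has already died.
        for _ in range(self.NUMBER_OF_PROCESSES):
            self.queue.put(None)
        for w in self.workers:
            w.join()
        self.queue.close()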

Manager().start()

The producer is an HTTP server which puts a task in the queue whenever
it receives a request from a user. It seems that the consumer processes
are still blocked even when there are new tasks in the queue, which is
weird.

P.S. Two more questions, unrelated to the above. I am not sure whether
it's better to put the HTTP server in its own process rather than in the
main process; if so, how can I make the main process keep running until
all child processes end? Second question: what's the best way to stop
the HTTP server gracefully?
 

Piet van Oostrum

Wu Zhe said:
WZ> I am writing a server program with one producer and multiple consumers.
WZ> What confuses me is that only the first task the producer puts into the
WZ> queue gets consumed; tasks enqueued after that are never consumed and
WZ> remain in the queue forever.
WZ> from multiprocessing import Process, Pool, Queue, cpu_count
WZ> from http import httpserv
WZ>
WZ> def work(queue):
WZ>     while True:
WZ>         task = queue.get()
WZ>         if task is None:
WZ>             break
WZ>         time.sleep(5)
WZ>         print "task done:", task
WZ>     queue.put(None)
WZ>
WZ> class Manager:
WZ>     def __init__(self):
WZ>         self.queue = Queue()
WZ>         self.NUMBER_OF_PROCESSES = cpu_count()
WZ>
WZ>     def start(self):
WZ>         self.workers = [Process(target=work, args=(self.queue,))
WZ>                         for i in xrange(self.NUMBER_OF_PROCESSES)]
WZ>         for w in self.workers
WZ>             w.start()
WZ>
WZ>         httpserv(self.queue)
WZ>
WZ>     def reload(self):
WZ>         print "RELOAD"
WZ>
WZ>     def stop(self):
WZ>         self.queue.put(None)
WZ>         for i in range(self.NUMBER_OF_PROCESS):
WZ>             self.workers.join()
WZ>         queue.close()

WZ> Manager().start()
WZ> The producer is an HTTP server which puts a task in the queue whenever
WZ> it receives a request from a user. It seems that the consumer processes
WZ> are still blocked even when there are new tasks in the queue, which is
WZ> weird.

How do you know there are still tasks in the queue?

When I replace your httpserv(self.queue) with:

    for i in range(100):
        self.queue.put(i)

it just works. So it seems probable to me that the problem is in
httpserv. Maybe it stalls, or maybe it puts a None in the queue? You
could debug by logging the puts in the queue.
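For example, with a throwaway wrapper (the name is mine) handed to the
producer in place of the bare queue:

    class LoggedQueue(object):
        # Debug-only wrapper: prints everything the producer enqueues.
        def __init__(self, queue):
            self.queue = queue

        def put(self, item):
            print "enqueue:", repr(item)
            self.queue.put(item)

You would then call httpserv(LoggedQueue(self.queue)) while the workers
keep reading from self.queue directly.
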
WZ> P.S. Two more questions, unrelated to the above. I am not sure whether
WZ> it's better to put the HTTP server in its own process rather than in the
WZ> main process; if so, how can I make the main process keep running until
WZ> all child processes end? Second question: what's the best way to stop
WZ> the HTTP server gracefully?

I don't think it is useful to put the HTTP server in its own process,
as the Manager process has hardly anything to do. But if you do, you
can make the main process wait by doing the join of the worker
processes at the end, instead of inside stop().
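
That is, something along these lines (a sketch, assuming your httpserv
can be run as a separate Process):

    def start(self):
        self.workers = [Process(target=work, args=(self.queue,))
                        for i in xrange(self.NUMBER_OF_PROCESSES)]
        for w in self.workers:
            w.start()
        self.server = Process(target=httpserv, args=(self.queue,))
        self.server.start()
        # The main process blocks here until the workers exit,
        # i.e. until the sentinels have been put in the queue.
        for w in self.workers:
            w.join()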

Stopping the HTTP server: is it multithreaded? You can have a boolean
flag that indicates it should accept no new requests. Without more
information about the server it is hard to give a more detailed answer.
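
If it happens to be built on the standard library (your code doesn't
show this, so it is an assumption), BaseHTTPServer on Python 2.6+
already supports a clean stop via shutdown():

    import threading
    from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler

    class Handler(BaseHTTPRequestHandler):
        def do_GET(self):
            self.send_response(200)
            self.end_headers()
            self.wfile.write("ok\n")

    server = HTTPServer(('', 8000), Handler)
    t = threading.Thread(target=server.serve_forever)
    t.start()
    # Later, from the main thread: finishes the request in
    # progress, stops accepting new ones, then returns.
    server.shutdown()
    t.join()
    server.server_close()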
 
