Wu Zhe
I am writing a server program with one producer and multiple consumers.
What confuses me is that only the first task the producer puts into the
queue gets consumed; tasks enqueued after that are never consumed and
remain in the queue forever.
import time
from multiprocessing import Process, Queue, cpu_count
from http import httpserv

def work(queue):
    while True:
        task = queue.get()
        if task is None:
            break
        time.sleep(5)
        print "task done:", task
    # re-post the sentinel so the next worker shuts down too
    queue.put(None)

class Manager:
    def __init__(self):
        self.queue = Queue()
        self.NUMBER_OF_PROCESSES = cpu_count()

    def start(self):
        self.workers = [Process(target=work, args=(self.queue,))
                        for i in xrange(self.NUMBER_OF_PROCESSES)]
        for w in self.workers:
            w.start()
        httpserv(self.queue)

    def reload(self):
        print "RELOAD"

    def stop(self):
        self.queue.put(None)
        for w in self.workers:
            w.join()
        self.queue.close()

Manager().start()
The producer is an HTTP server which puts a task into the queue whenever
it receives a request from a user. It seems that the consumer processes
stay blocked even when there are new tasks in the queue, which is weird.
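For reference, a simplified sketch of a producer matching that
description, built on Python 2's BaseHTTPServer (the real httpserv may
differ; the handler here is illustrative only):

import BaseHTTPServer

def httpserv(queue, port=8000):
    # Illustrative producer: turn every GET request into a task.
    class Handler(BaseHTTPServer.BaseHTTPRequestHandler):
        def do_GET(self):
            queue.put(self.path)  # hand the task off to the workers
            self.send_response(200)
            self.end_headers()
            self.wfile.write("task queued\n")

    BaseHTTPServer.HTTPServer(('', port), Handler).serve_forever()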
P.S. Two more questions, not related to the above. First, I am not sure
whether it's better to run the HTTP server in its own process rather
than in the main process; if so, how can I keep the main process running
until all child processes end? Second, what's the best way to stop the
HTTP server gracefully?
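One arrangement I am considering (a sketch, assuming Python 2.6+, where
SocketServer-based servers have a shutdown() method): run the HTTP
server in its own Process and keep the main process alive by joining it;
inside the server process, run serve_forever() in a helper thread so
shutdown() can stop it gracefully:

import time
import threading
import BaseHTTPServer
from multiprocessing import Process

class Handler(BaseHTTPServer.BaseHTTPRequestHandler):
    # minimal handler, just for the demo
    def do_GET(self):
        self.send_response(200)
        self.end_headers()

def run_server():
    server = BaseHTTPServer.HTTPServer(('', 8000), Handler)
    # serve_forever() polls an internal flag (Python 2.6+), so running it
    # in a helper thread lets us stop it gracefully: shutdown() makes
    # serve_forever() return once the current request has been handled.
    t = threading.Thread(target=server.serve_forever)
    t.start()
    time.sleep(30)         # stand-in for "run until told to stop"
    server.shutdown()      # graceful stop
    t.join()
    server.server_close()  # release the listening socket

if __name__ == '__main__':
    # The server lives in its own process; the main process stays alive
    # simply by blocking on join() until the child exits.
    p = Process(target=run_server)
    p.start()
    p.join()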