Multiprocessing problem with producer/consumer

Discussion in 'Python' started by Wu Zhe, May 27, 2009.

  1. Wu Zhe

    Wu Zhe Guest

    I am writing a server program with one producer and multiple consumers.
    What confuses me is that only the first task the producer puts into the
    queue gets consumed; tasks enqueued after that are never consumed and
    remain in the queue forever.

    from multiprocessing import Process, Pool, Queue, cpu_count
    from http import httpserv

    def work(queue):
        while True:
            task = queue.get()
            if task is None:
                break
            time.sleep(5)
            print "task done:", task
        queue.put(None)

    class Manager:
        def __init__(self):
            self.queue = Queue()
            self.NUMBER_OF_PROCESSES = cpu_count()

        def start(self):
            self.workers = [Process(target=work, args=(self.queue,))
                            for i in xrange(self.NUMBER_OF_PROCESSES)]
            for w in self.workers
                w.start()

            httpserv(self.queue)

        def reload(self):
            print "RELOAD"

        def stop(self):
            self.queue.put(None)
            for i in range(self.NUMBER_OF_PROCESS):
                self.workers.join()
            queue.close()

    Manager().start()

    The producer is an HTTP server that puts a task in the queue each time
    it receives a request from a user. The consumer processes seem to stay
    blocked even when there are new tasks in the queue, which is weird.

    P.S. Two other questions, unrelated to the above. I am not sure whether
    it's better to put the HTTP server in its own process rather than in the
    main process; if so, how can I keep the main process running until all
    child processes end? Second question: what's the best way to stop the
    HTTP server gracefully?
     
    Wu Zhe, May 27, 2009
    #1

  2. MRAB

    MRAB Guest

    Wu Zhe wrote:
    > I am writing a server program with one producer and multiple consumers.
    > What confuses me is that only the first task the producer puts into the
    > queue gets consumed; tasks enqueued after that are never consumed and
    > remain in the queue forever.
    >
    > from multiprocessing import Process, Pool, Queue, cpu_count
    > from http import httpserv
    >
    > def work(queue):
    >     while True:
    >         task = queue.get()
    >         if task is None:
    >             break
    >         time.sleep(5)


    The 'time' module hasn't been imported, so the worker raises an
    exception when it gets to this line and then terminates.

    > print "task done:", task
    > queue.put(None)
    >
    > class Manager:
    > def __init__(self):
    > self.queue = Queue()
    > self.NUMBER_OF_PROCESSES = cpu_count()
    >
    > def start(self):
    > self.workers = [Process(target=work, args=(self.queue,))
    > for i in xrange(self.NUMBER_OF_PROCESSES)]
    > for w in self.workers


    Missing ":" on the end of the line.

    >             w.start()
    >
    >         httpserv(self.queue)
    >
    >     def reload(self):
    >         print "RELOAD"
    >
    >     def stop(self):
    >         self.queue.put(None)
    >         for i in range(self.NUMBER_OF_PROCESS):


    Should be "self.NUMBER_OF_PROCESSES".

    >             self.workers.join()
    >         queue.close()
    >
    > Manager().start()
    >
    > The producer is an HTTP server that puts a task in the queue each time
    > it receives a request from a user. The consumer processes seem to stay
    > blocked even when there are new tasks in the queue, which is weird.
    >
    > P.S. Two other questions, unrelated to the above. I am not sure whether
    > it's better to put the HTTP server in its own process rather than in the
    > main process; if so, how can I keep the main process running until all
    > child processes end? Second question: what's the best way to stop the
    > HTTP server gracefully?
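
    For what it's worth, stop() has two more slips: self.workers is a list,
    so each worker has to be joined individually, and queue.close() should
    be self.queue.close(). With all of that fixed, the whole thing would
    look roughly like this (untested sketch; httpserv is still whatever
    your http module provides):

    import time
    from multiprocessing import Process, Queue, cpu_count
    from http import httpserv

    def work(queue):
        while True:
            task = queue.get()
            if task is None:
                break
            time.sleep(5)
            print "task done:", task
        queue.put(None)  # pass the sentinel on so the other workers stop too

    class Manager:
        def __init__(self):
            self.queue = Queue()
            self.NUMBER_OF_PROCESSES = cpu_count()

        def start(self):
            self.workers = [Process(target=work, args=(self.queue,))
                            for i in xrange(self.NUMBER_OF_PROCESSES)]
            for w in self.workers:
                w.start()
            httpserv(self.queue)

        def stop(self):
            self.queue.put(None)    # one sentinel is enough; workers re-put it
            for w in self.workers:  # join each Process, not the list itself
                w.join()
            self.queue.close()

    Manager().start()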
     
    MRAB, May 27, 2009
    #2

  3. Piet van Oostrum

    >>>>> Wu Zhe <> (WZ) wrote:

    >WZ> I am writing a server program with one producer and multiple consumers.
    >WZ> What confuses me is that only the first task the producer puts into the
    >WZ> queue gets consumed; tasks enqueued after that are never consumed and
    >WZ> remain in the queue forever.


    >WZ> from multiprocessing import Process, Pool, Queue, cpu_count
    >WZ> from http import httpserv


    >WZ> def work(queue):
    >WZ>     while True:
    >WZ>         task = queue.get()
    >WZ>         if task is None:
    >WZ>             break
    >WZ>         time.sleep(5)
    >WZ>         print "task done:", task
    >WZ>     queue.put(None)


    >WZ> class Manager:
    >WZ>     def __init__(self):
    >WZ>         self.queue = Queue()
    >WZ>         self.NUMBER_OF_PROCESSES = cpu_count()


    >WZ>     def start(self):
    >WZ>         self.workers = [Process(target=work, args=(self.queue,))
    >WZ>                         for i in xrange(self.NUMBER_OF_PROCESSES)]
    >WZ>         for w in self.workers
    >WZ>             w.start()


    >WZ>         httpserv(self.queue)


    >WZ>     def reload(self):
    >WZ>         print "RELOAD"


    >WZ>     def stop(self):
    >WZ>         self.queue.put(None)
    >WZ>         for i in range(self.NUMBER_OF_PROCESS):
    >WZ>             self.workers.join()
    >WZ>         queue.close()


    >WZ> Manager().start()


    >WZ> The producer is an HTTP server that puts a task in the queue each time
    >WZ> it receives a request from a user. The consumer processes seem to stay
    >WZ> blocked even when there are new tasks in the queue, which is weird.


    How do you know there are still tasks in the queue?

    When I replace your httpserv(self.queue) with:

        for i in range(100):
            self.queue.put(i)

    it just works. So it seems probable to me that the problem is in
    httpserv. Maybe it stalls, or maybe it puts a None in the queue? You
    could debug by logging the puts in the queue.
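
    For example, with a minimal wrapper (just a sketch; the class name is
    made up):

    import sys

    class LoggedQueue(object):
        """Wrap the queue and log every put, to show what httpserv enqueues."""
        def __init__(self, queue):
            self.queue = queue

        def put(self, item):
            print >>sys.stderr, "put:", repr(item)
            self.queue.put(item)

        def get(self):
            return self.queue.get()

    Then hand httpserv the wrapper instead of the bare queue:

        httpserv(LoggedQueue(self.queue))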

    >WZ> P.S. Two other questions, unrelated to the above. I am not sure whether
    >WZ> it's better to put the HTTP server in its own process rather than in the
    >WZ> main process; if so, how can I keep the main process running until all
    >WZ> child processes end? Second question: what's the best way to stop the
    >WZ> HTTP server gracefully?


    I don't think it is useful to put the HTTP server in its own process, as
    the Manager process has hardly anything to do. But if you do, you can
    make the main process wait by joining the worker processes at the end,
    instead of inside stop().
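
    Something like this in start(), if you do want the server in its own
    process (a sketch under that assumption):

    def start(self):
        self.workers = [Process(target=work, args=(self.queue,))
                        for i in xrange(self.NUMBER_OF_PROCESSES)]
        for w in self.workers:
            w.start()

        # run the HTTP server in a child process instead of the main one
        self.server = Process(target=httpserv, args=(self.queue,))
        self.server.start()

        # joining here keeps the main process alive until the workers finish
        for w in self.workers:
            w.join()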

    Stopping the HTTP server: is it multithreaded? You can have a boolean
    that indicates it should accept no new requests. Without more info about
    the server it is hard to give a more detailed answer.
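
    With the standard BaseHTTPServer, for instance, the usual trick is to
    loop over handle_request() instead of calling serve_forever(), checking
    a flag between requests (a sketch; note the flag is only re-tested once
    the next request has been handled):

    from BaseHTTPServer import HTTPServer

    class StoppableHTTPServer(HTTPServer):
        running = True

        def serve(self):
            # handle_request() serves exactly one request, so there is a
            # chance to check the flag after each one
            while self.running:
                self.handle_request()

        def stop(self):
            self.running = False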
    --
    Piet van Oostrum <>
    URL: http://pietvanoostrum.com [PGP 8DAE142BE17999C4]
    Private email:
     
    Piet van Oostrum, May 27, 2009
    #3