Multiprocessing.Queue - I want to end.

Discussion in 'Python' started by Luis Zarrabeitia, Apr 30, 2009.

  1. Hi. I'm building a script that closely follows a producer-consumer model. In
    this case, the producer is disk-bound and the consumer is cpu-bound, so I'm
    using the multiprocessing module (python2.5 with the multiprocessing backport
    from google.code) to speed up the processing (two consumers, one per core,
    and one producer). The consumers are two multiprocessing.Process instances,
    the producer is the main script, and the data is sent using a
    multiprocessing.Queue instance (with bounded capacity).

    The problem: when there is no more data to process, how can I signal the
    consumers to consume until the queue is empty and then stop consuming? I need
    them to do some clean-up work after they finish (and then I need the main
    script to summarize the results).

    Currently, the script looks like this:

    ===
    from multiprocessing import Queue, Process

    def consumer(filename, queue):
        outfile = open(filename, 'w')
        for data in iter(queue.get, None):
            process_data(data, outfile)  # stores the result in the outfile
        outfile.close()
        cleanup_consumer(filename)

    if __name__ == "__main__":
        queue = Queue(100)
        p1 = Process(target=consumer, args=("file1.txt", queue))
        p2 = Process(target=consumer, args=("file2.txt", queue))  # each consumer writes its own file
        p1.start(); p2.start()
        for item in read_from_disk():  # this is the disk-bound operation
            queue.put(item)
        queue.put(None); queue.put(None)
        p1.join()  # Wait until both consumers finish their work
        p2.join()
        # Tried to put this one before... but then the 'get' raises
        # an exception, even if there are still items to consume.
        queue.close()
        summarize()  # very fast, no need to parallelize this.
    ===

    As you can see, I'm sending one 'None' per consumer, and hoping that no
    consumer will read more than one None. While this particular implementation
    ensures that, it is very fragile. Is there any way to signal the consumers?
    (or better yet, the queue itself, as it is shared by all consumers?)
    Should "close" work for this? (raise the exception when the queue is
    exhausted, not when it is closed by the producer).

    --
    Luis Zarrabeitia (aka Kyrie)
    Fac. de Matemática y Computación, UH.
    http://profesores.matcom.uh.cu/~kyrie
     
    Luis Zarrabeitia, Apr 30, 2009
    #1

  2. Aaron Brady (Guest):

    On Apr 30, 3:49 pm, Luis Zarrabeitia <> wrote:
    > Hi. I'm building a script that closely follows a producer-consumer model. In
    > this case, the producer is disk-bound and the consumer is cpu-bound, so I'm
    > using the multiprocessing module (python2.5 with the multiprocessing backport
    > from google.code) to speed up the processing (two consumers, one per core,
    > and one producer). The consumers are two multiprocessing.Process instances,
    > the producer is the main script, and the data is sent using a
    > multiprocessing.Queue instance (with bounded capacity).
    >
    > The problem: when there is no more data to process, how can I signal the
    > consumers to consume until the queue is empty and then stop consuming? I need
    > them to do some clean-up work after they finish (and then I need the main
    > script to summarize the results)

    snip
    >     for data in iter(queue.get, None):
    >         process_data(data, outfile) # stores the result in the outfile

    snip
    >     queue.put(None); queue.put(None)

    snip
    > As you can see, I'm sending one 'None' per consumer, and hoping that no
    > consumer will read more than one None. While this particular implementation
    > ensures that, it is very fragile. Is there any way to signal the consumers?
    > (or better yet, the queue itself, as it is shared by all consumers?)
    > Should "close" work for this? (raise the exception when the queue is
    > exhausted, not when it is closed by the producer).


    You may have to write the consumer loop by hand, rather than using
    'for'. In the same-process case, you can do this.

    producer:
        sentinel = object()

    consumer:
        while True:
            item = queue.get()
            if item is sentinel:
                break
            # etc.

    Then, each consumer is guaranteed to consume no more than one
    sentinel, and thus producing one sentinel per consumer will halt them
    all.

    However, with multiple processes, the comparison to 'sentinel' will
    fail, since each subprocess gets a copy, not the original, of the
    sentinel. A sample program which sent the same object multiple times
    produced this output:

    <object object at 0x00B8A388>
    <object object at 0x00B8A3A0>

    Theoretically, you could send a shared object, which would satisfy the
    identity test in the subprocess. That failed with this exception:

    File "c:\programs\python30\lib\multiprocessing\queues.py", line 51,
    in __getstate__
    assert_spawning(self)
    ....
    RuntimeError: Queue objects should only be shared between processes th
    rough inheritance

    As a result, your options are more complicated. I think the best
    option is to send a tuple with the data. Instead of sending 'item',
    send '( True, item )'. Then when the producer is finished, send
    '( False, <any> )'. The consumer will break when it encounters a
    'False' first value.
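
    A minimal sketch of that flag-and-payload idea (not Aaron's exact code), reusing
    the placeholder names process_data, read_from_disk and cleanup_consumer from the
    original post:

    def consumer(filename, queue):
        outfile = open(filename, 'w')
        while True:
            more, item = queue.get()
            if not more:                    # (False, <anything>) means "no more data"
                break
            process_data(item, outfile)
        outfile.close()
        cleanup_consumer(filename)

    # producer side:
    #     for item in read_from_disk():
    #         queue.put((True, item))
    #     queue.put((False, None)); queue.put((False, None))   # one per consumer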

    An alternative is to spawn a watchman thread in each subprocess, which
    merely blocks for a shared Event object, then sets a per-process
    variable, then adds a dummy object to the queue. The dummy is
    guaranteed to be added after the last of the data. Each process is
    guaranteed to consume no more than one dummy, so they will all wake
    up.
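
    A rough sketch of that watchman idea, under a couple of assumptions that are
    mine, not Aaron's: the shared Event is called done and is set by the producer
    after the last real item is queued, real data items are never None, and the
    dummy is simply None:

    import threading

    def consumer(queue, done):                  # done: a shared multiprocessing.Event
        finished = [False]                      # per-process flag, set by the watchman thread

        def watchman():
            done.wait()                         # block until the producer says it is finished
            finished[0] = True
            queue.put(None)                     # dummy, enqueued after the last real item

        threading.Thread(target=watchman).start()

        while True:
            item = queue.get()
            if item is None:                    # a dummy from one of the watchmen
                if finished[0]:
                    break                       # producer done and queue drained: stop
                queue.put(None)                 # our own watchman hasn't run yet; put it back
                continue
            process_data(item)                  # placeholder from the original post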

    If you don't like those, you could just use a time-out, which checks
    the contents of a shared variable, like a one-element array, then
    checks the queue to be empty. If the shared variable is True, and the
    queue is empty, there is no more data.
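
    The time-out variant might look roughly like this, assuming the shared
    one-element structure is a multiprocessing.Value('b', 0) that the producer
    sets to 1 once everything has been queued (these names are mine):

    from Queue import Empty                     # the timeout raises Queue.Empty (queue.Empty in Python 3)

    def consumer(queue, done_flag):
        while True:
            try:
                item = queue.get(timeout=1)
            except Empty:
                if done_flag.value and queue.empty():
                    break                       # producer finished and nothing left to read
                continue
            process_data(item)                  # placeholder from the original post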

    I'm curious how these work and what you decide.
     
    Aaron Brady, May 1, 2009
    #2

  3. Quoting Dennis Lee Bieber <>:

    > I'm not familiar with the multiprocessing module and its queues but,
    > presuming it behaves similar to the threading module AND that you have
    > design control over the consumers (as you did in the sample) make a
    > minor change.
    >
    > queue.put(None) ONCE in the producer
    >
    > Then, in the consumer, after it sees the None and begins shutdown
    > processing, have the consumer ALSO do
    >
    > queue.put(None)
    >


    Thank you. I went with this idea, only that instead of modifying the consumer, I
    modified the queue itself... Well, Cameron Simpson did :D. It's working nicely now.
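
    Cameron Simpson's code isn't shown in this thread, but the "modify the queue"
    approach could look roughly like the wrapper below (the class name is made up):
    its get() re-posts the sentinel it just saw, so a single None from the producer
    eventually reaches every consumer, and the original
    for data in iter(queue.get, None) loop keeps working unchanged:

    class SentinelQueue(object):                # hypothetical name, not Cameron Simpson's code
        def __init__(self, queue):
            self._queue = queue

        def get(self):
            item = self._queue.get()
            if item is None:                    # sentinel seen: pass it on for the next consumer
                self._queue.put(None)
            return item

        def put(self, item):
            self._queue.put(item)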

    --
    Luis Zarrabeitia
    Facultad de Matemática y Computación, UH
    http://profesores.matcom.uh.cu/~kyrie

     
    Luis Alberto Zarrabeitia Gomez, May 4, 2009
    #3
  4. Guest:

    > You may have to write the consumer loop by hand, rather than using
    > 'for'.  In the same-process case, you can do this.
    >
    > producer:
    > sentinel= object( )
    >
    > consumer:
    > while True:
    >   item= queue.get( )
    >   if item is sentinel:
    >     break
    >   etc.
    >
    > Then, each consumer is guaranteed to consume no more than one
    > sentinel, and thus producing one sentinel per consumer will halt them
    > all.
    >
    > However, with multiple processes, the comparison to 'sentinel' will
    > fail, since each subprocess gets a copy, not the original, of the
    > sentinel.


    Rather than use object() you can create a type whose instances are
    equal.

    class Stopper(object):
        def __eq__(self, other):
            return type(other) == type(self)

    producer's stop():
        queue.put(Stopper())

    consumer's main loop:
        for item in iter(queue.get, Stopper()):
            ...
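
    Tying this back to the original script, a minimal end-to-end sketch might look
    as follows (process_data and read_from_disk are the placeholders from the first
    post, with process_data simplified to take a single argument):

    from multiprocessing import Process, Queue

    class Stopper(object):
        def __eq__(self, other):
            return type(other) == type(self)

    def consumer(queue):
        for item in iter(queue.get, Stopper()):
            process_data(item)

    if __name__ == "__main__":
        queue = Queue(100)
        workers = [Process(target=consumer, args=(queue,)) for _ in range(2)]
        for w in workers:
            w.start()
        for item in read_from_disk():
            queue.put(item)
        for _ in workers:
            queue.put(Stopper())                # one stopper per consumer
        for w in workers:
            w.join()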
     
    Guest, May 4, 2009
    #4
