multiprocessing: pool with blocking queue

Discussion in 'Python' started by masher, Jul 2, 2009.

  1. masher

    Hi,

    I am trying to implement a multiprocessing pool that assigns tasks
    from a blocking queue. My situation is a pretty classic producer/
    consumer conundrum, where the producer can produce much faster than
    the consumers can consume. The wrinkle in the story is that the
    producer produces objects that consume large amounts of memory, so I
    would like the queue to block when it reaches, say, twice the number
    of tasks as there are processes in the pool, so that I don't devour my
    RAM. I have been able to implement this one way, but I am somewhat
    displeased with the result. Code speaks louder than words, so here is
    a brief example of the technique I am using, followed by an
    explanation of why I am unhappy with it:

    ===
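    #!/usr/bin/env python3

    import time
    import random
    from multiprocessing import Pool, Manager

    def procfunc(queue):
        # Simulate slow work, then take one item from the shared queue.
        time.sleep(random.random() * 2)
        return queue.get() * 2

    def printrange(n):
        # Producer: announce each item as it is generated.
        for i in range(n):
            print("generated " + str(i))
            yield i

    if __name__ == "__main__":
        sm = Manager()
        pool = Pool()
        # Bound the queue at twice the number of worker processes
        # (_pool is private; Pool() defaults to os.cpu_count() workers).
        queuelen = len(pool._pool) * 2
        queue = sm.Queue(queuelen)
        for i in printrange(100):
            queue.put(i)  # blocks while the queue is full
            pool.apply_async(procfunc, (queue,), callback=print)
        pool.close()
        pool.join()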
    ===

    The reason I am unhappy with this trick is that if you examine the
    source code of pool.py, you will note that the class Pool already uses
    an internal queue, "_taskqueue" from which the tasks are assigned to
    processes in the pool.

    Particularly:
    def __init__(self, processes=None, initializer=None, initargs=()):
        self._setup_queues()
        self._taskqueue = queue.Queue()
        ....snip

    It seems to me that if I could only subclass and do

        queuelen = len(self._pool) * 2
        self._taskqueue = queue.Queue(queuelen)

    later in the constructor, once the pool length has been established, I
    would have a much more elegant, transparent solution to the problem.
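    For readers unfamiliar with the behaviour masher is after: a queue.Queue
    created with a maxsize makes put() wait while the queue is full. The
    sketch below demonstrates this on a plain queue.Queue, not on Pool's
    real _taskqueue; the helper name fill() and the pool size of 4 are
    illustrative assumptions.

```python
import queue


def fill(q, items):
    """Put items into q until it refuses one; return how many were accepted."""
    accepted = 0
    for item in items:
        try:
            # On a bounded Queue, put() blocks while the queue is full;
            # the short timeout turns that wait into a queue.Full exception.
            q.put(item, timeout=0.01)
        except queue.Full:
            break
        accepted += 1
    return accepted


nprocs = 4  # hypothetical pool size
q = queue.Queue(maxsize=nprocs * 2)  # what a bounded _taskqueue would look like
print(fill(q, range(100)))  # 8: the ninth put() times out
q.get()                     # a consumer takes one item...
print(fill(q, range(100)))  # 1: ...which frees exactly one slot
```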

    Unfortunately, the design of the Pool class is such that actually
    implementing this solution would be very hackish and inelegant. If
    only, say, _setup_queues() were called after the _taskqueue
    assignment, then I could override it.

    My question, then, is: Is there a more elegant/pythonic way of doing
    what I am trying to do with the current Pool class?

    If the verdict is no, I'll be happy to file a bug report.
    masher, Jul 2, 2009

  2. masher <> writes:

    > My question, then, is: Is there a more elegant/pythonic way of doing
    > what I am trying to do with the current Pool class?


    Forgive me, I may not fully understand what you are trying to do here
    (I've never really used multiprocessing all that much)...

    But couldn't you just assign your own Queue object to the Pool instance?
    J Kenneth King, Jul 2, 2009

  3. masher

    On Jul 2, 12:06 pm, J Kenneth King <> wrote:
    > masher <> writes:
    > > My question, then, is: Is there a more elegant/pythonic way of doing
    > > what I am trying to do with the current Pool class?

    >
    > Forgive me, I may not fully understand what you are trying to do here
    > (I've never really used multiprocessing all that much)...
    >
    > But couldn't you just assign your own Queue object to the Pool instance?


    That's basically my question. It does not appear as though there is
    any straightforward way of doing this because of the design of Pool's
    __init__ method, which passes _taskqueue to several functions. Hence,
    even if I were to reassign _taskqueue after __init__, that wouldn't
    change anything.
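    Why reassigning the attribute after __init__ has no effect can be shown
    in a few lines. Owner is a hypothetical stand-in, not the real Pool:
    like Pool.__init__ (which, as described above, passes _taskqueue to
    several functions), it hands its queue object to a helper, and rebinding
    self._taskqueue later does not change the object the helper holds.

```python
import queue


class Owner:
    """Hypothetical stand-in for Pool (not the real class)."""

    def __init__(self):
        self._taskqueue = queue.Queue()
        # Like Pool.__init__, hand the queue *object* to a helper; the helper
        # now holds its own reference, independent of the attribute.
        self.helper_args = (self._taskqueue,)


owner = Owner()
owner._taskqueue = queue.Queue(maxsize=8)        # reassign after __init__
print(owner.helper_args[0] is owner._taskqueue)  # False
print(owner.helper_args[0].maxsize)              # 0: the helper still has the unbounded queue
```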
    masher, Jul 2, 2009
  4. masher <> writes:

    > On Jul 2, 12:06 pm, J Kenneth King <> wrote:
    >> masher <> writes:
    >> > My question, then, is: Is there a more elegant/pythonic way of doing
    >> > what I am trying to do with the current Pool class?

    >>
    >> Forgive me, I may not fully understand what you are trying to do here
    >> (I've never really used multiprocessing all that much)...
    >>
    >> But couldn't you just assign your own Queue object to the Pool instance?

    >
    > That's basically my question. It does not appear as though there is
    > any straightforward way of doing this because of the design of Pool's
    > __init__ method, which passes _taskqueue to several functions. Hence,
    > even if I were to reassign _taskqueue after __init__, that wouldn't
    > change anything.


    I think I understand.

    There are ways to modify the class before instantiating it, but even the
    most clever or elegant solution will still smell funny. I suppose this
    might be worth submitting as a feature suggestion to the multiprocessing
    project.

    Best of luck.
    J Kenneth King, Jul 2, 2009
  5. ryles

    On Jul 2, 11:09 am, masher <> wrote:
    > My question, then, is: Is there a more elegant/pythonic way of doing
    > what I am trying to do with the current Pool class?


    Another thing you might try is to subclass Pool and add an apply_async()
    wrapper that waits for _taskqueue.qsize() to drop below the desired
    size. You would probably do this wait in a loop with a small sleep.
    This approach would avoid needing a second Queue, but it would also add
    some delay to your producer because of the sleep (something you're not
    currently worried about). The minimum sleep may be something like 1 ms
    (it's really system dependent), whereas a thread blocked on a mutex, as
    with your blocking queue, typically wakes up within microseconds.

    I doubt this offers you much satisfaction, though.
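    A sketch of ryles's polling idea follows, with one deliberate swap:
    instead of reading the private _taskqueue.qsize() (which a handler
    thread keeps draining, so it may not reflect how much work is really
    outstanding), the hypothetical PollingPool subclass counts the
    AsyncResult objects it has handed out that are not yet ready(). The
    max_pending and poll_interval parameters are illustrative assumptions;
    max_pending must be at least 1, or apply_async() would loop forever.

```python
import time
from multiprocessing.pool import Pool


def square(x):
    # Stand-in for a real task; must be a top-level function so it pickles.
    return x * x


class PollingPool(Pool):
    """Hypothetical Pool subclass: apply_async() sleeps in a short loop
    while max_pending submitted tasks are still unfinished."""

    def __init__(self, *args, max_pending=8, poll_interval=0.001, **kwargs):
        self._bounded_max = max_pending
        self._bounded_poll = poll_interval
        self._bounded_pending = []  # AsyncResults not yet known to be done
        super().__init__(*args, **kwargs)

    def apply_async(self, func, args=(), kwds={}, callback=None,
                    error_callback=None):
        while True:
            # Drop results that have finished, then check for a free slot.
            self._bounded_pending = [
                r for r in self._bounded_pending if not r.ready()
            ]
            if len(self._bounded_pending) < self._bounded_max:
                break
            # This sleep is where the extra producer latency comes from.
            time.sleep(self._bounded_poll)
        result = super().apply_async(func, args, kwds, callback,
                                     error_callback)
        self._bounded_pending.append(result)
        return result


if __name__ == "__main__":
    with PollingPool(processes=2, max_pending=4) as pool:
        results = [pool.apply_async(square, (i,)) for i in range(20)]
        print([r.get() for r in results])
```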

    > If the verdict is no, I'll be happy to file a bug report.


    Yeah, I think it's worth a try.
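    One alternative that does not appear in the thread avoids both the
    second queue and the polling sleep: take a slot from a
    threading.BoundedSemaphore before each apply_async() and give it back
    from the result callbacks, which Pool runs in a result-handling thread
    of the parent process. The BoundedSubmitter wrapper below is a
    hypothetical sketch of that pattern, not a Pool API; it relies on
    error_callback, which needs Python 3.2 or later.

```python
import multiprocessing
import os
import threading


def double(x):
    # Stand-in for real work; must be a top-level function so it pickles.
    return x * 2


class BoundedSubmitter:
    """Hypothetical helper: at most max_pending tasks may be submitted to
    the pool without their results having come back."""

    def __init__(self, pool, max_pending):
        self._pool = pool
        self._slots = threading.BoundedSemaphore(max_pending)

    def apply_async(self, func, args=(), callback=None):
        self._slots.acquire()  # blocks the producer while all slots are taken

        def on_done(result):
            # Runs in the pool's result-handling thread in the parent process.
            self._slots.release()
            if callback is not None:
                callback(result)

        def on_error(exc):
            # Free the slot on failure too, or the producer could hang.
            self._slots.release()

        try:
            return self._pool.apply_async(func, args, callback=on_done,
                                          error_callback=on_error)
        except BaseException:
            self._slots.release()  # submission itself failed; return the slot
            raise


if __name__ == "__main__":
    nprocs = os.cpu_count() or 1
    with multiprocessing.Pool(nprocs) as pool:
        submitter = BoundedSubmitter(pool, max_pending=2 * nprocs)
        for i in range(20):
            print("generated " + str(i))
            submitter.apply_async(double, (i,), callback=print)
        pool.close()
        pool.join()
```

    Because the callbacks run on the pool's result-handling thread, they
    should return quickly, and a callback must never call apply_async() on
    the same submitter: with every slot taken, it would block the very
    thread that frees slots.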
    ryles, Jul 3, 2009
