multiprocessing.Pool, its queue, and pre-emption

Discussion in 'Python' started by John Ladasky, Sep 15, 2011.

  1. John Ladasky


    Suppose that I have a multi-core computer with N CPUs, and I create a
    multiprocessing.Pool in Python 2.6 with N-1 Processes.

    (My rationale for choosing N-1 Processes was discussed here:
    http://groups.google.com/group/comp.lang.python/browse_frm/thread/65ba3ccd4be8228c)

    Then I use Pool.map_async() to break up a long-running task T1 into M
    chunks, where M > 2N.

    Suppose that I have a second, parallelizable, long-running task T2
    that I want to address in REAL TIME when the need arises. Using Pool,
    is there a way for me to insert the chunks of T2 at the HEAD of the
    task queue, instead of at its TAIL?

    I've been snooping around inside Pool, and I would guess that what I
    want to do is to manipulate Pool._inqueue, which is a
    multiprocessing.queues.SimpleQueue object. I haven't found any
    documentation for SimpleQueue. It appears to have only the most
    rudimentary of public methods -- put, get, and empty.

    Reading further, I can see that multiprocessing.Queue (not the same as
    SimpleQueue) has put_nowait(), which looks like what I need. If I had
    access to put_nowait(), I would then need an option in map_async()
    that invokes put_nowait() rather than just plain put().

    Does anyone know of an efficient way to accomplish this? I'm thinking
    that a subclass of Pool which replaces the SimpleQueue with a full-
    fledged Queue, and which overrides map_async() allowing for a no-wait
    task, would be very useful... but maybe I'm re-inventing the wheel.
    Thanks for your advice!
     
    John Ladasky, Sep 15, 2011
    #1

  2. On Fri, Sep 16, 2011 at 6:52 AM, John Ladasky <> wrote:
    > Suppose that I have a second, parallelizable, long-running task T2
    > that I want to address in REAL TIME when the need arises.  Using Pool,
    > is there a way for me to insert the chunks of T2 at the HEAD of the
    > task queue, instead of at its TAIL?
    >


    That's a self-contradiction there. Even if you insert a task into the
    head of the queue, it won't be addressed in real time; the only way to
    do that would be to have a dedicated process, always ready to handle
    the real-time events, or else some kind of interrupt system.

    But if you just want them handled when there's a worker available,
    what you're after is a priority queue system. I don't know if one
    exists in Python already or not, but if not, it'd be a worthwhile
    addition.
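
    For the record, the standard library does ship a thread-safe priority
    queue -- Queue.PriorityQueue in 2.6, spelled queue.PriorityQueue in
    Python 3. A minimal sketch of the ordering idea (this is just an
    illustration of priority-queue behaviour, not the Pool task queue
    itself; the chunk labels are made up):

```python
# Sketch of priority-queue ordering with the standard-library
# queue.PriorityQueue (Queue.PriorityQueue under Python 2.6).
# Entries are (priority, payload) tuples; get() always returns the
# entry with the LOWEST priority number first, regardless of
# insertion order.
import queue

pq = queue.PriorityQueue()
pq.put((1, "T1 chunk A"))   # normal priority
pq.put((1, "T1 chunk B"))   # normal priority
pq.put((0, "T2 chunk"))     # urgent task, added last

print(pq.get()[1])   # -> T2 chunk  (urgent entry jumps the queue)
print(pq.get()[1])   # -> T1 chunk A
```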

    ChrisA
     
    Chris Angelico, Sep 15, 2011
    #2

  3. John Ladasky


    On Sep 15, 3:14 pm, Chris Angelico <> wrote:
    > On Fri, Sep 16, 2011 at 6:52 AM, John Ladasky <> wrote:
    > > Suppose that I have a second, parallelizable, long-running task T2
    > > that I want to address in REAL TIME when the need arises.  Using Pool,
    > > is there a way for me to insert the chunks of T2 at the HEAD of the
    > > task queue, instead of at its TAIL?

    >
    > That's a self-contradiction there. Even if you insert a task into the
    > head of the queue, it won't be addressed in real time; the only way to
    > do that would be to have a dedicated process, always ready to handle
    > the real-time events, or else some kind of interrupt system.
    >
    > But if you just want them handled when there's a worker available,
    > what you're after is a priority queue system. I don't know if one
    > exists in Python already or not, but if not, it'd be a worthwhile
    > addition.
    >
    > ChrisA


    Hi, Chris,

    Sorry if I'm not quite familiar with the proper terminology for
    queues. Let me try to define what I need as precisely as I can.

    1) I aim to keep all five child Processes as busy as possible at all
    times, crunching numbers.

    2) I break my data for the low-priority task, T1, into large enough
    chunks to benefit from multiprocessing, but small enough so that any
    given Process should become available fairly frequently -- say, every
    50 milliseconds. Starting 50 milliseconds late would be close enough
    to "real time" for my purposes.

    3) But let's say that T1 still has twenty chunks in the Process queue
    when T2 comes along. I could be waiting 300 milliseconds for the
    queue to empty. If I could just ensure that T2 gets the next
    available Processes, I would be happy. Since T2 is also
    parallelizable and I am using five child Processes, I would divide T2
    into exactly five chunks. These chunks should get inserted at
    positions 0-4 in the queue rather than at 20-24. T2 would briefly
    commandeer all five Processes and then clear out.

    If that's not putting T2 at the head of the queue, I guess I don't
    know a better way to describe it.
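
    Maybe a picture helps. Ignoring the Pool machinery entirely, the
    insertion I have in mind looks like this (a plain collections.deque
    standing in for the task queue, Python 3 spelling, chunk names
    invented for the illustration):

```python
# Illustration only: twenty T1 chunks are already queued; T2's five
# chunks get pushed to the HEAD of the queue (positions 0-4) rather
# than appended at the tail (positions 20-24).
from collections import deque

task_queue = deque("T1-chunk-%d" % i for i in range(20))

# appendleft() in reverse order keeps T2's chunks in their own order:
for chunk in reversed(["T2-chunk-%d" % i for i in range(5)]):
    task_queue.appendleft(chunk)

print(task_queue[0])   # -> T2-chunk-0  (next to be dispatched)
print(task_queue[5])   # -> T1-chunk-0  (T1 resumes after T2 clears out)
```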
     
    John Ladasky, Sep 15, 2011
    #3
  4. John Ladasky


    Ah. Now, see? Having the right vocabulary helps. Searching for
    "priority queue" brings up this discussion from back in January:

    http://groups.google.com/group/comp.lang.python/browse_frm/thread/b69aeced28634898

    Now, this discussion refers to a PriorityPool class which doesn't
    appear to be a standard part of Python (2.6, anyway). But I'm not the
    first one to ask the question, and there's code out there.
     
    John Ladasky, Sep 15, 2011
    #4
  5. On Fri, Sep 16, 2011 at 8:25 AM, John Ladasky <> wrote:
    > Starting 50 milliseconds late would be close enough
    > to "real time" for my purposes.
    > ...
    >
    > If that's not putting T2 at the head of the queue, I guess I don't
    > know a better way to describe it.


    Yep, your terms are correct, with that caveat ("immediate" or "urgent"
    without being "real-time"). So you're definitely looking for a
    priority queue.

    It may be possible to patch a different queue object straight into
    Pool, but I've never done anything like that. It's probably best to
    dig into the code and add an option somewhere to pass in a queue
    object -- that'd be a very useful feature, IMHO.

    ChrisA
     
    Chris Angelico, Sep 15, 2011
    #5
  6. On Fri, Sep 16, 2011 at 8:32 AM, John Ladasky <> wrote:
    > Ah. Now, see?  Having the right vocabulary helps.  Searching for
    > "priority queue" brings up this discussion from back in January:
    >
    > http://groups.google.com/group/comp.lang.python/browse_frm/thread/b69aeced28634898
    >
    > Now, this discussion refers to a PriorityPool class which doesn't
    > appear to be a standard part of Python (2.6, anyway).  But I'm not the
    > first one to ask the question, and there's code out there.


    Awesome! I wasn't aware of that (although considering the volume of
    traffic on this list, I doubt there's any human who knows everything
    that's been discussed). Previously-solved problems save everyone a lot
    of trouble.

    ChrisA
     
    Chris Angelico, Sep 15, 2011
    #6
  7. John Ladasky


    On Sep 15, 1:52 pm, John Ladasky <> wrote:
    > I've been snooping around inside Pool, and I would guess that what I
    > want to do is to manipulate Pool._inqueue, which is a
    > multiprocessing.queues.SimpleQueue object.  I haven't found any
    > documentation for SimpleQueue.  It appears to have only the most
    > rudimentary of public methods -- put, get, and empty.


    Reading more deeply, I think that my first guess was incorrect.
    There's also Pool._taskqueue, and it's a full-fledged Queue. It
    appears that map_async() puts pending jobs there, not in
    Pool._inqueue.

    If this is true, then all that should have to be done is to override a
    few methods. I'm going to try it.
     
    John Ladasky, Sep 16, 2011
    #7
  8. John Ladasky


    Hey, this pretty easy hack appears to work!

    Code:

    from multiprocessing.pool import Pool, RUN, MapResult, mapstar

    class PriorityPool(Pool):

        def map_async_nowait(self, func, iterable, chunksize=None,
                             callback=None):
            """
            Same as map_async(), except uses put_nowait() and
            thus posts tasks to the head of the task queue
            rather than its tail.
            """
            assert self._state == RUN
            if not hasattr(iterable, '__len__'):
                iterable = list(iterable)

            if chunksize is None:
                chunksize, extra = divmod(len(iterable),
                                          len(self._pool) * 4)
                if extra:
                    chunksize += 1

            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = MapResult(self._cache, chunksize, len(iterable),
                               callback)
            self._taskqueue.put_nowait(
                (((result._job, i, mapstar, (x,), {})
                  for i, x in enumerate(task_batches)), None))
            return result

        def size(self):
            """
            This is not an essential function, but I use it in the
            demo to ensure that I initially create enough tasks to
            occupy every Process.
            """
            return len(self._pool)

    ##================================================================##

    if __name__ == "__main__":

        from time import sleep

        def demo_task(args):
            num, time = args
            sleep(time)
            print num, time

        pool = PriorityPool()
        size = pool.size()
        print "\nConstructed a pool which contains", size, "Processes."
        print "Queueing", 2*size, "normal-priority tasks."
        normal = enumerate([3.0 + t for t in range(2*size)])
        pool.map_async(demo_task, normal, chunksize=1)
        print "Queueing", size, "high-priority tasks."
        high = [(2*size + t, 0.2 + 0.1*t) for t in range(size)]
        pool.map_async_nowait(demo_task, high, chunksize=1)
        sleep(30)    # Give all tasks on the queue time to complete.
        print "Complete."

    
    Below is a typical output from my six-core CPU system. The output
    differs slightly from run to run -- that's multiprocessing for you;
    it's asynchronous.

    The tasks are given numbers which correspond to the order that they
    are added to the queue. The high-priority tasks are added last and
    are thus numbered 12-17 (I place asterisks next to these in the
    output, below). Each task prints its number and its time when it
    completes. I expect the normal-priority tasks 0-5 to finish before
    any high-priority tasks, and they always do. Tasks 6 and 7 are then
    interleaved among the high-priority tasks -- not quite what I expect,
    but that may have something to do with my rather arbitrary choices of
    sleep times. But tasks 8-11 always get pushed to the back, and
    complete last.

    [output]

    Constructed a pool which contains 6 Processes.
    Queueing 12 normal-priority tasks.
    Queueing 6 high-priority tasks.
    0 3.0
    1 4.0
    2 5.0
    3 6.0
    4 7.0
    5 8.0
    6 9.0
    12 0.2 *
    13 0.3 *
    14 0.4 *
    15 0.5 *
    7 10.0
    16 0.6 *
    17 0.7 *
    8 11.0
    9 12.0
    10 13.0
    11 14.0

    [/output]

    Please feel free to use this, though I would appreciate an
    acknowledgment in your code if you do. :^)
     
    John Ladasky, Sep 17, 2011
    #8
