multiprocessing.Pool, its queue, and pre-emption

John Ladasky

Suppose that I have a multi-core computer with N CPU's, and I create a
multiprocessing.Pool in Python 2.6 with N-1 Processes.

(My rationale for choosing N-1 Processes was discussed here:
http://groups.google.com/group/comp.lang.python/browse_frm/thread/65ba3ccd4be8228c)

Then I use Pool.map_async() to break up a long-running task T1 into M
chunks, where M > 2N.

Suppose that I have a second, parallelizable, long-running task T2
that I want to address in REAL TIME when the need arises. Using Pool,
is there a way for me to insert the chunks of T2 at the HEAD of the
task queue, instead of at its TAIL?

I've been snooping around inside Pool, and I would guess that what I
want to do is to manipulate Pool._inqueue, which is a
multiprocessing.queues.SimpleQueue object. I haven't found any
documentation for SimpleQueue. It appears to have only the most
rudimentary of public methods -- put, get, and empty.

Reading further, I can see that multiprocessing.Queue (not the same as
SimpleQueue) has put_nowait(), which looks like what I need. Assuming
that I had access to put_nowait(), then I would need an option in
map_async() which would invoke put_nowait() rather than just plain
put().
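One thing worth double-checking here: in the documented Queue API, put_nowait(item) is just shorthand for put(item, block=False) -- it controls whether the call blocks when the queue is full, not where the item lands. A quick sketch of the documented behaviour (using the Python 3 module name; it is Queue on 2.x):

```python
import queue

q = queue.Queue()
q.put("first")
q.put_nowait("second")  # equivalent to q.put("second", block=False)

# put_nowait() appends at the tail just like put(), so FIFO order holds:
print(q.get())  # first
```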

Does anyone know of an efficient way to accomplish this? I'm thinking
that a subclass of Pool which replaces the SimpleQueue with a full-
fledged Queue, and which overrides map_async() allowing for a no-wait
task, would be very useful... but maybe I'm re-inventing the wheel.
Thanks for your advice!
 
Chris Angelico

> Suppose that I have a second, parallelizable, long-running task T2
> that I want to address in REAL TIME when the need arises. Using Pool,
> is there a way for me to insert the chunks of T2 at the HEAD of the
> task queue, instead of at its TAIL?

That's a self-contradiction there. Even if you insert a task into the
head of the queue, it won't be addressed in real time; the only way to
do that would be to have a dedicated process, always ready to handle
the real-time events, or else some kind of interrupt system.

But if you just want them handled when there's a worker available,
what you're after is a priority queue system. I don't know if one
exists in Python already or not, but if not, it'd be a worthwhile
addition.

ChrisA
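For reference, the standard library does already ship a thread-safe priority queue -- Queue.PriorityQueue in 2.6, queue.PriorityQueue in 3.x -- though nothing wires it into Pool for you. A minimal sketch of the ordering it provides, with the "T1"/"T2" labels purely illustrative:

```python
from queue import PriorityQueue  # Queue.PriorityQueue on Python 2.x

pq = PriorityQueue()
# Entries are (priority, payload) tuples; lower numbers come out first.
pq.put((10, "T1 chunk 0"))
pq.put((10, "T1 chunk 1"))
pq.put((0, "T2 chunk 0"))   # the urgent task jumps ahead of queued T1 work

print(pq.get())  # (0, 'T2 chunk 0')
print(pq.get())  # (10, 'T1 chunk 0')
```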
 
John Ladasky

> That's a self-contradiction there. Even if you insert a task into the
> head of the queue, it won't be addressed in real time; the only way to
> do that would be to have a dedicated process, always ready to handle
> the real-time events, or else some kind of interrupt system.
>
> But if you just want them handled when there's a worker available,
> what you're after is a priority queue system. I don't know if one
> exists in Python already or not, but if not, it'd be a worthwhile
> addition.
>
> ChrisA

Hi, Chris,

Sorry if I'm not quite familiar with the proper terminology for
queues. Let me try to define what I need as precisely as I can.

1) I aim to keep all five child Processes as busy as possible at all
times, crunching numbers.

2) I break my data for the low-priority task, T1, into large enough
chunks to benefit from multiprocessing, but small enough so that any
given Process should become available fairly frequently -- say, every
50 milliseconds. Starting 50 milliseconds late would be close enough
to "real time" for my purposes.

3) But let's say that T1 still has twenty chunks in the Process queue
when T2 comes along. I could be waiting 300 milliseconds for the
queue to empty. If I could just ensure that T2 gets the next
available Processes, I would be happy. Since T2 is also
parallelizable and I am using five child Processes, I would divide T2
into exactly five chunks. These chunks should get inserted at
positions 0-4 in the queue rather than at 20-24. T2 would briefly
commandeer all five Processes and then clear out.

If that's not putting T2 at the head of the queue, I guess I don't
know a better way to describe it.
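The ordering described above -- twenty queued T1 chunks, then five T2 chunks that must drain first -- is exactly what a priority key gives you. A small sketch with heapq (the chunk labels and priority values are illustrative only); the sequence counter breaks ties so chunks of equal priority stay in FIFO order:

```python
import heapq
from itertools import count

seq = count()   # tie-breaker: preserves FIFO order within a priority level
heap = []

# Twenty low-priority T1 chunks are already queued...
for i in range(20):
    heapq.heappush(heap, (1, next(seq), ("T1", i)))

# ...then five urgent T2 chunks arrive and sort ahead of all of them.
for i in range(5):
    heapq.heappush(heap, (0, next(seq), ("T2", i)))

order = [heapq.heappop(heap)[2] for _ in range(25)]
print(order[:5])  # the five T2 chunks drain first
```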
 
Chris Angelico

> Starting 50 milliseconds late would be close enough
> to "real time" for my purposes.
> ...
>
> If that's not putting T2 at the head of the queue, I guess I don't
> know a better way to describe it.

Yep, your terms are correct, with that caveat ("immediate" or "urgent"
without being "real-time"). So you're definitely looking for a
priority queue.

It may be possible to patch a different queue object straight into
Pool, but I've never done anything like that. It's probably best to
dig in the code and add an option somewhere to pass in a queue object
- that'd be a very useful feature imho.

ChrisA
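The patching idea can at least be sketched. Pool's internal _taskqueue is an ordinary thread-side Queue.Queue, but the items map_async() puts on it contain generators, which aren't comparable, so swapping in a raw PriorityQueue would blow up on ties; wrapping each item with a (priority, sequence) key avoids that. The class name, the priority keyword, and the default priority of 10 below are my own invention, and assigning pool._taskqueue = PriorityTaskQueue() relies on undocumented internals -- treat this as a sketch, not a tested patch:

```python
import itertools
import queue  # "Queue" on Python 2.x

class PriorityTaskQueue(queue.PriorityQueue):
    """Queue.Queue work-alike that orders items by an explicit priority.

    Items are wrapped as (priority, sequence, item) so that non-comparable
    payloads (e.g. the task generators Pool enqueues) never reach the
    tuple comparison; get() unwraps and returns the bare item."""

    def __init__(self):
        queue.PriorityQueue.__init__(self)
        self._seq = itertools.count()  # FIFO tie-breaker within a priority

    def put(self, item, priority=10, **kwds):
        queue.PriorityQueue.put(self, (priority, next(self._seq), item), **kwds)

    def get(self, **kwds):
        return queue.PriorityQueue.get(self, **kwds)[-1]

q = PriorityTaskQueue()
q.put("T1 chunk")               # default (low) priority
q.put("T2 chunk", priority=0)   # urgent
print(q.get())  # T2 chunk
```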
 
Chris Angelico

> Ah. Now, see? Having the right vocabulary helps. Searching for
> "priority queue" brings up this discussion from back in January:
>
> http://groups.google.com/group/comp.lang.python/browse_frm/thread/b69aeced28634898
>
> Now, this discussion refers to a PriorityPool class which doesn't
> appear to be a standard part of Python (2.6, anyway). But I'm not the
> first one to ask the question, and there's code out there.

Awesome! I wasn't aware of that (although considering the volume of
traffic on this list, I doubt there's any human who knows everything
that's been discussed). Previously-solved problems save everyone a lot
of trouble.

ChrisA
 
John Ladasky

> I've been snooping around inside Pool, and I would guess that what I
> want to do is to manipulate Pool._inqueue, which is a
> multiprocessing.queues.SimpleQueue object. I haven't found any
> documentation for SimpleQueue. It appears to have only the most
> rudimentary of public methods -- put, get, and empty.

Reading more deeply, I think that my first guess was incorrect.
There's also Pool._taskqueue, and it's a full-fledged Queue. It
appears that map_async() puts pending jobs there, not in
Pool._inqueue.

If this is true, then all that should have to be done is to override a
few methods. I'm going to try it.
 
John Ladasky

Hey, this pretty easy hack appears to work!

Code:
from multiprocessing.pool import Pool, RUN, MapResult, mapstar

class PriorityPool(Pool):

    def map_async_nowait(self, func, iterable, chunksize=None,
                         callback=None):
        """
        Same as map_async(), except uses put_nowait() and
        thus posts tasks to the head of the task queue
        rather than its tail.
        """
        assert self._state == RUN
        if not hasattr(iterable, '__len__'):
            iterable = list(iterable)

        if chunksize is None:
            chunksize, extra = divmod(len(iterable),
                                      len(self._pool) * 4)
            if extra:
                chunksize += 1

        task_batches = Pool._get_tasks(func, iterable, chunksize)
        result = MapResult(self._cache, chunksize, len(iterable),
                           callback)
        self._taskqueue.put_nowait(
            (((result._job, i, mapstar, (x,), {})
              for i, x in enumerate(task_batches)), None))
        return result

    def size(self):
        """
        This is not an essential function, but I use it in the
        demo to ensure that I initially create enough tasks to
        occupy every Process.
        """
        return len(self._pool)

##================================================================##

if __name__ == "__main__":

    from time import sleep

    def demo_task(args):
        num, time = args
        sleep(time)
        print num, time

    pool = PriorityPool()
    size = pool.size()
    print "\nConstructed a pool which contains", size, "Processes."
    print "Queueing", 2*size, "normal-priority tasks."
    normal = enumerate([3.0 + t for t in range(2*size)])
    pool.map_async(demo_task, normal, chunksize=1)
    print "Queueing", size, "high-priority tasks."
    high = [(2*size + t, 0.2 + 0.1*t) for t in range(size)]
    pool.map_async_nowait(demo_task, high, chunksize=1)
    sleep(30)  # Give all tasks on the queue time to complete.
    print "Complete."

Below is a typical output from my six-core CPU system. The output
differs slightly from run to run -- that's multiprocessing for you,
it's asynchronous.

The tasks are given numbers which correspond to the order that they
are added to the queue. The high-priority tasks are added last and
are thus numbered 12-17 (I place asterisks next to these in the
output, below). Each task prints its number and its time when it
completes. I expect the normal-priority tasks 0-5 to finish before
any high-priority tasks, and they always do. Tasks 6 and 7 are then
interleaved among the high-priority tasks -- not quite what I expect,
but that may have something to do with my rather arbitrary choices of
sleep times. But tasks 8-11 always get pushed to the back, and
complete last.

[output]

Constructed a pool which contains 6 Processes.
Queueing 12 normal-priority tasks.
Queueing 6 high-priority tasks.
0 3.0
1 4.0
2 5.0
3 6.0
4 7.0
5 8.0
6 9.0
12 0.2 *
13 0.3 *
14 0.4 *
15 0.5 *
7 10.0
16 0.6 *
17 0.7 *
8 11.0
9 12.0
10 13.0
11 14.0

[/output]

Please feel free to use this, though I would appreciate an
acknowledgment in your code if you do. :^)
 
