multiprocessing: pool with blocking queue

M

masher

Hi,

I am trying to implement a multiprocessing pool that assigns tasks
from a blocking queue. My situation is a pretty classic producer/
consumer conundrum, where the producer can produce much faster than
the consumers can consume. The wrinkle in the story is that the
producer produces objects that consume large amounts of memory, so I
would like the queue to block when it reaches, say, twice the number
of tasks as there are processes in the pool, so that I don't devour my
RAM. I have been able to implement this one way, but I am somewhat
displeased with the result. Code speaks louder than words, so here is
a brief example of the technique I am using, followed by an
explanation of why I am unhappy with it:

===
#!/usr/bin/env python3

import time
import random
from multiprocessing import Pool, Queue, M

def procfunc(queue):
time.sleep(random.random() * 2)
return queue.get()*2

def printrange(n):
for i in range(n):
print("generated " + str(i))
yield i

if __name__ == "__main__":
sm = Manager()
pool = Pool()
queuelen = len(pool._pool) * 2
queue = sm.Queue(queuelen)
for i in printrange(100):
queue.put(i)
pool.apply_async(procfunc, (queue,), callback=print)
pool.close()
pool.join()
===

The reason I am unhappy with this trick is that if you examine the
source code of pool.py, you will note that the class Pool already uses
an internal queue, "_taskqueue" from which the tasks are assigned to
processes in the pool.

Particularly:
def __init__(self, processes=None, initializer=None, initargs=()):
self._setup_queues()
self._taskqueue = queue.Queue()
....snip

It seems to me that if I could only subclass and do
queuelen = len(pool._pool) * 2
self._taskqueue = queue.Queue(queuelen)

later in the constructor, once the pool length has been established, I
would have a much more elegant, transparent solution to the problem.

Unfortunately, the design of the Pool class is such that actually
implementing this solution would be very hackish and inelegant. If
only, say, _setup_queues() were called after the _taskqueue
assignment, then I could override it.

My questions, then, is: Is there a more elegant/pythonic way of doing
what I am trying to do with the current Pool class?

If the verdict is no, I'll be happy to file a bug report.
 
J

J Kenneth King

masher said:
My questions, then, is: Is there a more elegant/pythonic way of doing
what I am trying to do with the current Pool class?

Forgive me, I may not fully understand what you are trying to do here
(I've never really used multiprocessing all that much)...

But couldn't you just assign your own Queue object to the Pool instance?
 
M

masher

Forgive me, I may not fully understand what you are trying to do here
(I've never really used multiprocessing all that much)...

But couldn't you just assign your own Queue object to the Pool instance?

That's basically my question. It does not appear as though there is
any straightforward way of doing this because of the design of Pool's
__init__ method, which passes _taskqueue to several functions. Hence,
even if I were to reassign _taskqueue after __init__, that wouldn't
change anything.
 
J

J Kenneth King

masher said:
That's basically my question. It does not appear as though there is
any straightforward way of doing this because of the design of Pool's
__init__ method, which passes _taskqueue to several functions. Hence,
even if I were to reassign _taskqueue after __init__, that wouldn't
change anything.

I think I understand.

There are ways to modify the class before instantiating it, but even the
most clever or elegant solution will still smell funny. I suppose this
might be worth submitting as a feature suggestion to the multiprocessing
project.

Best of luck.
 
R

ryles

My questions, then, is: Is there a more elegant/pythonic way of doing
what I am trying to do with the current Pool class?

Another thing you might try is to subclass Pool and add an apply_async
() wrapper which would wait for _taskqueue.qsize() to reach the
desired size. You would probably do this wait in a loop with a small
sleep. This approach would avoid needing a second Queue, but you would
also add some delay to your producer due to the sleep (something
you're not currently worried about). The minimum sleep may be
something like 1 ms (it's really system dependent), but the time it
takes for a thread blocked on a mutex to wake up is often more on the
order of microseconds, which you have with your blocking queue.

I doubt this offers you much satisfaction, though.
If the verdict is no, I'll be happy to file a bug report.

Yeah, I think it's a worth a try.
 

Ask a Question

Want to reply to this thread or ask your own question?

You'll need to choose a username for the site, which only take a couple of moments. After that, you can post your question and our members will help you out.

Ask a Question

Members online

Forum statistics

Threads
473,756
Messages
2,569,540
Members
45,024
Latest member
ARDU_PROgrammER

Latest Threads

Top