TaskQueue

R

Raymond Hettinger

I would like to get feedback on an idea I had for simplifying the use
of queues with daemon consumer threads

Sometimes, I launch one or more consumer threads that wait for a task
to enter a queue and then work on the task. A recurring problem is that
I sometimes need to know if all of the tasks have been completed so I
can exit or do something with the result.

If each thread only does a single task, I can use t.join() to wait
until the task is done. However, if the thread stays alive and waits
for more Queue entries, then there doesn't seem to be a good way to
tell when all the processing is done.

So, the idea is to create a subclass of Queue that increments a counter
when objects are enqueued, that provides a method for worker threads to
decrement the counter when the work is done, and that offers a blocking
join() method that waits until the counter is zero.

There's are working implementation at:

http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/475160

Here is an example:

def worker():
while 1:
task = q.get()
do_work(task)
q.task_done()

# setup queue and launch worker threads
q = Queue()
for i in range(num_worker_threads):
Thread(target=worker).start()

# load the tasks
for elem in tasklist:
q.put(elem)

# block until they are done
q.join()

# process with other stuff
. . .


There are some competing approaches. One is to attach sentinel objects
to the end of the line as a way for consumer threads to know that they
should shut down. Then a regular t.join() can be used to block until
the consumers threads have shut-down. This approach is
straight-forward, but it depends on 1) complicating the consumer logic
to include sentinel detection and thread shut-down, 2) complicating the
producer logic to append one sentinel for each consumer when the data
stream is done, 3) actually knowing when the data stream is done. The
latter is a problem in one of my programs because the consumer
sometimes uses a divide-and-conquer approach, resulting in two new
subtasks being loaded to the queue. IOW, the only way to know when all
the inputs have been handled is to have the queue empty and all
consumers inactive (i.e. processing complete).

def worker():
while 1:
task = q.get()
if task is None: # check for sentinel
return
do_work(task)

# setup queue and launch worker threads
q = Queue()
threads = []
for i in range(num_worker_threads):
t = Thread(target=worker)
threads.append(t)
t.start()

# load the tasks
for elem in tasklist:
q.put(elem)
for i in range(num_worker_threads):
q.put(None) # load sentinels

# block until they are done
for t in threads:
t.join()

# process with other stuff
. . .

Another approach is to set-up a second queue for results. Each
consumer loads a result when it is done with processing an input. The
producer or main thread then becomes responsible for matching all
inputs to the corresponding results. This isn't complicated
in-practice but it isn't pretty either:

def worker():
while 1:
task = tasks_in.get()
do_work(task)
tasks_out.put(None) # enqueue a None result
when done with task

# setup queues and launch worker threads
tasks_in = Queue()
tasks_out = Queue()
for i in range(num_worker_threads):
Thread(target=worker).start()

# load the tasks
n = len(tasklist)
for elem in tasklist:
tasks_in.put(elem)

# block until they are done
for i in range(n):
tasks_out.get()

# process with other stuff
. . .

This approach becomes messier if the task loading occurs at multiple
points inside a more complex producer function. Also, it doesn't work
well with the worker thread divide-and-conquer situation discussed
above.
 
R

Rene Pijlman

Raymond Hettinger:
There are some competing approaches. One is to attach sentinel objects
to the end of the line as a way for consumer threads to know that they
should shut down. Then a regular t.join() can be used to block until
the consumers threads have shut-down. This approach is
straight-forward, but it depends on 1) complicating the consumer logic
to include sentinel detection and thread shut-down,

Writing this observation was way more complicated than writing the code
that's required to implement it :)

if task is None:
break

I use 'None' as the sentinel. This is a larger snippet from my own
threadpool with the context of this code:

def run(self):
while True:
task = self.workQueue.get()
if task is None:
break
try:
task.do()
except:
task.unhandledException()
self.resultQueue.put(task)
2) complicating the producer logic to append one sentinel for each consumer
when the data stream is done

for i in range(self.numberOfThreads):
self.workQueue.put(None)

Again, more characters in your observation than in the code.
3) actually knowing when the data stream is done.

def doingWork(self):
return self.numberOfTasks > 0

Which is possible because of:

def putTask(self,task):
self.workQueue.put(task)
self.numberOfTasks += 1

def getTask(self):
task = self.resultQueue.get()
self.numberOfTasks -= 1
return task
 
C

Carl Banks

Raymond said:
I would like to get feedback on an idea I had for simplifying the use
of queues with daemon consumer threads

Sometimes, I launch one or more consumer threads that wait for a task
to enter a queue and then work on the task. A recurring problem is that
I sometimes need to know if all of the tasks have been completed so I
can exit or do something with the result.

If each thread only does a single task, I can use t.join() to wait
until the task is done. However, if the thread stays alive and waits
for more Queue entries, then there doesn't seem to be a good way to
tell when all the processing is done.

Hmm. How about this: the Producer can cause an exception to be raised
in any Consumers waiting on the Queue, or vice-versa.

I remember encountering a similar problem when writing a slide viewer
that prefetched and slides. I wanted to have the ability to terminate
the slide show early. But what you have is five slides sitting in the
Queue when the user quits (the UI was in the consumer thread, of
course). Now how do you signal the Producer thread to exit? It seemed
to me that allowing one thread to raise an exception in another was the
most straightforward way to do this. (In my case is was the Consumer
raising it in the Producer.)

I briefly considered a subclass of Queue to implement it, but it was
too difficult a thing that I would have wanted to work on at that point
in time. I implemented some workaround that I don't remember, and
forgot about the issue until your post.

But yeah, something like an InterruptableQueue might be a nice thing to
have.


Carl Banks
 
C

Carl Banks

Carl said:
But yeah, something like an InterruptableQueue might be a nice thing to
have.

Ok, I see now that InterruptableQueue wouldn't help the OP, though it
would have helped me in my situation, so it'd still be a good idea.

Carl Banks
 
C

Carl Banks

Rene said:
for i in range(self.numberOfThreads):
self.workQueue.put(None)

Or, you could just put one sentinel in the Queue, and subclass the
Queue's _get method not to take the sentinel out. It might help keep
bookkeeping down (and it seems Raymond was in a situation where keeping
track of threads wasn't so easy).

BTW, for sentinels, I recommend creating one using:

sentinel = object()

Because, although it's not applicable to your example, sometimes None
is an object you want to pass through. (I think Alex Martelli came up
with that one.)

Carl Banks
 
R

Rene Pijlman

Carl Banks:
Rene Pijlman:

Or, you could just put one sentinel in the Queue, and subclass the
Queue's _get method not to take the sentinel out.

Ah yes, clever trick. But you'd have to worry about thread-safety of your
subclass though.
BTW, for sentinels, I recommend creating one using:

sentinel = object()

Because, although it's not applicable to your example, sometimes None
is an object you want to pass through. (I think Alex Martelli came up
with that one.)

You mean this post, I guess:
http://mail.python.org/pipermail/python-list/2004-October/244750.html

I dunno, but I cannot think of a better way to say "this has no possible
use" than to pass None. Place yourself in the position of someone who
doesn't know exactly what object() is and who encounters it in the code.
I've just spent 5 minutes trying to find some reference information and I
gave up ("object" is a lousy search word).
 
R

Ron Adam

Raymond said:
I would like to get feedback on an idea I had for simplifying the use
of queues with daemon consumer threads

Sometimes, I launch one or more consumer threads that wait for a task
to enter a queue and then work on the task. A recurring problem is that
I sometimes need to know if all of the tasks have been completed so I
can exit or do something with the result.

If each thread only does a single task, I can use t.join() to wait
until the task is done. However, if the thread stays alive and waits
for more Queue entries, then there doesn't seem to be a good way to
tell when all the processing is done.

So, the idea is to create a subclass of Queue that increments a counter
when objects are enqueued, that provides a method for worker threads to
decrement the counter when the work is done, and that offers a blocking
join() method that waits until the counter is zero.


Hi Raymond, your approach seems like it would be something I would use.

I'm wonder if threads could be implemented as a type of generator object
where it runs, and the yield waits for next() to be called, instead of
next() waiting for yield. Then you can do...


thread Worker(args):
while 1:
# do stuff
...
yield result # wait for next() to be called.
if done: break

my_thread = Worker(args)
results = list(my_thread) # get all values as they are produced


It would be easy to put these in a list and iterate it.


thread Worker(args):
# Do something with args
...
yield None

# Start threads
active_threads = [Worker(args1), Worker(args2), Worker(args3)]

for T in active_threads:
for _ in T: pass # Make sure they are finished.


If this can be done, then all the messy parts possibly could be handled
within python C code, and not by python source code. If you know how to
make a generator, then you would know how to do "simple" threads. And
maybe anything needing more than this would be better off being an
external task anyway?

I'm sure there are lots of um... issues. ;-)

Cheers,
Ron
 
C

Carl Banks

Rene said:
Carl Banks:

Ah yes, clever trick. But you'd have to worry about thread-safety of your
subclass though.

Queue worries about this for you. The Queue class only calls _get when
it has the queue's mutex, so you can assume thread safety when
subclassing it.

You mean this post, I guess:
http://mail.python.org/pipermail/python-list/2004-October/244750.html

I dunno, but I cannot think of a better way to say "this has no possible
use" than to pass None.

The problem is, sometimes None has a possible use. In your example I
don't think it's a big deal; you're using None more like a "null
pointer" than a "sentinel" anyways. But it's not a good idea for a
generic implementation.

Say you were writing a FiniteQueue class that implemented the permament
sentinel trick to indicate to all consumer threads that the queue is
done. For your own code, you may know that you'll never need to pass
None through the queue, but you can't assume that for all code.

Place yourself in the position of someone who
doesn't know exactly what object() is and who encounters it in the code.
I've just spent 5 minutes trying to find some reference information and I
gave up ("object" is a lousy search word).

I would hope the word "sentinel" in "sentinel = object()" would give
that person some clue as to what it is. :)


Carl Banks
 
R

Rene Pijlman

Carl Banks:
Rene Pijlman:

Queue worries about this for you. The Queue class only calls _get when
it has the queue's mutex, so you can assume thread safety when
subclassing it.

Ah yes, I overlooked the underscore. But you'd have to worry about
everything get() is doing before it calls _get() though :)
Say you were writing a FiniteQueue class that implemented the permament
sentinel trick to indicate to all consumer threads that the queue is
done. For your own code, you may know that you'll never need to pass
None through the queue, but you can't assume that for all code.

Ah, this is where assert proves its value.

class FiniteQueue(object):
def putTask(self,task):
assert task is not None
self.workQueue.put(task)
self.numberOfTasks += 1
# ...
 

Ask a Question

Want to reply to this thread or ask your own question?

You'll need to choose a username for the site, which only take a couple of moments. After that, you can post your question and our members will help you out.

Ask a Question

Members online

No members online now.

Forum statistics

Threads
473,769
Messages
2,569,579
Members
45,053
Latest member
BrodieSola

Latest Threads

Top