Multi-threading: wait for tasks to complete

M

markspace

Hi all.

I was toying around with some multithreading code today. I ran into a
stick problem: how to wait for an unknown number of tasks to complete.

There seem to be a lot of Java classes that wait for a specific number
of threads or tasks: Semaphore and CountDownLatch, for example. But
there don't seem to be any that allow their value to be changed on the
fly to account for new tasks being created.

Maybe I missed an existing class?

Anyway, the solution I came up with was to roll my own latch, the
UpDownLatch. So named because it can count both up (for new tasks being
spawned) and down (for when the task completes).

Here's the code. Comments welcome. Obviously, it's currently a nested
class; that should be changed for general use.


private static class UpDownLatch
{
private int count;

public synchronized void countUp() {
count++;
}
public synchronized void countDown() {
count--;
if( count == 0 ) {
notifyAll();
}
}
public synchronized void await() throws InterruptedException {
while( count != 0 ) {
wait();
}
}
}
 
L

Lew

Peter said:
public void countDown()
{
if (count.decrementAndGet() > 0)
{
return;
}

// Might want to save the above return value
// and throw an exception if "count" is less
// than 0.

synchronized(objLock)
{
objLock.notify();
}
}

You have a race condition here. Another thread could 'countUp()' or down
between the decrement-check and the synchronized notify. As Brian Goetz
explains in /Java Concurrency in Practice/, when you have state that depends
on more than one variable, the involved variables have to be synched under the
same lock.
 
M

markspace

Peter said:
While performance might not be an issue in all cases, I still would
probably have implemented your class with AtomicInteger, instead of
synchronized methods for countUp() and countDown(). Then you need only
synchronize when you actually need to notify. (The compare-and-set the
atomic classes implement aren't free of performance costs either, but
should generally perform better than a full lock).


Thanks for the feedback. I was hoping that synchronized methods in
UpDownLatch could be optimized to spin-locks at runtime, thus saving the
overhead of either a full lock or using AtomicInteger. However, I
haven't profiled it yet so I'm not sure that the synchronized methods
are better. It's certainly worthy of investigation.

I'd probably modify your idea as follows to avoid the explicit lock
object; the AtomicInteger can serve the same purpose (code is untested):

private static class UpDownLatch2
{
private AtomicInteger count = new AtomicInteger();
public void countUp() {
count.incrementAndGet();
}
public void countDown() {
synchronized( count ) {
if( count.decrementAndGet() == 0 ) {
count.notifyAll();
}
}
}
public void await() throws InterruptedException {
synchronized( count ) {
while( count.get() != 0 ) {
count.wait();
}
}
}
}
 
K

Kevin McMurtrie

markspace said:
Hi all.

I was toying around with some multithreading code today. I ran into a
stick problem: how to wait for an unknown number of tasks to complete.

There seem to be a lot of Java classes that wait for a specific number
of threads or tasks: Semaphore and CountDownLatch, for example. But
there don't seem to be any that allow their value to be changed on the
fly to account for new tasks being created.

Maybe I missed an existing class?

Anyway, the solution I came up with was to roll my own latch, the
UpDownLatch. So named because it can count both up (for new tasks being
spawned) and down (for when the task completes).

Here's the code. Comments welcome. Obviously, it's currently a nested
class; that should be changed for general use.


private static class UpDownLatch
{
private int count;

public synchronized void countUp() {
count++;
}
public synchronized void countDown() {
count--;
if( count == 0 ) {
notifyAll();
}
}
public synchronized void await() throws InterruptedException {
while( count != 0 ) {
wait();
}
}
}

That code is fine. The use can be tricky.

Any thread starting another thread must issue countUp() for that new
thread immediately, or at least before calling countDown() for itself.
Thread.start() can cause run() to execute before start() completes, or
maybe run() happens later. It varies by OS and JVM version.

When looking at the implementation, it appears that it might be best to
move the synchronization. You want to make sure that the threads and
counter always match, even if there are code mistakes. If you're 100%
sure that Thread.start() will never throw, it's safe to take it out of
the synchronized block.


public void start ()
{
synchronized(latch)//Block zero test check until after countUp()
{
super.start(); //May throw IllegalThreadStateException
latch.countUp(); //Only here if it worked
}
}
public void run ()
{
try
{
...code or super.run()
}
finally
{
synchronized(latch)
{
latch.countDown(); //Do this before thread exits.
}
}
}
 
M

markspace

Kevin said:
Thread.start() can cause run() to execute before start() completes, or
maybe run() happens later. It varies by OS and JVM version.

This is a good point. I had already accounted for that. The parent
thread calls countUp() before dispatching a child thread. The child
thread calls countDown() when it terminates.

Here's part of the algorithm. It's a multi-threaded quick sort, inspire
by the previous conversation about multi-threading sorts.


public class NonRecursiveMultiThreadedQuickSort
extends Sort
{

....
final ExecutorService executor = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors() );
....

private <T extends Comparable<? super T>> void quicksort( T[] a,
int l,
int r,
UpDownLatch counter )
{

while( l < r ) {
if( r - l <= 8 ) {
insertionSort( a, l, r );
break;
}
int i = partition( a, l, r );
if( i - l > r - i ) {
if( l < i - 1 ) {
counter.countUp();
SortTask task = new SortTask<T>( a, l, i - 1, counter );
Callable<?> call = (Callable<?>) task;
executor.submit( call );
}
l = i + 1;
} else {
if( i + 1 < r ) {
counter.countUp();
SortTask task = new SortTask<T>( a, i + 1, r, counter );
Callable<?> call = (Callable<?>) task;
executor.submit( call );
}
r = i - 1;
}
}
}
....
}

It still needs a bit of work, but that's the basic algorithm right now.
You can see that counter.countUp() is called before I dispatch the
sub-task to the executor service. There's no chance of a synchronization
problem.

public void run ()
{
try
{
...code or super.run()
}
finally
{
synchronized(latch)
{
latch.countDown(); //Do this before thread exits.
}
}
}

This bit here is a better point. I've tested the heck out of my code,
but in the field someone might pass it an array with null references or
a bad compareTo() method. So the finally statement is a good idea,
perhaps a necessary one. Thanks for pointing that out.
 

Ask a Question

Want to reply to this thread or ask your own question?

You'll need to choose a username for the site, which only take a couple of moments. After that, you can post your question and our members will help you out.

Ask a Question

Members online

No members online now.

Forum statistics

Threads
473,768
Messages
2,569,574
Members
45,051
Latest member
CarleyMcCr

Latest Threads

Top