Fast Semaphore

Joe Seigh

Here's a port of a fast-pathed semaphore I did elsewhere. It
only does single-permit acquire and release. If you use it
in conjunction with ConcurrentLinkedQueue you can get a blocking
queue that's up to 3X faster than LinkedBlockingQueue under
contention.

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

public class FastSemaphore {

    AtomicInteger count;   // semaphore count
    AtomicInteger cancel;  // deferred cancelation count
    Semaphore sem;         // slow semaphore

    public FastSemaphore(int z, boolean fair) {
        count = new AtomicInteger(z);
        cancel = new AtomicInteger(0);
        sem = new Semaphore(0, fair);
    }

    /*
     * processCancels - add cancelCount to current count
     *
     * increment count by min(cancelCount, -(count)) iff count < 0
     */
    void processCancels(int cancelCount) {
        int oldCount;
        int newCount;

        if (cancelCount > 0) {
            while ((oldCount = count.get()) < 0) {
                if ((newCount = oldCount + cancelCount) > 0)
                    newCount = 0;

                if (count.compareAndSet(oldCount, newCount)) {
                    cancelCount -= (newCount - oldCount); // update cancelCount
                    break;
                }
            }
        }

        // add any untransferred cancelCount back into cancel
        if (cancelCount > 0) {
            cancel.addAndGet(cancelCount);
        }
    }

    public void acquire()
        throws InterruptedException
    {
        if (count.addAndGet(-1) < 0) {
            try {
                sem.acquire();
            }

            catch (InterruptedException e) {
                // uncomment one and only one of the following 2 statements
                cancel.incrementAndGet();
                // processCancels(1);
                throw e;
            }
        }
    }

    public boolean tryAcquire(long timeout, TimeUnit unit)
        throws InterruptedException
    {
        boolean rc;
        if (count.addAndGet(-1) < 0) {
            try {
                rc = sem.tryAcquire(timeout, unit);
            }

            catch (InterruptedException e) {
                // uncomment one and only one of the following 2 statements
                cancel.incrementAndGet();
                // processCancels(1);
                throw e;
            }

            if (rc == false) {
                cancel.incrementAndGet();
                // processCancels(1);
            }
            return rc;
        }

        else
            return true;
    }

    public boolean tryAcquire() {
        int oldCount;

        do {
            oldCount = count.get();
        }
        while (oldCount > 0 && !count.compareAndSet(oldCount, (oldCount - 1)));

        return (oldCount > 0);

    }

    public void release() {
        if (cancel.get() > 0 && count.get() < 0) {
            processCancels(cancel.getAndSet(0));
        }

        if (count.addAndGet(1) <= 0) {
            sem.release();
        }
    }

}

/*-*/
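
For anyone trying to follow the comment on processCancels ("increment count by min(cancelCount, -(count)) iff count < 0"), here is a small single-threaded sketch of just that arithmetic. The class CancelTransferSketch and its transfer method are made up for illustration and are not part of FastSemaphore; the real method does the same computation inside a compareAndSet retry loop.

```java
// Hypothetical helper, not part of FastSemaphore: a pure-arithmetic model of
// what processCancels does to (count, cancelCount), without the CAS loop.
final class CancelTransferSketch {

    /** Returns {newCount, leftoverCancels} for a given count and cancelCount. */
    static int[] transfer(int count, int cancelCount) {
        // Nothing moves unless there are cancels to apply and count is negative.
        if (count >= 0 || cancelCount <= 0) {
            return new int[] { count, Math.max(cancelCount, 0) };
        }
        // Move at most enough cancels to bring count back up to zero.
        int moved = Math.min(cancelCount, -count);
        return new int[] { count + moved, cancelCount - moved };
    }

    public static void main(String[] args) {
        int[][] cases = { { -3, 2 }, { -1, 3 }, { 1, 2 } };
        for (int[] c : cases) {
            int[] r = transfer(c[0], c[1]);
            System.out.println("count=" + c[0] + " cancels=" + c[1]
                    + " -> count=" + r[0] + " leftover=" + r[1]);
        }
    }
}
```

The leftover value corresponds to what processCancels adds back into cancel at the end, so cancels that cannot be absorbed yet stay deferred until a later release().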
 
neuneudr

Here's a port of a fast pathed semaphore I did elsewhere. It
only does single permit acquire and release.

Hi,

you obviously know quite a lot about multi-threaded programming.
So I've got a question for you (I'm confused).

I saw that the Semaphore class got added to Java 1.5 by *the*
Java concurrency guru (Doug Lea).

On Wikipedia I find this:

"Semaphores remain in common use in programming languages that
do not intrinsically support other forms of synchronization."

I've discovered multi-threaded programming with Java and
I try to do synchronization correctly (and I use the
java.util.concurrent package quite often).

So my question is simple: what do semaphores (and fast
semaphores) bring to Java that wasn't available before?

IOW, what can we do now in Java 1.5 that we couldn't
do in Java 1.4?

Is it "simply" efficiency (which of course is very important),
or are there other benefits that are also very important?

Is it possible to write correct multi-threaded Java programs
without using semaphores?

Thanks in advance for any info,

Driss
 
Daniel Pitts

Here's a port of a fast pathed semaphore I did elsewhere. It
only does single permit acquire and release. If you use it
in conjunction with ConcurrentLinkedQueue you can get a blocking
queue that's up to 3X faster than LinkedBlockingQueue under
contention.

What benchmarks did you use? Code?
 
Joe Seigh

Here's a port of a fast pathed semaphore I did elsewhere. It
only does single permit acquire and release.
[...]
So my question is simple: what do semaphores (and fast
semaphores) bring to Java that wasn't available before?


Semaphores are just another synchronization primitive. They're
not as widely applicable as locks and condition variables, but
when you do run into a situation where they're useful, they're
nice to have. Semaphores are useful for limiting the number of
threads accessing a resource. They sometimes work well for
producer/consumer, although they don't address thread safety of
the queue itself. Other synchronization primitives are barriers
(already in 1.5) and eventcounts, which aren't in Java.
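
To make the "limiting the number of threads accessing a resource" case concrete, here is a rough sketch using the stock java.util.concurrent.Semaphore. The class name BoundedAccessSketch, the runWorkers method, and the 10 ms sleep standing in for real work are all assumptions made up for this example.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example: at most `permits` threads are inside the guarded
// section at once, no matter how many worker threads are started.
final class BoundedAccessSketch {

    /** Starts `threads` workers and returns the peak number seen inside the section. */
    static int runWorkers(int threads, int permits) {
        final Semaphore gate = new Semaphore(permits);
        final AtomicInteger inside = new AtomicInteger();
        final AtomicInteger peak = new AtomicInteger();

        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    gate.acquire();                 // blocks while all permits are taken
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;                         // never got a permit, nothing to release
                }
                try {
                    int now = inside.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    Thread.sleep(10);               // stand-in for using the resource
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inside.decrementAndGet();
                    gate.release();                 // always hand the permit back
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency = " + runWorkers(8, 3));
    }
}
```

The observed peak can vary from run to run, but it should never exceed the number of permits.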
IOW, what can we do now in Java 1.5 that we couldn't
do in Java 1.4?

Nothing, but it's more a question of efficiency and being
able to use the more "natural" design patterns associated with the
synchronization primitive in question.
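
As an illustration of the "nothing new" point, a counting semaphore can be sketched with nothing but the synchronized/wait/notify monitor operations that were already available in Java 1.4. The class MonitorSemaphoreSketch below is made up for this example; it is not how java.util.concurrent.Semaphore is implemented, and it makes no fairness guarantees.

```java
// Hypothetical pre-1.5 style counting semaphore built on the object monitor.
final class MonitorSemaphoreSketch {
    private int permits;

    MonitorSemaphoreSketch(int initial) {
        permits = initial;
    }

    /** Blocks until a permit is available, then takes it. */
    synchronized void acquire() throws InterruptedException {
        while (permits == 0) {   // loop guards against spurious wakeups
            wait();
        }
        permits--;
    }

    /** Takes a permit only if one is available right now. */
    synchronized boolean tryAcquire() {
        if (permits == 0) {
            return false;
        }
        permits--;
        return true;
    }

    /** Returns a permit and wakes one waiter, if any. */
    synchronized void release() {
        permits++;
        notify();   // every waiter waits on the same condition, so one wakeup suffices
    }

    synchronized int availablePermits() {
        return permits;
    }

    public static void main(String[] args) {
        MonitorSemaphoreSketch s = new MonitorSemaphoreSketch(1);
        System.out.println(s.tryAcquire());   // first permit is available
        System.out.println(s.tryAcquire());   // none left
        s.release();
        System.out.println(s.availablePermits());
    }
}
```

Every acquire and release here goes through the monitor lock, which is the kind of overhead the efficiency argument is about.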
Is it "simply" efficiency (which of course is very important),
or are there other benefits that are also very important?

Is it possible to write correct multi-threaded Java programs
without using semaphores?

It better be. :)
 
Joe Seigh

Robert said:
I'm curious, too, how it compares to j.u.c.Semaphore.

I'm rewriting the test case to make it a bit more compact. It was a bit
large for posting.

The contention is set artificially* high: a bunch of threads doing queue
operations on a queue that's non-empty most of the time.

* for normal usage, unless you were doing a high-throughput server or something
like that.
 
Joe Seigh

Robert said:
I'm curious, too, how it compares to j.u.c.Semaphore.

Ok. I have a new design pattern that subsets interfaces so
I can compare apple to orange implementations without having
to implement the fruit, plant, multi-celled organism, cell,
dna, organic molecule, molecule, and atom interfaces for
orange. Java's architects really have nothing better to do
with other people's time.



import java.util.*;
import java.util.concurrent.*;

interface Sem {
    public void acquire() throws InterruptedException;
    public void release();
}

class NSem extends Semaphore implements Sem {
    public NSem(int n, boolean fair) { super(n, fair); }
}

class FSem extends FastSemaphore implements Sem {
    public FSem(int n, boolean fair) { super(n, fair); }
}

interface fifo<T> {
    public void queue(T o);
    public T dequeue() throws InterruptedException;
}

class ConcurrentFifoQueue<T>
    implements fifo<T>
{
    ConcurrentLinkedQueue<T> queue;
    Sem sem;

    ConcurrentFifoQueue(Sem s) {
        queue = new ConcurrentLinkedQueue<T>();
        sem = s;
    }
    public void queue(T o) {
        queue.offer(o);
        sem.release();
    }
    public T dequeue() throws InterruptedException {
        sem.acquire();
        return queue.poll();
    }
}


class BlockingFifoQueue<T>
    extends java.util.concurrent.LinkedBlockingQueue<T>
    implements fifo<T>
{
    public void queue(T o) { try { put(o); } catch (InterruptedException e) {} }
    public T dequeue() throws InterruptedException { return take(); }
}

public class qtest {

    /**
     * multi-threaded queueing test
     *
     */
    final static int loopcount = 20000;
    final static int nodecount = 200;
    final static int threadcount = 20;
    final static Formatter fmt = new java.util.Formatter(System.out);

    public void test(final fifo<Object> fullq, final fifo<Object> emptyq, String desc) {
        int j;
        long t0, t1; // start, stop times
        Thread producer[] = new Thread[threadcount];
        Thread consumer[] = new Thread[threadcount];
        final CyclicBarrier barrier = new CyclicBarrier(producer.length + consumer.length + 1);

        for (j = 0; j < nodecount; j++) {
            emptyq.queue(new Object());
        }

        for (j = 0; j < producer.length; j++) {
            producer[j] = new Thread(new Runnable() {
                public void run() {
                    try { barrier.await(); } catch (Exception e) {}
                    for (int j = 0; j < loopcount; j++) {
                        try { fullq.queue(emptyq.dequeue()); } catch (InterruptedException e) {}
                    }
                    try { barrier.await(); } catch (Exception e) {}
                }
            });
            producer[j].setDaemon(true);
            producer[j].start();
        }

        for (j = 0; j < consumer.length; j++) {
            consumer[j] = new Thread(new Runnable() {
                public void run() {
                    try { barrier.await(); } catch (Exception e) {}
                    for (int j = 0; j < loopcount; j++) {
                        try { emptyq.queue(fullq.dequeue()); } catch (InterruptedException e) {}
                    }
                    try { barrier.await(); } catch (Exception e) {}
                }
            });
            consumer[j].setDaemon(true);
            consumer[j].start();
        }

        try { barrier.await(); } catch (Exception e) {}
        t0 = System.nanoTime();

        try { barrier.await(); } catch (Exception e) {}
        t1 = System.nanoTime();
        double x = ((t1 - t0)/1e9);
        System.out.println(desc + ":");
        fmt.format("runtime = %12.9f secs\n\n", x);
    }



    public static void main(String[] args) {

        qtest q = new qtest();

        fmt.format("loop count = %6d\n", loopcount);
        fmt.format("queue size = %6d\n", nodecount);
        fmt.format("producer count = %6d\n", threadcount);
        fmt.format("consumer count = %6d\n\n", threadcount);


        q.test(
            new BlockingFifoQueue<Object>(),
            new BlockingFifoQueue<Object>(),
            "LinkedBlockingQueue");

        q.test(
            new ConcurrentFifoQueue<Object>(new NSem(0, false)),
            new ConcurrentFifoQueue<Object>(new NSem(0, false)),
            "ConcurrentLinkedQueue w/ unfair semaphore");

        q.test(
            new ConcurrentFifoQueue<Object>(new NSem(0, true)),
            new ConcurrentFifoQueue<Object>(new NSem(0, true)),
            "ConcurrentLinkedQueue w/ fair semaphore");

        q.test(
            new ConcurrentFifoQueue<Object>(new FSem(0, false)),
            new ConcurrentFifoQueue<Object>(new FSem(0, false)),
            "ConcurrentLinkedQueue w/ unfair fast semaphore");

        q.test(
            new ConcurrentFifoQueue<Object>(new FSem(0, true)),
            new ConcurrentFifoQueue<Object>(new FSem(0, true)),
            "ConcurrentLinkedQueue w/ fair fast semaphore");

        System.out.println("qtest exiting...");

    }

}
 
