Blocking queue race condition?

Andrew Tomazos

I'm trying to implement a high performance blocking queue backed by a
circular buffer on top of pthreads, semaphore.h and gcc atomic
builtins. The queue needs to handle multiple simultaneous readers
and writers from different threads.

I've isolated some sort of race condition, and I'm not sure if it's a
faulty assumption about the behavior of some of the atomic operations
and semaphores, or whether my design is fundamentally flawed.

I've extracted and simplified it into the standalone example below. I
would expect this program never to return. It does, however, return
after a few hundred thousand iterations with corruption detected in
the queue (at least on my machine).

In the below example (for exposition) it doesn't actually store
anything, it just sets to 1 a cell that would hold the actual data,
and 0 to represent an empty cell. There is a counting semaphore
(vacancies) representing the number of vacant cells, and another
counting semaphore (occupants) representing the number of occupied
cells.

Writers do the following:
(1) decrement vacancies
(2) atomically get next head position
(3) write to it
(4) increment occupants

Readers do the opposite:
(1) decrement occupants
(2) atomically get next tail position
(3) read from it
(4) increment vacancies

I would expect that given the above, precisely one thread can be
reading or writing any given cell at one time. This seems to not be
the case though.

Any ideas about why it doesn't work, or debugging strategies, would be
appreciated. Code and output below...

#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#include <iostream>

using namespace std;

#define QUEUE_CAPACITY 8 // must be power of 2
#define NUM_THREADS 2

struct CountingSemaphore
{
    sem_t m;
    CountingSemaphore(unsigned int initial) { sem_init(&m, 0, initial); }
    void post() { sem_post(&m); }
    void wait() { sem_wait(&m); }
    ~CountingSemaphore() { sem_destroy(&m); }
};

struct BlockingQueue
{
    unsigned int head; // (head % capacity) is next head position
    unsigned int tail; // (tail % capacity) is next tail position
    CountingSemaphore vacancies; // how many cells are vacant
    CountingSemaphore occupants; // how many cells are occupied

    // cell[x] == 1 if cell x is occupied, cell[x] == 0 if cell x is vacant
    int cell[QUEUE_CAPACITY];

    BlockingQueue() :
        head(0),
        tail(0),
        vacancies(QUEUE_CAPACITY),
        occupants(0)
    {
        for (size_t i = 0; i < QUEUE_CAPACITY; i++)
            cell[i] = 0;
    }

    // put an item in the queue
    void put()
    {
        vacancies.wait();

        // __sync_fetch_and_add(&head, 1) is an atomic post-increment, i.e. head++
        set(__sync_fetch_and_add(&head, 1) % QUEUE_CAPACITY);

        occupants.post();
    }

    // take an item from the queue
    void take()
    {
        occupants.wait();

        // __sync_fetch_and_add(&tail, 1) is an atomic post-increment, i.e. tail++
        get(__sync_fetch_and_add(&tail, 1) % QUEUE_CAPACITY);

        vacancies.post();
    }

    // set cell i
    void set(unsigned int i)
    {
        // __sync_bool_compare_and_swap is gcc's atomic compare-and-assign:
        // change cell[i] from 0 to 1, or die
        if (!__sync_bool_compare_and_swap(&cell[i], 0, 1))
        {
            corrupt("set", i);
            exit(-1);
        }
    }

    // get cell i
    void get(unsigned int i)
    {
        // __sync_bool_compare_and_swap is gcc's atomic compare-and-assign:
        // change cell[i] from 1 to 0, or die
        if (!__sync_bool_compare_and_swap(&cell[i], 1, 0))
        {
            corrupt("get", i);
            exit(-1);
        }
    }

    // corruption detected: dump the queue state (one reporting thread at a time)
    void corrupt(const char* action, unsigned int i)
    {
        static CountingSemaphore sem(1);
        sem.wait();

        cerr << "corruption detected" << endl;
        cerr << "action = " << action << endl;
        cerr << "i = " << i << endl;
        cerr << "head = " << head << endl;
        cerr << "tail = " << tail << endl;

        for (unsigned int j = 0; j < QUEUE_CAPACITY; j++)
            cerr << "cell[" << j << "] = " << cell[j] << endl;
    }
};

BlockingQueue q;

// keep posting to the queue forever
void* Source(void*)
{
    while (true)
        q.put();

    return 0;
}

// keep taking from the queue forever
void* Sink(void*)
{
    while (true)
        q.take();

    return 0;
}

int main()
{
    pthread_t id[2 * NUM_THREADS];

    // start some pthreads to run Source function
    for (int i = 0; i < NUM_THREADS; i++)
        if (pthread_create(&id[i], NULL, &Source, 0))
            abort();

    // start some pthreads to run Sink function
    for (int i = 0; i < NUM_THREADS; i++)
        if (pthread_create(&id[NUM_THREADS + i], NULL, &Sink, 0))
            abort();

    // wait forever (the threads never exit) instead of spinning in while (true);
    for (int i = 0; i < 2 * NUM_THREADS; i++)
        pthread_join(id[i], NULL);
}

Compile the above as follows:
$ g++ -pthread AboveCode.cpp
$ ./a.out

The output is different every time, but here is one example:

corruption detected
action = get
i = 6
head = 122685
tail = 122685
cell[0] = 0
cell[1] = 0
cell[2] = 1
cell[3] = 0
cell[4] = 1
cell[5] = 0
cell[6] = 1
cell[7] = 1

My system is Ubuntu 11.10 on Intel Core 2:
$ uname -a
Linux 3.0.0-14-generic #23-Ubuntu SMP Mon Nov 21 20:28:43 UTC 2011 x86_64 x86_64 x86_64 GNU/Linux
$ cat /proc/cpuinfo | grep Intel
model name : Intel(R) Core(TM)2 Quad CPU Q9300 @ 2.50GHz
$ g++ --version
g++ (Ubuntu/Linaro 4.6.1-9ubuntu3) 4.6.1

Thanks,
Andrew.
 
Andrew Tomazos

> It appears you are using 4 different memory locations for synchronization
> (head, tail, occupants, vacancies). However, updates to these locations
> are not strictly synchronized with each other, so most probably their
> states will become inconsistent at some point. Moreover, using 4 memory
> locations for synchronization may actually reduce performance compared
> to, e.g., a simple mutex.

What do you mean synchronized with each other? sem_wait, sem_post, and
__sync_fetch_and_add compile to a LOCK XADD, so they have an implicit
full memory barrier. gcc says this includes anything that is

> Here is an example of a race condition. You wait until there are some free
> cells (vacancies.wait()), then proceed to write some data somewhere.
> However, these two operations are not performed atomically, meaning that
> other threads can meanwhile post any number of items so that the queue
> becomes full again when you get to __sync_fetch_and_add, and you will thus
> overwrite some already occupied slot. Or have I overlooked something?

The initial value of vacancies is the capacity of the queue and the
initial value of occupants is 0. One of these two semaphores is
always decremented at the start of the function (either put or take)
and the other is incremented at the end. They both have implicit full
membars, so their values are globally visible after changing.
Therefore in between (during execution of put or take) the total of
(vacancies+occupants) can be at most (capacity-1).

Further, for every other thread concurrently inside put or take this
maximum is decreased by 1, so at most (capacity) threads can be
putting or taking a total of (capacity) cells. Because readers
atomically read/increment the tail count on entering the take
function, they must receive sequentially indexed cells. The same
holds for writers and the head count.

I'm sure there is a flaw in my reasoning (as the program doesn't
work), but I think your statement that "other threads can meanwhile
post any number of items" is false as they have to get past the
counting semaphores.

Regards,
Andrew.
 
Melissa

> I'm sure there is a flaw in my reasoning (as the program doesn't
> work), but I think your statement that "other threads can meanwhile
> post any number of items" is false as they have to get past the
> counting semaphores.

The problem is that set and get operations are not serialized, so
the order of sets and gets can be random.
That means that set(1) can let get(0) proceed while
set(0) has not yet been performed.
 
Andrew Tomazos

> The code does not compile;
> it returns 51 errors.

If you post the errors maybe I can tell you what the problem is.

> I prefer my asm queue implementation, which allows multithreading,
> to all of the STL library, exception handling, <> definitions
> and const definitions, all counted together.
>
> But I like C++ constructors and destructors of objects,
> and references too; names for functions that the compiler sees,
> like the function name strcat(), and names of parameters: I like them too.
>
> But perhaps I'm too stupid to understand them fully,
> or I just don't like that path.

The real implementation uses things like:

typename aligned_storage<sizeof(T), alignment_of<T>::value>::type
storage[capacity];

and emplace and move constructors. I've just reduced it to this
simple example for debugging the race condition.
-Andrew.
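For readers wondering what the 0/1 stub stands for, here is a minimal sketch of one cell built on the aligned_storage declaration quoted above: placement new plays the role of "stub 0 -> 1" and an explicit destructor call plays "stub 1 -> 0". The class name Cell and its emplace/take/occupied members are hypothetical, not taken from the real implementation. The cell is deliberately not thread-safe on its own; the surrounding queue protocol has to guarantee exclusive access, which is exactly what the demo is trying to test.

```cpp
#include <cassert>
#include <new>          // placement new
#include <type_traits>  // std::aligned_storage, std::alignment_of
#include <utility>      // std::forward, std::move

// Hypothetical cell: raw storage for one T plus a pointer that doubles as
// the demo's 0/1 stub (null = vacant, non-null = a live T is constructed).
template <typename T>
class Cell
{
public:
    Cell() : ptr_(nullptr) {}
    Cell(const Cell&) = delete;
    Cell& operator=(const Cell&) = delete;
    ~Cell() { if (ptr_) ptr_->~T(); }

    // Stub 0 -> 1: construct the value in place from the given arguments.
    template <typename... Args>
    void emplace(Args&&... args)
    {
        assert(ptr_ == nullptr);  // the queue protocol must hand us a vacant cell
        ptr_ = ::new (static_cast<void*>(&storage_)) T(std::forward<Args>(args)...);
    }

    // Stub 1 -> 0: move the value out, then run its destructor.
    T take()
    {
        assert(ptr_ != nullptr);  // the queue protocol must hand us an occupied cell
        T value(std::move(*ptr_));
        ptr_->~T();
        ptr_ = nullptr;
        return value;
    }

    bool occupied() const { return ptr_ != nullptr; }

private:
    typename std::aligned_storage<sizeof(T), std::alignment_of<T>::value>::type storage_;
    T* ptr_;  // points into storage_ while a T is alive
};
```

Usage: `Cell<int> c; c.emplace(42); int v = c.take();`. Keeping the pointer returned by placement new avoids casting the raw storage back to T*.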
 
Chris M. Thomasson

"Andrew Tomazos" wrote in message

[...]
> // set cell i
> void set(unsigned int i)
> {
>     // __sync_bool_compare_and_swap is gcc's atomic compare-and-assign
>     // swap 1 for 0 or die
>     if (!__sync_bool_compare_and_swap(&cell[i], 0, 1))
>     {
>         corrupt("set", i);
>         exit(-1);
>     }
> }
>
> // get cell i
> void get(unsigned int i)
> {
>     // __sync_bool_compare_and_swap is gcc's atomic compare-and-assign
>     // swap 0 for 1 or die
>     if (!__sync_bool_compare_and_swap(&cell[i], 1, 0))
>     {
>         corrupt("get", i);
>         exit(-1);
>     }
> }


Why would the simple failure condition of the CAS mean "everything" is
corrupted? AFAICT, the state is simply not finished being fully produced
yet...


2 threads
_________________________________________________
thread 1 gets past vacancies
thread 2 gets past vacancies
thread 1 XADD head gets 0 index
thread 2 XADD head gets 1 index
thread 2 success CAS 0-to-1 in index[1]
thread 2 incs occupants
thread 2 gets past occupants [consumer now]
thread 2 XADD tail gets 0 index
thread 2 fails CAS 1-to-0 in index[0]; test failed ????
thread 1 success CAS 0-to-1 in index[0]; humm...
_________________________________________________
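The interleaving above can be replayed deterministically in a single thread, which makes it easy to see that no semaphore rule is broken and yet the CAS check still fails. In this sketch plain counters stand in for the semaphores and the XADDs, the capacity is fixed at 8 as in the demo, and the names ReplayState, cas and replay_interleaving are made up for illustration.

```cpp
#include <cassert>

// Plain counters stand in for the semaphores and XADDs; capacity 8 as in the demo.
struct ReplayState
{
    unsigned int head = 0, tail = 0;
    int vacancies = 8, occupants = 0;
    int cell[8] = {0};  // 1 = occupied, 0 = vacant
};

// The demo's "CAS or die" check, minus the atomics (single thread only).
static bool cas(int& slot, int expected, int desired)
{
    if (slot != expected) return false;
    slot = desired;
    return true;
}

// Replays the timeline step by step; returns how many CAS checks failed.
int replay_interleaving()
{
    ReplayState q;
    int failures = 0;

    assert(q.vacancies > 0); q.vacancies--;      // thread 1 gets past vacancies
    assert(q.vacancies > 0); q.vacancies--;      // thread 2 gets past vacancies
    unsigned int t1_put = q.head++ % 8;          // thread 1 XADD head gets index 0
    unsigned int t2_put = q.head++ % 8;          // thread 2 XADD head gets index 1
    if (!cas(q.cell[t2_put], 0, 1)) failures++;  // thread 2 CAS 0-to-1 on cell[1]: succeeds
    q.occupants++;                               // thread 2 posts occupants
    assert(q.occupants > 0); q.occupants--;      // thread 2 (now a consumer) gets past occupants
    unsigned int t2_get = q.tail++ % 8;          // thread 2 XADD tail gets index 0
    if (!cas(q.cell[t2_get], 1, 0)) failures++;  // thread 2 CAS 1-to-0 on cell[0]: FAILS
    if (!cas(q.cell[t1_put], 0, 1)) failures++;  // thread 1 CAS 0-to-1 on cell[0]: succeeds
    return failures;
}
```

The replay reports exactly one failed check. occupants only counts finished puts; it says nothing about which index the reader's XADD will hand out, so thread 2 is let through by its own put to cell[1] and then lands on cell[0], which thread 1 has claimed but not yet filled.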



You basically have to use CAS _loops_ here; wrt this design anyway....

<GET>
while (!__sync_bool_compare_and_swap(&cell[i], 1, 0))
{
    CLEVER_BACKOFF(); // ;^)
}

and vice versa


This should solve the race condition.
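A minimal sketch of the demo queue with both "CAS or die" checks turned into CAS loops, as suggested above. It assumes Linux with POSIX unnamed semaphores and gcc's __sync builtins; sched_yield() is only a placeholder for the hypothetical CLEVER_BACKOFF(), and SpinningQueue, spin_producer, spin_consumer and run_spinning_demo are illustrative names. The run function lets two producers and two consumers perform a fixed number of operations each, then checks that the ring ends up empty.

```cpp
#include <cassert>
#include <pthread.h>
#include <sched.h>      // sched_yield
#include <semaphore.h>

// Sketch: the demo queue with each "CAS or die" turned into a CAS loop.
// Assumes Linux, POSIX unnamed semaphores and gcc's __sync builtins.
const unsigned int SPIN_CAPACITY = 8;  // must be a power of 2
const int ITEMS_PER_THREAD = 20000;    // arbitrary, keeps the demo finite

struct SpinningQueue
{
    unsigned int head;
    unsigned int tail;
    sem_t vacancies;
    sem_t occupants;
    int cell[SPIN_CAPACITY];  // 1 = occupied, 0 = vacant

    SpinningQueue() : head(0), tail(0)
    {
        sem_init(&vacancies, 0, SPIN_CAPACITY);
        sem_init(&occupants, 0, 0);
        for (unsigned int i = 0; i < SPIN_CAPACITY; i++)
            cell[i] = 0;
    }

    ~SpinningQueue()
    {
        sem_destroy(&vacancies);
        sem_destroy(&occupants);
    }

    void put()
    {
        while (sem_wait(&vacancies) != 0) {}  // retry if interrupted by a signal
        unsigned int i = __sync_fetch_and_add(&head, 1) % SPIN_CAPACITY;
        // The reader of the previous lap may not have emptied cell i yet: wait.
        while (!__sync_bool_compare_and_swap(&cell[i], 0, 1))
            sched_yield();  // placeholder for the hypothetical CLEVER_BACKOFF()
        sem_post(&occupants);
    }

    void take()
    {
        while (sem_wait(&occupants) != 0) {}  // retry if interrupted by a signal
        unsigned int i = __sync_fetch_and_add(&tail, 1) % SPIN_CAPACITY;
        // The writer assigned to cell i may not have filled it yet: wait.
        while (!__sync_bool_compare_and_swap(&cell[i], 1, 0))
            sched_yield();
        sem_post(&vacancies);
    }
};

void* spin_producer(void* arg)
{
    SpinningQueue* q = static_cast<SpinningQueue*>(arg);
    for (int n = 0; n < ITEMS_PER_THREAD; n++)
        q->put();
    return nullptr;
}

void* spin_consumer(void* arg)
{
    SpinningQueue* q = static_cast<SpinningQueue*>(arg);
    for (int n = 0; n < ITEMS_PER_THREAD; n++)
        q->take();
    return nullptr;
}

// Runs 2 producers and 2 consumers to completion; true if every put was
// matched by a take and all cells ended up vacant.
bool run_spinning_demo()
{
    SpinningQueue q;
    pthread_t threads[4];
    for (int k = 0; k < 4; k++)
    {
        int rc = pthread_create(&threads[k], nullptr,
                                k < 2 ? &spin_producer : &spin_consumer, &q);
        assert(rc == 0);
        (void)rc;
    }
    for (int k = 0; k < 4; k++)
        pthread_join(threads[k], nullptr);

    bool ok = (q.head == 2u * ITEMS_PER_THREAD) && (q.tail == q.head);
    for (unsigned int i = 0; i < SPIN_CAPACITY; i++)
        ok = ok && (q.cell[i] == 0);
    return ok;
}
```

This only addresses the race shown in the timeline. With real payloads, I have not checked whether a writer from the next lap of the ring can claim a cell before the writer from the current lap, which would reorder items; the per-cell sequence numbers in the 1024cores design below are meant to rule that out.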




BTW, check these mpmc bounded queues out:

http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue


and a very simple and still experimental one from me:

http://groups.google.com/group/comp.arch/msg/654a30cba0623bf4




My algorithm implicitly blocks on queue-full/empty and cell-full/empty
conditions and acts accordingly. Also, it's in dire need of a distributed
conditional blocking mechanism. Eventcounts come to mind... ;^)

You can compile the following crude example impl with recent MSVC++ for
x86-32 and the pthread-win32 library:

http://pastebin.com/ZVX7dL4g
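For comparison, here is a minimal sketch of the per-cell sequence-number idea from the 1024cores bounded MPMC queue linked above, written from memory with C++11 std::atomic rather than copied from that page. Each cell carries a sequence number that tells a producer or consumer whether the cell is ready for its particular lap, so a thread never touches a cell "early" the way thread 2 does in the timeline. Unlike the demo it does not block: try_enqueue and try_dequeue (illustrative names) return false when the queue is full or empty, and blocking would have to be layered on top.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>

template <typename T>
class BoundedMpmcQueue
{
public:
    explicit BoundedMpmcQueue(std::size_t capacity)
        : cells_(new Cell[capacity]), mask_(capacity - 1), enqueue_pos_(0), dequeue_pos_(0)
    {
        assert(capacity >= 2 && (capacity & (capacity - 1)) == 0);  // power of 2
        // Cell i starts out ready for the producer whose position is i (lap 0).
        for (std::size_t i = 0; i < capacity; ++i)
            cells_[i].sequence.store(i, std::memory_order_relaxed);
    }

    bool try_enqueue(const T& value)
    {
        Cell* cell;
        std::size_t pos = enqueue_pos_.load(std::memory_order_relaxed);
        for (;;)
        {
            cell = &cells_[pos & mask_];
            std::size_t seq = cell->sequence.load(std::memory_order_acquire);
            std::intptr_t diff = static_cast<std::intptr_t>(seq) - static_cast<std::intptr_t>(pos);
            if (diff == 0)
            {
                // Cell is free for this lap: try to claim position pos.
                if (enqueue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
                    break;
            }
            else if (diff < 0)
                return false;  // previous lap's item not consumed yet: full
            else
                pos = enqueue_pos_.load(std::memory_order_relaxed);  // lost the race, reload
        }
        cell->value = value;
        cell->sequence.store(pos + 1, std::memory_order_release);  // publish to the consumer
        return true;
    }

    bool try_dequeue(T& out)
    {
        Cell* cell;
        std::size_t pos = dequeue_pos_.load(std::memory_order_relaxed);
        for (;;)
        {
            cell = &cells_[pos & mask_];
            std::size_t seq = cell->sequence.load(std::memory_order_acquire);
            std::intptr_t diff = static_cast<std::intptr_t>(seq) - static_cast<std::intptr_t>(pos + 1);
            if (diff == 0)
            {
                if (dequeue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
                    break;
            }
            else if (diff < 0)
                return false;  // nothing published at this position yet: empty
            else
                pos = dequeue_pos_.load(std::memory_order_relaxed);
        }
        out = cell->value;
        cell->sequence.store(pos + mask_ + 1, std::memory_order_release);  // free for next lap
        return true;
    }

private:
    struct Cell
    {
        std::atomic<std::size_t> sequence;
        T value;
    };

    std::unique_ptr<Cell[]> cells_;
    std::size_t mask_;
    std::atomic<std::size_t> enqueue_pos_;
    std::atomic<std::size_t> dequeue_pos_;
};
```

A production version would also pad the positions and cells to avoid false sharing; that is left out here for brevity.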
 
Chris M. Thomasson

"Chris M. Thomasson" wrote in message
[...]
"Andrew Tomazos" wrote in message
[...]
> You can compile the following crude example impl with recent MSVC++ for
> x86-32 and the pthread-win32 library:

FWIW, you can read the following for further context:

http://groups.google.com/group/comp..._frm/thread/734f7c367f720005/a96e1cc6fcbe77c8

http://groups.google.com/group/comp.programming.threads/browse_frm/thread/10ac3ad36c4d1069

http://groups.google.com/group/comp.arch/browse_frm/thread/68b568c5d41d45a7
(read all my posts in this thread...)
 
Andrew Tomazos

> "Andrew Tomazos" wrote in message
>
> [...]
>
>> // set cell i
>> void set(unsigned int i)
>> {
>>     // __sync_bool_compare_and_swap is gcc's atomic compare-and-assign
>>     // swap 1 for 0 or die
>>     if (!__sync_bool_compare_and_swap(&cell[i], 0, 1))
>>     {
>>         corrupt("set", i);
>>         exit(-1);
>>     }
>> }
>> // get cell i
>> void get(unsigned int i)
>> {
>>     // __sync_bool_compare_and_swap is gcc's atomic compare-and-assign
>>     // swap 0 for 1 or die
>>     if (!__sync_bool_compare_and_swap(&cell[i], 1, 0))
>>     {
>>         corrupt("get", i);
>>         exit(-1);
>>     }
>> }
>
> Why would the simple failure condition of the CAS mean "everything" is
> corrupted? AFAICT, the state is simply not finished being fully produced
> yet...


The state of the cell in the demo is a "stub" to represent whether the
content is constructed there or not (in the real implementation). If
the design was correct (it isn't), normally instead of the CAS there
would be a (guaranteed) uncontended constructor call of the value_type
(stub 0 -> 1) or a destructor call of the value_type (stub 1 -> 0).
The CAS is just to catch corruption in the demo.
-Andrew.
 
Chris M. Thomasson

"Andrew Tomazos" wrote in message [...]
>> Why would the simple failure condition of the CAS mean "everything" is
>> corrupted? AFAICT, the state is simply not finished being fully produced
>> yet...
> The state of the cell in the demo is a "stub" to represent whether the
> content is constructed there or not (in the real implementation). If
> the design was correct (it isn't), normally instead of the CAS there
> would be a (guaranteed) uncontended constructor call of the value_type
> (stub 0 -> 1) or a destructor call of the value_type (stub 1 -> 0).
Ugggg....


> The CAS is just to catch corruption in the demo.

Sorry, but a failed CAS in this specific algorithm simply does NOT indicate
a failure condition at all. You need to spin on the CAS simply because just
incrementing the `occupants' does NOT mean that there is a guaranteed item
in the "cell you expect". You have a race-condition that I laid out for you;
it can trip the condition even with 2 threads:

2 threads
_________________________________________________
thread 1 gets past vacancies
thread 2 gets past vacancies
thread 1 XADD head gets 0 index
thread 2 XADD head gets 1 index
thread 2 success CAS 0-to-1 in index[1]
thread 2 incs occupants
thread 2 gets past occupants [consumer now]
thread 2 XADD tail gets 0 index
thread 2 fails CAS 1-to-0 in index[0]; test failed ????
thread 1 success CAS 0-to-1 in index[0]; humm...
_________________________________________________


Think about it for a moment.... Thread 2 needs to WAIT for thread 1 to
finish its CAS. If you don't do it, the queue is quite busted and totally
unusable.

It's as simple as that.
 
Chris M. Thomasson

Andrew Tomazos said:
> I'm trying to implement a high performance blocking queue backed by a
> circular buffer on top of pthreads, semaphore.h and gcc atomic
> builtins. The queue needs to handle multiple simultaneous readers
> and writers from different threads.

[...]

"High Performance" queue? I am counting 1 sem_post, 1 sem_wait, 2 XADDs and
1 CAS for a push operation, and the exact same overhead for a pop operation.
BTW, those calls to `sem_post/wait' are not exactly fast and probably
contain an atomic RMW.

So, assuming that `sem_post/wait' have at least 1 atomic RMW each, I am
counting:

5 atomic RMW for push and 5 atomic RMW for pop... Memory barriers aside for
a moment...

Even in the presence of ZERO contention, how can this possibly perform
better than a simple condvar/mutex setup?

The condvar/mutex will be using far less atomic RMW and memory barriers.
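For reference, a minimal sketch of the kind of "simple condvar/mutex setup" being compared against: one std::mutex guards the whole ring and two std::condition_variable objects block producers when it is full and consumers when it is empty. The class name LockedBoundedQueue is hypothetical, and this version requires T to be default-constructible and movable. In the uncontended case each put or take is one lock/unlock pair plus a notify; the trade-off Andrew raises in reply is that the same lock also serializes the (possibly expensive) moves of the values.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <utility>
#include <vector>

// One mutex guards the whole ring buffer; two condvars wait for "not full"
// and "not empty". put/take block exactly like the semaphore version.
template <typename T>
class LockedBoundedQueue
{
public:
    explicit LockedBoundedQueue(std::size_t capacity) : buf_(capacity)
    {
        assert(capacity > 0);
    }

    void put(T value)
    {
        std::unique_lock<std::mutex> lock(m_);
        not_full_.wait(lock, [this] { return count_ < buf_.size(); });
        buf_[(head_ + count_) % buf_.size()] = std::move(value);
        ++count_;
        not_empty_.notify_one();
    }

    T take()
    {
        std::unique_lock<std::mutex> lock(m_);
        not_empty_.wait(lock, [this] { return count_ > 0; });
        T value = std::move(buf_[head_]);
        head_ = (head_ + 1) % buf_.size();
        --count_;
        not_full_.notify_one();
        return value;
    }

private:
    std::mutex m_;
    std::condition_variable not_full_;
    std::condition_variable not_empty_;
    std::vector<T> buf_;
    std::size_t head_ = 0;   // index of the oldest element
    std::size_t count_ = 0;  // number of stored elements
};
```

Note that head_ here indexes the oldest element, unlike the demo's head, which is the next write position.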
 
Andrew Tomazos

> Sorry, but a failed CAS in this specific algorithm simply does NOT indicate
> a failure condition at all.

From my point of view it does, because I expected (incorrectly) that
there would be no contention at that point in the code.

I understand the race condition now and have since fixed it in the
design.
-Andrew.
 
Andrew Tomazos

> So, assuming that `sem_post/wait' have at least 1 atomic RMW each, I am
> counting:
>
> 5 atomic RMW for push and 5 atomic RMW for pop... Memory barriers aside for
> a moment...

The original (faulty) design had 3 atomics (CMPXCHG, XADD, CMPXCHG)
for push and the same for pop in the uncontended case.
> Even in the presence of ZERO contention, how can this possibly perform
> better than a simple condvar/mutex setup?

If by simple "condvar/mutex setup", you mean one giant lock for the
whole queue, consider that the real queue may have a complex
constructor and/or a large value type. A single lock forces
unnecessary serialization of readers and writers. Ideally you would
like readers and writers to have access to different cells in
parallel.
> The condvar/mutex will be using far less atomic RMW and memory barriers.

sem_post and sem_wait are implemented with a CMPXCHG in the
uncontended case on my target platform (x86_64 linux).
-Andrew.