Blocking queue race condition?

Discussion in 'C++' started by Andrew Tomazos, Jan 5, 2012.

  1. I'm trying to implement a high performance blocking queue backed by a
    circular buffer on top of pthreads, semaphore.h and gcc atomic
    builtins. The queue needs to handle multiple simultaneous readers
    and writers from different threads.

    I've isolated some sort of race condition, and I'm not sure if it's a
    faulty assumption about the behavior of some of the atomic operations
    and semaphores, or whether my design is fundamentally flawed.

    I've extracted and simplified it to the below standalone example. I
    would expect that this program never returns. It does however return
    after a few hundred thousand iterations with corruption detected in
    the queue (at least on my machine).

    In the below example (for exposition) it doesn't actually store
    anything, it just sets to 1 a cell that would hold the actual data,
    and 0 to represent an empty cell. There is a counting semaphore
    (vacancies) representing the number of vacant cells, and another
    counting semaphore (occupants) representing the number of occupied
    cells.

    Writers do the following:
    (1) decrement vacancies
    (2) atomically get next head position
    (3) write to it
    (4) increment occupants

    Readers do the opposite:
    (1) decrement occupants
    (2) atomically get next tail position
    (3) read from it
    (4) increment vacancies

    I would expect that given the above, precisely one thread can be
    reading or writing any given cell at one time. This seems to not be
    the case though.

    Any ideas about why it doesn't work, or debugging strategies, would be
    appreciated. Code and output below...

    #include <stdlib.h>
    #include <pthread.h>
    #include <semaphore.h>
    #include <iostream>

    using namespace std;

    #define QUEUE_CAPACITY 8 // must be power of 2
    #define NUM_THREADS 2

    struct CountingSemaphore
    {
        sem_t m;
        CountingSemaphore(unsigned int initial) { sem_init(&m, 0, initial); }
        void post() { sem_post(&m); }
        void wait() { sem_wait(&m); }
        ~CountingSemaphore() { sem_destroy(&m); }
    };

    struct BlockingQueue
    {
        unsigned int head; // (head % capacity) is next head position
        unsigned int tail; // (tail % capacity) is next tail position
        CountingSemaphore vacancies; // how many cells are vacant
        CountingSemaphore occupants; // how many cells are occupied

        // cell[x] == 1 if cell x occupied, cell[x] == 0 if cell x vacant
        int cell[QUEUE_CAPACITY];

        BlockingQueue() :
            head(0),
            tail(0),
            vacancies(QUEUE_CAPACITY),
            occupants(0)
        {
            for (size_t i = 0; i < QUEUE_CAPACITY; i++)
                cell[i] = 0;
        }

        // put an item in the queue
        void put()
        {
            vacancies.wait();

            // __sync_fetch_and_add(&head, 1) is an atomic post increment, ie head++
            set(__sync_fetch_and_add(&head, 1) % QUEUE_CAPACITY);

            occupants.post();
        }

        // take an item from the queue
        void take()
        {
            occupants.wait();

            // __sync_fetch_and_add(&tail, 1) is an atomic post increment, ie tail++
            get(__sync_fetch_and_add(&tail, 1) % QUEUE_CAPACITY);

            vacancies.post();
        }

        // set cell i
        void set(unsigned int i)
        {
            // __sync_bool_compare_and_swap is gcc's atomic compare-and-assign
            // swap 1 for 0 or die
            if (!__sync_bool_compare_and_swap(&cell[i], 0, 1))
            {
                corrupt("set", i);
                exit(-1);
            }
        }

        // get cell i
        void get(unsigned int i)
        {
            // __sync_bool_compare_and_swap is gcc's atomic compare-and-assign
            // swap 0 for 1 or die
            if (!__sync_bool_compare_and_swap(&cell[i], 1, 0))
            {
                corrupt("get", i);
                exit(-1);
            }
        }

        // corruption detected
        void corrupt(const char* action, unsigned int i)
        {
            static CountingSemaphore sem(1);
            sem.wait();

            cerr << "corruption detected" << endl;
            cerr << "action = " << action << endl;
            cerr << "i = " << i << endl;
            cerr << "head = " << head << endl;
            cerr << "tail = " << tail << endl;

            for (unsigned int j = 0; j < QUEUE_CAPACITY; j++)
                cerr << "cell[" << j << "] = " << cell[j] << endl;
        }
    };

    BlockingQueue q;

    // keep posting to the queue forever
    void* Source(void*)
    {
        while (true)
            q.put();

        return 0;
    }

    // keep taking from the queue forever
    void* Sink(void*)
    {
        while (true)
            q.take();

        return 0;
    }

    int main()
    {
        pthread_t id;

        // start some pthreads to run Source function
        for (int i = 0; i < NUM_THREADS; i++)
            if (pthread_create(&id, NULL, &Source, 0))
                abort();

        // start some pthreads to run Sink function
        for (int i = 0; i < NUM_THREADS; i++)
            if (pthread_create(&id, NULL, &Sink, 0))
                abort();

        while (true);
    }

    Compile the above as follows:
    $ g++ -pthread AboveCode.cpp
    $ ./a.out

    The output is different every time, but here is one example:

    corruption detected
    action = get
    i = 6
    head = 122685
    tail = 122685
    cell[0] = 0
    cell[1] = 0
    cell[2] = 1
    cell[3] = 0
    cell[4] = 1
    cell[5] = 0
    cell[6] = 1
    cell[7] = 1

    My system is Ubuntu 11.10 on Intel Core 2:
    $ uname -a
    Linux 3.0.0-14-generic #23-Ubuntu SMP Mon Nov 21 20:28:43 UTC 2011
    x86_64 x86_64 x86_64 GNU/Linux
    $ cat /proc/cpuinfo | grep Intel
    model name : Intel(R) Core(TM)2 Quad CPU Q9300 @ 2.50GHz
    $ g++ --version
    g++ (Ubuntu/Linaro 4.6.1-9ubuntu3) 4.6.1

    Thanks,
    Andrew.
    Andrew Tomazos, Jan 5, 2012
    #1

  2. On Jan 5, 8:31 am, Paavo Helde <> wrote:
    > Andrew Tomazos <> wrote in news:...
    >
    > > I'm trying to implement a high performance blocking queue backed by a
    > > circular buffer on top of pthreads, semaphore.h and gcc atomic
    > > builtins.  The queue needs to handle multiple simultaneous readers
    > > and writers from different threads.

    >
    > It appears you are using 4 different memory locations for synchronization
    > (head, tail, occupants, vacancies). However, updates to these locations
    > are not strictly synchronized with each other so most probably their
    > states will become inconsistent at some point. Moreover, there is a
    > possibility that using 4 memory locations for synchronization may
    > actually reduce the performance when compared to e.g. a simple mutex.


    What do you mean synchronized with each other? sem_wait, sem_post, and
    __sync_fetch_and_add compile to a LOCK XADD, so they have an implicit
    full memory barrier. gcc says this includes anything that is

    > > I've isolated some sort of race condition, and I'm not sure if it's a
    > > faulty assumption about the behavior of some of the atomic operations
    > > and semaphores, or whether my design is fundamentally flawed.
    > >      void put()
    > >      {
    > >           vacancies.wait();

    >
    > >           // __sync_fetch_and_add(&head,1) is an atomic post increment, ie head++
    > >           set(__sync_fetch_and_add(&head, 1) % QUEUE_CAPACITY);

    >
    > >           occupants.post();
    > >      }

    >
    > Here is an example of race condition. You wait until there are some free
    > cells (vacancies.wait()), then proceed to write some data somewhere.
    > However, these two operations are not performed atomically, meaning that
    > other threads can meanwhile post any number of items so that the queue
    > becomes full again when you get to __sync_fetch_and_add and will thus
    > overwrite some already occupied slot. Or have I overlooked something?


    The initial value of vacancies is the capacity of the queue and the
    initial value of occupants is 0. One of these two semaphores is
    always decremented at the start of the function (either put or take)
    and the other is incremented at the end. They both have implicit full
    membars, so their values are globally visible after changing.
    Therefore in between (during execution of put or take) the total of
    (vacancies+occupants) can be at maximum (capacity-1).

    Further for every other thread concurrently inside put or take this
    maximum is decreased by 1, so we can see at maximum (capacity) threads
    are putting or taking a total of (capacity) cells. Because readers
    atomically read/increment the tail count on entering the take
    function, they must receive sequentially indexed cells. Same for
    writers.

    I'm sure there is a flaw in my reasoning (as the program doesn't
    work), but I think your statement that "other threads can meanwhile
    post any number of items" is false as they have to get past the
    counting semaphores.

    Regards,
    Andrew.
    Andrew Tomazos, Jan 5, 2012
    #2

  3. On Thu, 5 Jan 2012 00:15:00 -0800 (PST)
    Andrew Tomazos <> wrote:

    >
    > I'm sure there is a flaw in my reasoning (as the program doesn't
    > work), but I think your statement that "other threads can meanwhile
    > post any number of items" is false as they have to get past the
    > counting semaphores.
    >


    The problem is that the set and get operations are not serialized, so
    the order of sets and gets can be random.
    That means that set(1) can trigger get(0) while
    set(0) has not yet been performed.
    Melissa, Jan 5, 2012
    #3
  4. On Jan 5, 6:05 pm, "io_x" <> wrote:
    > "Andrew Tomazos" <> wrote in message news:...
    >
    > > I'm trying to implement a high performance blocking queue backed by a

    >
    > the code does not compile;
    > it returns 51 errors


    If you post the errors maybe I can tell you what the problem is.

    > I prefer my asm queue implementation, which allows multithreading,
    > to all of the STL library, exception handling, <> definitions
    > and const definitions, all counted together
    >
    > but I like C++ object constructors and destructors,
    > and references too; names for functions that the compiler sees,
    > like strcat(), and names of parameters, I like them too
    >
    > but perhaps I'm too stupid to fully understand them,
    > or I don't like that path


    The real implementation uses things like:

    typename aligned_storage<sizeof(T), alignment_of<T>::value>::type
    storage[capacity];

    and emplace and move constructors. I've just reduced it to this
    simple example for debugging the race condition.
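A minimal sketch of such a cell, assuming C++11 and the aligned_storage declaration quoted above: the demo's 0/1 stub becomes "a value is constructed here or not", with placement new for the 0 -> 1 transition and an explicit destructor call for 1 -> 0. The Cell class and its emplace/take/constructed members are hypothetical names for illustration, not Andrew's real implementation, and the cell does no synchronization of its own.

```cpp
#include <cassert>
#include <new>          // placement new
#include <string>
#include <type_traits>  // std::aligned_storage, std::alignment_of
#include <utility>      // std::forward, std::move

// Hypothetical cell: raw storage plus a pointer that plays the role of the
// demo's 0/1 stub (nullptr = vacant, non-null = a T is constructed here).
// Not thread-safe: the surrounding queue must give one thread exclusive access.
template <typename T>
class Cell {
public:
    Cell() : value_(nullptr) {}
    ~Cell() { if (value_) value_->~T(); }
    Cell(const Cell&) = delete;
    Cell& operator=(const Cell&) = delete;

    // Stub 0 -> 1: construct the value in place from the given arguments.
    template <typename... Args>
    void emplace(Args&&... args) {
        assert(value_ == nullptr);
        value_ = new (&storage_) T(std::forward<Args>(args)...);
    }

    // Stub 1 -> 0: move the value out, then destroy the object in the cell.
    T take() {
        assert(value_ != nullptr);
        T result(std::move(*value_));
        value_->~T();
        value_ = nullptr;
        return result;
    }

    bool constructed() const { return value_ != nullptr; }

private:
    typename std::aligned_storage<sizeof(T), std::alignment_of<T>::value>::type storage_;
    T* value_;
};
```

Keeping the pointer returned by placement new avoids casting the raw storage back to T* later.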
    -Andrew.
    Andrew Tomazos, Jan 5, 2012
    #4
  5. "Andrew Tomazos" wrote in message
    news:...

    [...]

    > // set cell i
    > void set(unsigned int i)
    > {
    > // __sync_bool_compare_and_swap is gcc's atomic compare-and-assign
    > // swap 1 for 0 or die
    > if (!__sync_bool_compare_and_swap(&cell[i], 0, 1))
    > {
    > corrupt("set", i);
    > exit(-1);
    > }
    > }


    > // get cell i
    > void get(unsigned int i)
    > {
    > // __sync_bool_compare_and_swap is gcc's atomic compare-and-assign
    > // swap 0 for 1 or die
    > if (!__sync_bool_compare_and_swap(&cell[i], 1, 0))
    > {
    > corrupt("get", i);
    > exit(-1);
    > }
    > }


    Why would the simple failure condition of the CAS mean "everything" is
    corrupted? AFAICT, the state is simply not finished being fully produced
    yet...


    2 threads
    _________________________________________________
    thread 1 gets past vacancies
    thread 2 gets past vacancies
    thread 1 XADD head gets 0 index
    thread 2 XADD head gets 1 index
    thread 2 success CAS 0-to-1 in index[1]
    thread 2 incs occupants
    thread 2 gets past occupants [consumer now]
    thread 2 XADD tail gets 0 index
    thread 2 fails CAS 1-to-0 in index[0]; test failed ????
    thread 1 success CAS 0-to-1 in index[0]; humm...
    _________________________________________________
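The schedule above can be replayed step by step on a single thread to confirm that the consumer's CAS fails even though every semaphore and index operation behaved as intended. This sketch assumes std::atomic in place of the gcc __sync builtins and plain int counters in place of the two semaphores (no blocking is needed because the order is fixed by hand); replay_interleaving is a hypothetical name.

```cpp
#include <atomic>
#include <cassert>

// Replays the two-thread schedule in the listed order on one thread.
// Returns true if the consumer's CAS 1->0 failed while both producer
// CASes succeeded, i.e. the demo would have reported "corruption".
bool replay_interleaving() {
    const unsigned capacity = 8;
    std::atomic<int> cell[capacity];
    for (auto& c : cell) c.store(0);
    std::atomic<unsigned> head(0), tail(0);
    int vacancies = capacity, occupants = 0;  // stand-ins for the semaphores

    --vacancies;                                     // thread 1 gets past vacancies
    --vacancies;                                     // thread 2 gets past vacancies
    unsigned t1 = head.fetch_add(1) % capacity;      // thread 1 gets index 0
    unsigned t2 = head.fetch_add(1) % capacity;      // thread 2 gets index 1
    int expected = 0;
    bool t2_set = cell[t2].compare_exchange_strong(expected, 1);   // succeeds on cell[1]
    ++occupants;                                     // thread 2 increments occupants
    --occupants;                                     // thread 2, now a consumer, gets past occupants
    unsigned t2_get = tail.fetch_add(1) % capacity;  // thread 2 gets tail index 0
    expected = 1;
    bool t2_take = cell[t2_get].compare_exchange_strong(expected, 0);  // fails: cell[0] is still 0
    expected = 0;
    bool t1_set = cell[t1].compare_exchange_strong(expected, 1);   // thread 1 finally sets cell[0]

    assert(vacancies >= 0 && occupants >= 0);        // the counts never went negative
    return t2_set && t1_set && !t2_take;
}
```

The semaphore counts stay consistent throughout; what is missing is any guarantee that the producer holding index 0 has finished before a consumer reaches index 0.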



    You basically have to use CAS _loops_ here; wrt this design anyway....

    <GET>
    while (!__sync_bool_compare_and_swap(&cell[i], 1, 0))
    {
        CLEVER_BACKOFF(); // ;^)
    }

    and vice versa


    This should solve the race condition.
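The suggestion can be sketched as follows, assuming C++11 std::atomic for the cells, POSIX semaphores as in the original, and std::this_thread::yield() standing in for the CLEVER_BACKOFF() placeholder. SpinCellQueue and run_stress are hypothetical names. The change is that a failed CAS now means "the other side of this cell has not finished yet, try again" rather than "corruption".

```cpp
#include <atomic>
#include <cassert>
#include <semaphore.h>
#include <thread>
#include <vector>

// The demo queue with per-cell CAS loops instead of CAS-or-die.
class SpinCellQueue {
public:
    static constexpr unsigned kCapacity = 8;  // power of 2, as in the original

    SpinCellQueue() : head_(0), tail_(0) {
        sem_init(&vacancies_, 0, kCapacity);
        sem_init(&occupants_, 0, 0);
        for (auto& c : cell_) c.store(0);
    }
    ~SpinCellQueue() { sem_destroy(&vacancies_); sem_destroy(&occupants_); }

    void put() {
        while (sem_wait(&vacancies_) != 0) {}  // retry if interrupted by a signal
        unsigned i = head_.fetch_add(1) % kCapacity;
        // Wait for the cell to be vacant: a consumer from the previous lap
        // may not have emptied it yet.
        for (int expected = 0; !cell_[i].compare_exchange_weak(expected, 1); expected = 0)
            std::this_thread::yield();
        sem_post(&occupants_);
    }

    void take() {
        while (sem_wait(&occupants_) != 0) {}
        unsigned i = tail_.fetch_add(1) % kCapacity;
        // Wait for the producer that reserved this cell to finish setting it.
        for (int expected = 1; !cell_[i].compare_exchange_weak(expected, 0); expected = 1)
            std::this_thread::yield();
        sem_post(&vacancies_);
    }

    bool all_vacant() const {
        for (const auto& c : cell_)
            if (c.load() != 0) return false;
        return true;
    }

private:
    std::atomic<unsigned> head_, tail_;
    sem_t vacancies_, occupants_;
    std::atomic<int> cell_[kCapacity];
};

// Starts `pairs` producers and `pairs` consumers that each perform `ops`
// operations, then reports whether every cell ended up vacant.
bool run_stress(int pairs, int ops) {
    SpinCellQueue q;
    std::vector<std::thread> threads;
    for (int p = 0; p < pairs; ++p) {
        threads.emplace_back([&q, ops] { for (int k = 0; k < ops; ++k) q.put(); });
        threads.emplace_back([&q, ops] { for (int k = 0; k < ops; ++k) q.take(); });
    }
    for (auto& t : threads) t.join();
    return q.all_vacant();
}
```

This only removes the spurious failure. It does not by itself preserve FIFO order when two producers land on the same cell a full lap apart.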




    BTW, check these mpmc bounded queues out:

    http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
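The 1024cores page describes a bounded MPMC queue in which every cell carries its own sequence number, so a thread can tell whether the cell it is looking at has been published for the current lap. The following is a minimal sketch in that style, written from the general technique rather than copied from the page. BoundedMpmcQueue, try_push and try_pop are hypothetical names; the sketch assumes a power-of-2 capacity of at least 2 and a copyable T, and it returns false on full/empty instead of blocking.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <vector>

// Bounded MPMC queue with a sequence number per cell.
template <typename T>
class BoundedMpmcQueue {
public:
    explicit BoundedMpmcQueue(std::size_t capacity)
        : cells_(capacity), mask_(capacity - 1), enqueue_pos_(0), dequeue_pos_(0) {
        assert(capacity >= 2 && (capacity & (capacity - 1)) == 0);
        for (std::size_t i = 0; i < capacity; ++i)
            cells_[i].sequence.store(i, std::memory_order_relaxed);
    }

    bool try_push(const T& value) {
        std::size_t pos = enqueue_pos_.load(std::memory_order_relaxed);
        for (;;) {
            Cell& cell = cells_[pos & mask_];
            std::size_t seq = cell.sequence.load(std::memory_order_acquire);
            std::ptrdiff_t diff = static_cast<std::ptrdiff_t>(seq) - static_cast<std::ptrdiff_t>(pos);
            if (diff == 0) {
                // Cell is free for this lap: try to claim position pos.
                if (enqueue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) {
                    cell.data = value;
                    cell.sequence.store(pos + 1, std::memory_order_release);  // publish
                    return true;
                }
                // A failed CAS reloaded pos; retry with the new value.
            } else if (diff < 0) {
                return false;  // cell still holds last lap's item: queue full
            } else {
                pos = enqueue_pos_.load(std::memory_order_relaxed);  // someone else claimed it
            }
        }
    }

    bool try_pop(T& out) {
        std::size_t pos = dequeue_pos_.load(std::memory_order_relaxed);
        for (;;) {
            Cell& cell = cells_[pos & mask_];
            std::size_t seq = cell.sequence.load(std::memory_order_acquire);
            std::ptrdiff_t diff = static_cast<std::ptrdiff_t>(seq) - static_cast<std::ptrdiff_t>(pos + 1);
            if (diff == 0) {
                if (dequeue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) {
                    out = cell.data;
                    // Hand the cell to the producer one lap ahead.
                    cell.sequence.store(pos + mask_ + 1, std::memory_order_release);
                    return true;
                }
            } else if (diff < 0) {
                return false;  // not published yet: empty, or a producer is mid-write
            } else {
                pos = dequeue_pos_.load(std::memory_order_relaxed);
            }
        }
    }

private:
    struct Cell {
        std::atomic<std::size_t> sequence;
        T data;
    };
    std::vector<Cell> cells_;
    const std::size_t mask_;
    std::atomic<std::size_t> enqueue_pos_;
    std::atomic<std::size_t> dequeue_pos_;
};
```

The branch relevant to this thread is diff < 0 in try_pop: a consumer that reaches a cell before its producer has stored pos + 1 sees "not published yet" and backs off, instead of treating it as corruption.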


    and a very simple and still experimental one from me:

    http://groups.google.com/group/comp.arch/msg/654a30cba0623bf4




    My algorithm implicitly blocks on queue-full/empty and cell-full/empty
    conditions and acts accordingly. Also, it's in dire need of a distributed
    conditional blocking mechanism. Eventcounts come to mind... ;^)

    You can compile the following crude example impl with recent MSVC++ for
    x86-32 and the pthread-win32 library:

    http://pastebin.com/ZVX7dL4g
    Chris M. Thomasson, Jan 7, 2012
    #5
  6. "Chris M. Thomasson" wrote in message
    news:pl4Oq.9195$...
    [...]
    "Andrew Tomazos" wrote in message
    [...]

    > You can compile the following crude example impl with recent MSVC++ for
    > x86-32 and the pthread-win32 library:


    FWIW, you can read the following for further context:

    http://groups.google.com/group/comp..._frm/thread/734f7c367f720005/a96e1cc6fcbe77c8

    http://groups.google.com/group/comp.programming.threads/browse_frm/thread/10ac3ad36c4d1069

    http://groups.google.com/group/comp.arch/browse_frm/thread/68b568c5d41d45a7
    (read all my posts in this thread...)
    Chris M. Thomasson, Jan 7, 2012
    #6
  7. On Jan 8, 12:13 am, "Chris M. Thomasson" <> wrote:
    > "Andrew Tomazos"  wrote in message
    >
    > news:...
    >
    > [...]
    >
    > > // set cell i
    > > void set(unsigned int i)
    > > {
    > > // __sync_bool_compare_and_swap is gcc's atomic compare-and-assign
    > > // swap 1 for 0 or die
    > > if (!__sync_bool_compare_and_swap(&cell[i], 0, 1))
    > > {
    > > corrupt("set", i);
    > > exit(-1);
    > > }
    > > }
    > > // get cell i
    > > void get(unsigned int i)
    > > {
    > > // __sync_bool_compare_and_swap is gcc's atomic compare-and-assign
    > > // swap 0 for 1 or die
    > > if (!__sync_bool_compare_and_swap(&cell[i], 1, 0))
    > > {
    > > corrupt("get", i);
    > > exit(-1);
    > > }
    > > }

    >
    > Why would the simple failure condition of the CAS mean "everything" is
    > corrupted? AFAICT, the state is simply not finished being fully produced
    > yet...


    The state of the cell in the demo is a "stub" to represent whether the
    content is constructed there or not (in the real implementation). If
    the design was correct (it isn't), normally instead of the CAS there
    would be a (guaranteed) uncontended constructor call of the value_type
    (stub 0 -> 1) or a destructor call of the value_type (stub 1 -> 0).
    The CAS is just to catch corruption in the demo.
    -Andrew.
    Andrew Tomazos, Jan 8, 2012
    #7
  8. "Andrew Tomazos" <> wrote in message
    news:...
    On Jan 8, 12:13 am, "Chris M. Thomasson" <> wrote:
    > "Andrew Tomazos" wrote in message

    [...]

    > > Why would the simple failure condition of the CAS mean "everything" is
    > > corrupted? AFAICT, the state is simply not finished being fully produced
    > > yet...


    > The state of the cell in the demo is a "stub" to represent whether the
    > content is constructed there or not (in the real implementation). If
    > the design was correct (it isn't), normally instead of the CAS there
    > would be a (guaranteed) uncontended constructor call of the value_type
    > (stub 0 -> 1) or a destructor call of the value_type (stub 1 -> 0).


    Ugggg....


    > The CAS is just to catch corruption in the demo.


    Sorry, but a failed CAS in this specific algorithm simply does NOT indicate
    a failure condition at all. You need to spin on the CAS simply because just
    incrementing the `occupants' does NOT mean that there is a guaranteed item
    in the "cell you expect". You have a race-condition that I laid out for you;
    it can trip the condition even with 2 threads:

    2 threads
    _________________________________________________
    thread 1 gets past vacancies
    thread 2 gets past vacancies
    thread 1 XADD head gets 0 index
    thread 2 XADD head gets 1 index
    thread 2 success CAS 0-to-1 in index[1]
    thread 2 incs occupants
    thread 2 gets past occupants [consumer now]
    thread 2 XADD tail gets 0 index
    thread 2 fails CAS 1-to-0 in index[0]; test failed ????
    thread 1 success CAS 0-to-1 in index[0]; humm...
    _________________________________________________


    Think about it for a moment.... Thread 2 needs to WAIT for thread 1 to
    finish its CAS. If you don't do it, the queue is quite busted and totally
    unusable.

    It's as simple as that.
    Chris M. Thomasson, Jan 8, 2012
    #8
  9. "Andrew Tomazos" <> wrote in message
    news:...
    > I'm trying to implement a high performance blocking queue backed by a
    > circular buffer on top of pthreads, semaphore.h and gcc atomic
    > builtins. The queue needs to handle multiple simulataneous readers
    > and writers from different threads.


    [...]

    "High Performance" queue? I am counting 1 sem_post, 1 sem_wait, 2 XADDs and
    1 CAS for a push operation, and the exact same overhead for a pop operation.
    BTW, those calls to `sem_post/wait' are not exactly fast and probably
    contain an atomic RMW.

    So, assuming that `sem_post/wait' have at least 1 atomic RMW each, I am
    counting:

    5 atomic RMW for push and 5 atomic RMW for pop... Memory barriers aside for
    a moment...

    Even in the presence of ZERO contention, how can this possibly perform
    better than a simple condvar/mutex setup?

    The condvar/mutex will be using far less atomic RMW and memory barriers.
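For comparison, a minimal sketch of a "simple condvar/mutex setup", assuming C++11 std::mutex and std::condition_variable rather than raw pthreads, and a default-constructible T; LockedBoundedQueue is a hypothetical name. Each put or take acquires one lock and waits only when the queue is full or empty.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <utility>
#include <vector>

// Bounded blocking queue: one mutex guards a ring buffer, two condvars
// signal "not full" and "not empty". Capacity must be at least 1.
template <typename T>
class LockedBoundedQueue {
public:
    explicit LockedBoundedQueue(std::size_t capacity) : buf_(capacity) {}

    void put(T value) {
        std::unique_lock<std::mutex> lock(m_);
        not_full_.wait(lock, [this] { return count_ < buf_.size(); });
        buf_[(head_ + count_) % buf_.size()] = std::move(value);
        ++count_;
        not_empty_.notify_one();
    }

    T take() {
        std::unique_lock<std::mutex> lock(m_);
        not_empty_.wait(lock, [this] { return count_ > 0; });
        T value = std::move(buf_[head_]);
        head_ = (head_ + 1) % buf_.size();
        --count_;
        not_full_.notify_one();
        return value;
    }

private:
    std::mutex m_;
    std::condition_variable not_full_, not_empty_;
    std::vector<T> buf_;     // slots are default-constructed up front
    std::size_t head_ = 0;   // index of the oldest element
    std::size_t count_ = 0;  // number of stored elements
};
```

Here the value is moved into and out of the buffer while the lock is held, which is the serialization of readers and writers that Andrew objects to in his reply.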
    Chris M. Thomasson, Jan 8, 2012
    #9
  10. On Jan 8, 10:56 pm, "Chris M. Thomasson" <> wrote:
    > Sorry, but a failed CAS in this specific algorithm simply does NOT indicate
    > a failure condition at all.


    From my point of view it does, because I expected (incorrectly) that
    there would be no contention at that point in the code.

    I understand the race condition now and have since fixed it in the
    design.
    -Andrew.
    Andrew Tomazos, Jan 9, 2012
    #10
  11. On Jan 8, 11:02 pm, "Chris M. Thomasson" <> wrote:
    > So, assuming that `sem_post/wait' have at least 1 atomic RMW each, I am
    > counting:
    >
    > 5 atomic RMW for push and 5 atomic RMW for pop... Memory barriers aside for
    > a moment...


    The original (faulty) design had 3 atomics (CMPXCHG, XADD, CMPXCHG)
    for push and the same for pop in the uncontended case.

    > Even in the presence of ZERO contention, how can this possibly perform
    > better than a simple condvar/mutex setup?


    If by simple "condvar/mutex setup", you mean one giant lock for the
    whole queue, consider that the real queue may have a complex
    constructor and/or a large value type. A single lock forces
    unnecessary serialization of readers and writers. Ideally you would
    like readers and writers to have access to different cells in
    parallel.

    > The condvar/mutex will be using far less atomic RMW and memory barriers.


    sem_post and sem_wait are implemented with a CMPXCHG in the
    uncontended case on my target platform (x86_64 linux).
    -Andrew.
    Andrew Tomazos, Jan 9, 2012
    #11

Similar Threads
  1. jimjim

    race condition question

    jimjim, Nov 1, 2003, in forum: Java
    Replies:
    6
    Views:
    412
    jimjim
    Nov 2, 2003
  2. Russell Warren

    Is Queue.Queue.queue.clear() thread-safe?

    Russell Warren, Jun 22, 2006, in forum: Python
    Replies:
    4
    Views:
    678
    Russell Warren
    Jun 27, 2006
  3. Replies:
    1
    Views:
    345
    Kevin Spencer
    Aug 7, 2006
  4. Replies:
    3
    Views:
    732
  5. Kris
    Replies:
    0
    Views:
    477