Why is java consumer/producer so much faster than C++

B

Bo Persson

Luca Risolia skrev 2012-07-24 00:33:
In C++11 the ideal interface for a generic thread-safe and
exception-safe queue should at least provide std::shared_ptr<T> take()
and/or void take(T&). T take() is limited to those types having no-throw
copy constructors or move-constructors. This also means that you should
check the type at compile-time by using type traits. However, in your
test case you used move-semantic everywhere, so that is not problem.
Also note that the term "pop" instead of "take" is more in the spirit of
C++.

Using move semantics in a return statement might interfere with the NRVO
that would otherwise likely kick in. It is not an optimization.


Bo Persson
 
M

Melzzzzz

At second glance I wasn't that fond of my solution and tweaked it as
below:


void put(T t)
{
std::unique_lock<std::mutex> lock(m_, std::try_to_lock);
while (!lock.owns_lock())
{
if (queue_.size() > capacity_/4)
{

There is problem, as queue_ is not protected under lock.
for (int i = 0; i < 3250; ++i)
std::this_thread::yield();
lock.try_lock();
}
else
{
lock.lock();
break;
}
}
while(queue_.size() >= capacity_)c_full_.wait(lock);
queue_.push_back(std::move(t));
if (queue_.size() == 1)
c_empty_.notify_one();
}
T take()
{
std::unique_lock<std::mutex> lock(m_, std::try_to_lock);
int retry = 0;
while (!lock.owns_lock())
{
if (queue_.size() < 3*capacity_/4)
{
for (int i = 0; i < 3250; ++i)
std::this_thread::yield();
lock.try_lock();
}
else
{
lock.lock();
break;
}
}
while(queue_.empty())c_empty_.wait(lock);
T tmp = std::move(queue_.front());
queue_.pop_front();
if (queue_.size() == capacity_-1)
c_full_.notify_one();
return tmp;
}

I am admittedly coding to the benchmark (which is not a really great
idea). But I got the time on my system down from 13.7 seconds to
about 9.7 seconds (another 40%).


This is version with array (identical implementation
as ArrayBlockingQueue in Java), instead of deque (I
used atomic type for count_). This executes about same speed as Java
as atomic type slows it down a bit.
Sadly, no noticing difference between deque and array.

#include <condition_variable>
#include <mutex>
#include <thread>
#include <cstdlib>
#include <atomic>

template <class T>
class BlockingQueue{
public:
BlockingQueue(unsigned cap)
:
queue_((T*)new char[cap*sizeof(T)]),
insPos_(0),
extPos_(0),
count_(0),
capacity_(cap)
{
}
BlockingQueue(const BlockingQueue&)=delete;
BlockingQueue& operator=(const BlockingQueue&)=delete;
~BlockingQueue()
{
delete[] (char*)queue_;
}
void put(T t)
{
std::unique_lock<std::mutex> lock(m_, std::try_to_lock);
int retry = 0;
while (!lock.owns_lock())
{
if (count_ > capacity_/4 && ++retry < 1000)
{
std::this_thread::yield();
}
else
{
lock.lock();
}
}
c_full_.wait(lock, [this]{return count_ < capacity_;});
new(queue_+insPos_)T(std::move(t));
inc(insPos_);
++count_;
if (count_ == 1)
c_empty_.notify_one();
}
T take()
{
std::unique_lock<std::mutex> lock(m_, std::try_to_lock);
int retry = 0;
while (!lock.owns_lock())
{
if (count_ < 3*capacity_/4 && ++retry < 1000)
{
std::this_thread::yield();
}
else
{
lock.lock();
}
}
c_empty_.wait(lock, [this]{return count_ > 0 ;});
T tmp = std::move(queue_[extPos_]);
queue_[extPos_].~T();
inc(extPos_);
--count_;
if (count_ == capacity_-1)
c_full_.notify_one();
return tmp;
}
bool empty()
{
std::lock_guard<std::mutex> lock(m_);
return count_ == 0;
}
private:
void inc(unsigned& i)
{
++i;
if(i == capacity_)i = 0;
}
std::mutex m_;
std::condition_variable c_empty_,c_full_;
T* queue_;
unsigned insPos_,extPos_;
std::atomic<unsigned> count_;
const unsigned capacity_;
};

int main()
{
BlockingQueue<int> produced(1000);
const int nitems = 100000000;
std::srand(12345);


std::function<void()> f_prod = [&]() {
int i = nitems;
while(i-- > 0){
produced.put(i);
}
};

std::thread producer1(f_prod);

std::function<void()> f_cons = [&]() {
const int size = 10000;
int arr[size];
int i = nitems;
while(i-- > 0)
{
arr[std::rand()%size] =
produced.take();
}
};

std::thread consumer1(f_cons);

producer1.join();
consumer1.join();
}
 
L

Luca Risolia

This is version with array (identical implementation
as ArrayBlockingQueue in Java), instead of deque (I
used atomic type for count_).

Why do you need count_ to be an atomic type, it seems that all the
updates to that variable are already protected by a mutex..
 
M

Melzzzzz

Why do you need count_ to be an atomic type, it seems that all the
updates to that variable are already protected by a mutex..

count_ is not protected when testing if lock is held in while loop.
Anyway this has not be precise and if count_ is not atomic program
is much faster.
I have tried Howard's algorithm with Java and got same
time as C++.
Perhaps Array/LinkedBlockingQueue implementation in Java has
to be updated ;)
 
L

Luca Risolia

count_ is not protected when testing if lock is held in while loop.

Correct, I did not read the code carefully, since I supposed the only
difference from your original version was the use of an array, which
made the program about 20-30% faster in my tests.
 
H

Howard Hinnant

&gt; void put(T t)
&gt; {
&gt; std::unique_lock&lt;std::mutex&gt; lock(m_, std::try_to_lock);
&gt; while (!lock.owns_lock())
&gt; {
&gt; if (queue_.size() &gt; capacity_/4)
&gt; {

There is problem, as queue_ is not protected under lock.

Correct. But the code gets away with it anyway because:

1. It does not modify the queue.
2. It does not need an accurate value for queue_.size().

The *very* worst that could happen is that the value of queue_.size() read by one thread is never updated by the other as the other thread fills/empties the queue. Eventually the other thread will drop the mutex ownership, either because it runs out of things to put into the queue, or because it fills the queue and has to wait, or because it empties the queue and has to wait. In any event, the other thread drops the lock and this thread acquires it via the try_lock. It has not mattered, except perhaps for efficiency purposes, that this thread has been reading the wrong value of queue_.size().

And as you pointed out, efficiency hasn't suffered.
 
N

none

Problem is that we need to reduce waiting on condition to minimum
as more condition waits program is slower.
Ideally there would be no condition waits (maximum speed).

OK, for this test program, I guess it might make sense (albeit in my
case, this worsens the results)

However, you should be careful to generalizing this to a "real"
program. This test program passes integers back and forth via the
queue and do no "work" at all. A "real" program will most likely do
work.

Typically for a real program, fcons/fprod will look more like:

void fprod()
{
for(;;)
{
Job job = CreateJob();
queue.put(job);
}
}

void fcons()
{
for(;;)
{
Job job = queue.take();
ProcessJob(Job);
}
}

And the cost of ProcessJob()/CreateJob() will dwarf the cost of
put()/take() by several order of magnitude. (in your test program, the
cost of storing the integer is dwarfed by the cost of accessing the
queue)

For such a system, you actually want it to be a common occurence that
the consumer blocks waiting on a condition.
 
H

Howard Hinnant

However, you should be careful to generalizing this to a &quot;real&quot;
program.

100% agreed.

I wrote earlier:
I am admittedly coding to the benchmark (which is not a really great idea).

It was fun. I had a good time. :)

Howard
 
L

Luca Risolia

This is version with array (identical implementation
as ArrayBlockingQueue in Java), instead of deque (I
used atomic type for count_). This executes about same speed as Java
as atomic type slows it down a bit.
Sadly, no noticing difference between deque and array.

Right, it may well depend on how smart the std::deque::allocator_type is.

Below is a version of the BlockingQueue using spin locks. Compared to
the original version in Java it is about 2.5 times faster on my
platform, provided that you remove std::rand() from the test, as it
seems to be a real bottleneck and does not give any precious
informations about the efficiency of the Queue (which is the thing to
measure after all).

Let me know your results.

---

#include <thread>
#include <cstdlib>
#include <functional>
#include <atomic>
#include <memory>
#include <type_traits>

class spinlock_mutex {
std::atomic_flag flag = ATOMIC_FLAG_INIT;
public:

void lock() {
while (flag.test_and_set(std::memory_order_acquire)) {
std::this_thread::yield();
};
}

void unlock() {
flag.clear(std::memory_order_release);
}
};

template <class T, class A = std::allocator<T >>
class BlockingQueue {
const std::size_t cap_;
A alloc_;
T* queue_;
std::size_t n = 0, b = 0, f = 0;
spinlock_mutex m_;
public:

BlockingQueue(const std::size_t cap, const A& alloc = A())
: cap_(cap), alloc_(alloc), queue_(alloc_.allocate(cap)) { }

~BlockingQueue() {
alloc_.deallocate(queue_, cap_);
}

static_assert(std::is_nothrow_move_constructible<T>::value,
"BlockingQueue cannot be exception-safe");

void put(const T t) {
for (;;) {
m_.lock();
if (n < cap_)
break;
m_.unlock();
}
alloc_.construct(&queue_[f], std::move(t));
f = ++f % cap_;
++n;
m_.unlock();
}

T take() {
for (;;) {
m_.lock();
if (n > 0)
break;
m_.unlock();
}
const T t = std::move(queue_);
alloc_.destroy(&queue_);
b = ++b % cap_;
--n;
m_.unlock();
return t;
}
};

int main() {
BlockingQueue<int> produced(100000);
const int nitems = 100000000;
std::srand(12345);


std::function<void() > f_prod = [&](){
int i = nitems;
while (i-- > 0) {
produced.put(i);
}
};

std::thread producer1(f_prod);

std::function<void() > f_cons = [&](){
const int size = 10000;
int arr[size];
int i = nitems;
while (i-- > 0) {
arr[std::rand() % size] = produced.take();
}
};

std::thread consumer1(f_cons);

producer1.join();
consumer1.join();
}
 
M

Melzzzzz

Right, it may well depend on how smart the std::deque::allocator_type
is.

Below is a version of the BlockingQueue using spin locks. Compared to
the original version in Java it is about 2.5 times faster on my
platform, provided that you remove std::rand() from the test, as it
seems to be a real bottleneck and does not give any precious
informations about the efficiency of the Queue (which is the thing to
measure after all).

Let me know your results.
(your version without rand and array assignment)
bmaxa@maxa:~/examples$ time ./consprod5

real 0m6.812s
user 0m7.532s
sys 0m5.136s
(Howards version without rand and array assignment)
bmaxa@maxa:~/examples$ time ./consprod4

real 0m6.748s
user 0m7.376s
sys 0m5.000s

(your version with rand and array assignment)
bmaxa@maxa:~/examples$ time ./consprod5

real 0m23.733s
user 0m27.042s
sys 0m16.849s

(Howards version with rand and array assignment)
bmaxa@maxa:~/examples$ time ./consprod4

real 0m9.351s
user 0m10.257s
sys 0m6.900s

(java (Howards version) without rand and array assignment)
bmaxa@maxa:~/examples$ time java consprod

real 0m8.088s
user 0m8.841s
sys 0m5.596s

(java (Howards version) with rand and array assignment)
bmaxa@maxa:~/examples$ time java consprod

real 0m10.052s
user 0m11.117s
sys 0m7.208s

---

#include <thread>
#include <cstdlib>
#include <functional>
#include <atomic>
#include <memory>
#include <type_traits>

class spinlock_mutex {
std::atomic_flag flag = ATOMIC_FLAG_INIT;

gcc 4.6.3 doesn't implements non static member initialization ;(
static_assert(std::is_nothrow_move_constructible<T>::value,
"BlockingQueue cannot be exception-safe");

this is not yet in library ;(
void put(const T t) {
for (;;) {
m_.lock();
if (n < cap_)
break;
m_.unlock();
}
alloc_.construct(&queue_[f], std::move(t));
f = ++f % cap_;

compiler warns about undefined behavior here.
++n;
m_.unlock();
}

T take() {
for (;;) {
m_.lock();
if (n > 0)
break;
m_.unlock();
}
const T t = std::move(queue_);
alloc_.destroy(&queue_);
b = ++b % cap_;


also warning about undefined behavior.
 
L

Luca Risolia

compiler warns about undefined behavior here.

Ouch..split that:

++f;
f %= cap_;

++b;
b %= cap_

I did not turn on all the warnings. My compiler produced the correct
code though.
 
M

Melzzzzz

Ouch..split that:

++f;
f %= cap_;

I corrected that already ;)

I have installed gcc-snapshot compiler
gcc version 4.8.0 20120314 (experimental) [trunk revision 185382]
(Ubuntu/Linaro 20120314-0ubuntu2)

Your timings with array assignment and rand included, are much worse
than gcc 4.6.3. Definitely something is wrong if
other thread does something else besides waiting on
spin lock.
 
L

Luca Risolia

Ouch..split that:

++f;
f %= cap_;

I corrected that already ;)

I have installed gcc-snapshot compiler
gcc version 4.8.0 20120314 (experimental) [trunk revision 185382]
(Ubuntu/Linaro 20120314-0ubuntu2)

Your timings with array assignment and rand included, are much worse
than gcc 4.6.3. Definitely something is wrong if
other thread does something else besides waiting on
spin lock.

Yes, it's worth to strace std::rand() to see what it does exactly.
Out of my curiosity, try to yield() after releasing the lock in both
put() and take():

m_.unlock();
boost::this_thread::yield(); // add this line

do you see any improvements?
 
M

Melzzzzz

On 26/07/2012 21:56, Melzzzzz wrote:
f = ++f % cap_;

compiler warns about undefined behavior here.

Ouch..split that:

++f;
f %= cap_;

I corrected that already ;)

I have installed gcc-snapshot compiler
gcc version 4.8.0 20120314 (experimental) [trunk revision 185382]
(Ubuntu/Linaro 20120314-0ubuntu2)

Your timings with array assignment and rand included, are much worse
than gcc 4.6.3. Definitely something is wrong if
other thread does something else besides waiting on
spin lock.

Yes, it's worth to strace std::rand() to see what it does exactly.
Out of my curiosity, try to yield() after releasing the lock in both
put() and take():

m_.unlock();
boost::this_thread::yield(); // add this line

do you see any improvements?

No, that make it drastically slower (when not assigning array and using
rand, also not any faster when rand is used).
You could speed up your program by 10-30% I think with just
using my original implementation of
++f;
if(f == cap_)f = 0;
instead of
++f;
f %= cap_;
because division is drastically slower on modern CPUs,
than check and assignment (that's why division by constant
is usually implemented by shift and multiply).
In this case I don;t think compiler can be that clever, it
has to use division and that makes performance *much* worse.

take a look at my time now (with if instead of modulo)
bmaxa@maxa:~/examples$ time ./consprod5

real 0m5.234s
user 0m5.912s
sys 0m3.976s

almost 30% improvement!
 
M

Melzzzzz

On 26/07/2012 21:56, Melzzzzz wrote:
f = ++f % cap_;

compiler warns about undefined behavior here.

Ouch..split that:

++f;
f %= cap_;

I corrected that already ;)

I have installed gcc-snapshot compiler
gcc version 4.8.0 20120314 (experimental) [trunk revision 185382]
(Ubuntu/Linaro 20120314-0ubuntu2)

Your timings with array assignment and rand included, are much worse
than gcc 4.6.3. Definitely something is wrong if
other thread does something else besides waiting on
spin lock.

Yes, it's worth to strace std::rand() to see what it does exactly.
Out of my curiosity, try to yield() after releasing the lock in both
put() and take():

m_.unlock();
boost::this_thread::yield(); // add this line

do you see any improvements?

I have solved your problem. Seems that your program suffers same problem
as my original version. When applied Howards algorithm to your version
I've got fastest time both for rand array assignment and without:

take a look:
(merged version with rand array assignment)
bmaxa@maxa:~/examples$ time ./consprod5

real 0m4.731s
user 0m5.164s
sys 0m2.804s

(merged version without rand array assignment,
wow this is fast ;)
bmaxa@maxa:~/examples$ time ./consprod5

real 0m2.345s
user 0m2.624s
sys 0m0.820s

code:
#include <thread>
#include <cstdlib>
#include <functional>
#include <atomic>
#include <memory>
#include <type_traits>

class spinlock_mutex {
std::atomic_flag flag = ATOMIC_FLAG_INIT;
public:
void lock() {
while (flag.test_and_set(std::memory_order_acquire)) {
std::this_thread::yield();
};
}
bool try_lock(){
if(flag.test_and_set(std::memory_order_acquire))
return false;
else
return true;
}
void unlock() {
flag.clear(std::memory_order_release);
}
};

template <class T, class A = std::allocator<T >>
class BlockingQueue {
const std::size_t cap_;
A alloc_;
T* queue_;
std::size_t n = 0, b = 0 , f = 0;
spinlock_mutex m_;
public:

BlockingQueue(const std::size_t cap, const A& alloc = A())
: cap_(cap), alloc_(alloc), queue_(alloc_.allocate(cap)) { }

~BlockingQueue() {
alloc_.deallocate(queue_, cap_);
}

static_assert(std::is_nothrow_move_constructible<T>::value,
"BlockingQueue cannot be exception-safe");

void put(T t) {
bool locked = m_.try_lock();
int retry = 0;
while(!locked)
{
if(n > cap_ / 4 && ++retry < 1000)
{
std::this_thread::yield();
}
else
{
locked = m_.try_lock();
}
}
alloc_.construct(&queue_[f], std::move(t));
++f;
if(f == cap_)f = 0;
++n;
m_.unlock();
}

T take() {
bool locked = m_.try_lock();
int retry = 0;
while(!locked)
{
if(n < 3 * cap_ / 4 && ++retry < 1000)
{
std::this_thread::yield();
}
else
{
locked = m_.try_lock();
}
}
T t = std::move(queue_);
alloc_.destroy(&queue_);
++b;
if(b == cap_) b = 0;
--n;
m_.unlock();
return t;
}
};

int main() {
BlockingQueue<int> produced(1000);
const int nitems = 100000000;
std::srand(12345);


std::function<void() > f_prod = [&](){
int i = nitems;
while (i-- > 0) {
produced.put(i);
}
};

std::thread producer1(f_prod);

std::function<void() > f_cons = [&](){
const int size = 10000;
int arr[size];
int i = nitems;
while (i-- > 0) {
arr[std::rand() % size] =
produced.take();
}
};

std::thread consumer1(f_cons);

producer1.join();
consumer1.join();
}
 
M

Melzzzzz

On Thu, 26 Jul 2012 22:15:33 +0200

On 26/07/2012 21:56, Melzzzzz wrote:
f = ++f % cap_;

compiler warns about undefined behavior here.

Ouch..split that:

++f;
f %= cap_;

I corrected that already ;)

I have installed gcc-snapshot compiler
gcc version 4.8.0 20120314 (experimental) [trunk revision 185382]
(Ubuntu/Linaro 20120314-0ubuntu2)

Your timings with array assignment and rand included, are much
worse than gcc 4.6.3. Definitely something is wrong if
other thread does something else besides waiting on
spin lock.

Yes, it's worth to strace std::rand() to see what it does exactly.
Out of my curiosity, try to yield() after releasing the lock in
both put() and take():

m_.unlock();
boost::this_thread::yield(); // add this line

do you see any improvements?

I have solved your problem.

Nah, implementation is completely wrong (it was too late),disregard this
post, problem remains... ;(
 
M

Melzzzzz

On 26/07/2012 21:56, Melzzzzz wrote:
f = ++f % cap_;

compiler warns about undefined behavior here.

Ouch..split that:

++f;
f %= cap_;

I corrected that already ;)

I have installed gcc-snapshot compiler
gcc version 4.8.0 20120314 (experimental) [trunk revision 185382]
(Ubuntu/Linaro 20120314-0ubuntu2)

Your timings with array assignment and rand included, are much worse
than gcc 4.6.3. Definitely something is wrong if
other thread does something else besides waiting on
spin lock.

Yes, it's worth to strace std::rand() to see what it does exactly.
Out of my curiosity, try to yield() after releasing the lock in both
put() and take():

m_.unlock();
boost::this_thread::yield(); // add this line

do you see any improvements?

Finally got it ;)
Problem was that producer thread constantly spins, but consumer does
some work. Queue was constantly full and that slowed it down
significantly.
I have used Howards trick, and this time correctly ;)

now times are fastest (queue size was 1000):
(with rand)
bmaxa@maxa:~/examples$ time ./consprod5

real 0m6.215s
user 0m6.440s
sys 0m5.108s

(without rand)
bmaxa@maxa:~/examples$ time ./consprod5

real 0m3.232s
user 0m3.024s
sys 0m2.968s

code:
void put(T t) {
for(;;)
{
m_.lock();
if(n < cap_)
{
int retry = 0;
while(n > 3 * cap_ / 4 && ++retry < 3000)
{
m_.unlock();
std::this_thread::yield();
m_.lock();
}
break;
}
m_.unlock();
}
alloc_.construct(&queue_[f], std::move(t));
++f;
if(f == cap_)f = 0;
++n;
m_.unlock();
}

T take() {
for(;;)
{
m_.lock();
if(n > 0)
{
int retry = 0;
while(n < cap_ / 4 && ++retry < 3000)
{
m_.unlock();
std::this_thread::yield();
m_.lock();
}
break;
}
m_.unlock();
}
T t = std::move(queue_);
alloc_.destroy(&queue_);
++b;
if(b == cap_) b = 0;
--n;
m_.unlock();
return t;
}
 
J

James Kanze

I have played little bit with new C++11 features and compared,
java performance to c++.
Actually this was meant to be GC vs RAII memory management,
but boiled down to speed of BlockingQueue class in java,
and mine in c++.
It seems that java implementation is so much more efficient
but I don't know why. I even tried c++ without dynamic
memory management (except queue itself) and that is *even slower*.
Must be some quirks with a queue ;)

Just a guess (I don't know anything about the Java
BlockingQueue), but your C++ implementation uses an extensible
queue; the restriction on the queue size is external. From the
code, I'd guess that the Java blocking queue is a fixed length
(established once in the constructor). This means that once the
queue has been constructed, there are no more allocations in
Java; in C++, I think you'll find that std::deque does allocate
and free new memory as it expands and contracts.

You might try replacing std::deque with a simple, std::vector
based circular buffer. Construct the std::vector to its fixed
size once at the beginning, and just keep two iterators into it:
one for writing, and one for reading. Wrap the iterators each
time they reach the end. (Since the limit conditions for full
and empty are sometimes difficult to implement, you'd be better
off searching an existing implementation on the net. I'd be
surprised if Boost didn't have one, for example.)
 

Ask a Question

Want to reply to this thread or ask your own question?

You'll need to choose a username for the site, which only take a couple of moments. After that, you can post your question and our members will help you out.

Ask a Question

Members online

No members online now.

Forum statistics

Threads
473,766
Messages
2,569,569
Members
45,043
Latest member
CannalabsCBDReview

Latest Threads

Top