Mutex in ostream operator - doesn't work

H

Hansel Stroem

Good evening dear newsgroup (and sorry for cross-posting with .threads),

I am trying to synchronize output to ostreams such as cout and cerr for
threaded logging purposes. And it doesn't seem to work. The blurb of code
below is supposed to do following :

ostream& operator <<(ostream& out, StreamLocker& SL)
{

if (not_locked)
{
pthread_mutex_lock(this_lock);
not_lock = false;
}
else // if locked
{
pthread_mutex_unlock(this_lock);
not_lock = true;
}

return out;
}

Rest of code is debugging messages and attempts to get it to work with
flush() et al. Nothing works; output is as mingled as it could be. Please
help me kindly.

Thank you,
Not A Coder.
-----------------------------------------------
#include <iostream>
#include <pthread.h>
#include <unistd.h>


using namespace std;

static pthread_mutex_t stderr_lock;
static pthread_mutex_t stdout_lock;

class StreamLocker
{
public:
bool locked;
pthread_mutex_t my_lock;
StreamLocker();
void InitLock(pthread_mutex_t mutex_lock);
friend ostream& operator<<(ostream& out, StreamLocker &sl);
};

static StreamLocker SL;

StreamLocker::StreamLocker()
{
cout << "Creating StreamLocker" << endl;
this->locked = false;
pthread_mutex_init(&my_lock, NULL);
pthread_mutex_init(&stderr_lock, NULL);
pthread_mutex_init(&stdout_lock, NULL);
cout << "Created StreamLocker" << endl;
}

ostream& operator <<(ostream& out, StreamLocker &sl)
{
if (!sl.locked)
{
if (out == cerr)
{
out << "LK ER";
pthread_mutex_lock(&stderr_lock);
}
else if (out == cout)
{
out.flush();
out << "*LK OU*";
if ( pthread_mutex_lock(&stdout_lock) != 0 )
perror("Couldn't obtain lock");
}
else
{
out << "LK SE";
pthread_mutex_lock( &(sl.my_lock) );
}
}
else
{
out.flush();
if (out == cerr)
{
out << "ULK ER";
pthread_mutex_unlock(&stderr_lock);
}
else if (out == cout)
{
out << "*ULK OU*";
if ( pthread_mutex_unlock(&stdout_lock) != 0 )
perror("Couldn't unlock");
}
else
{
out << "ULK SE";
pthread_mutex_unlock(& (sl.my_lock)) ;
}
}
sl.locked = !sl.locked;
// out.flush();
return out;
}

void StreamLocker::InitLock(pthread_mutex_t mutex_lock)
{
pthread_mutex_init(&mutex_lock, NULL);
}


typedef void* (*thread_body) (void*) ;

template<int N>
void* Thread_Body(void* arg)
{
unsigned long cycle = 1;
while ( ++cycle > 0)
{
cout << SL ;
cout << "Thread ## " << N << " in cycle ## " << cycle << " speaking with
id " << pthread_self() << endl ;
cout << SL;
if ( (random() % 111)==0 )
sleep( 1 );
}
}



int main(int argc, char** argv)
{
pthread_attr_t thread_attr;
pthread_attr_init(&thread_attr);
pthread_t thread_id;

thread_body tb_1 = Thread_Body<1>, tb_2 = Thread_Body<2> , tb_3 =
Thread_Body<3>;

pthread_create(&thread_id, &thread_attr, tb_1, NULL);
pthread_create(&thread_id, &thread_attr, tb_2, NULL);
pthread_create(&thread_id, &thread_attr, tb_1, NULL);
pthread_create(&thread_id, &thread_attr, tb_3, NULL);
pthread_create(&thread_id, &thread_attr, tb_2, NULL);
pthread_create(&thread_id, &thread_attr, Thread_Body<7>, NULL);

pthread_join(thread_id, NULL);
}
 
P

Pavel

Hansel said:
Good evening dear newsgroup (and sorry for cross-posting with .threads),

I am trying to synchronize output to ostreams such as cout and cerr for
threaded logging purposes. And it doesn't seem to work. The blurb of code
below is supposed to do following :

ostream& operator <<(ostream& out, StreamLocker& SL)
{

if (not_locked)
{
pthread_mutex_lock(this_lock);
not_lock = false;
}
else // if locked
{
pthread_mutex_unlock(this_lock);
not_lock = true;
}

return out;
}

Rest of code is debugging messages and attempts to get it to work with
flush() et al. Nothing works; output is as mingled as it could be. Please
help me kindly.

Thank you,
Not A Coder.
-----------------------------------------------
#include <iostream>
#include <pthread.h>
#include <unistd.h>


using namespace std;

static pthread_mutex_t stderr_lock;
static pthread_mutex_t stdout_lock;

class StreamLocker
{
public:
bool locked;
pthread_mutex_t my_lock;
StreamLocker();
void InitLock(pthread_mutex_t mutex_lock);
friend ostream& operator<<(ostream& out, StreamLocker &sl);
};

static StreamLocker SL;

StreamLocker::StreamLocker()
{
cout << "Creating StreamLocker" << endl;
this->locked = false;
pthread_mutex_init(&my_lock, NULL);
pthread_mutex_init(&stderr_lock, NULL);
pthread_mutex_init(&stdout_lock, NULL);
cout << "Created StreamLocker" << endl;
}

ostream& operator <<(ostream& out, StreamLocker &sl)
{
if (!sl.locked)
{
if (out == cerr)
{
out << "LK ER";
pthread_mutex_lock(&stderr_lock);
}
else if (out == cout)
{
out.flush();
out << "*LK OU*";
if ( pthread_mutex_lock(&stdout_lock) != 0 )
perror("Couldn't obtain lock");
}
else
{
out << "LK SE";
pthread_mutex_lock( &(sl.my_lock) );
}
}
else
{
out.flush();
if (out == cerr)
{
out << "ULK ER";
pthread_mutex_unlock(&stderr_lock);
}
else if (out == cout)
{
out << "*ULK OU*";
if ( pthread_mutex_unlock(&stdout_lock) != 0 )
perror("Couldn't unlock");
}
else
{
out << "ULK SE";
pthread_mutex_unlock(& (sl.my_lock)) ;
}
}
sl.locked = !sl.locked;
// out.flush();
return out;
}

void StreamLocker::InitLock(pthread_mutex_t mutex_lock)
{
pthread_mutex_init(&mutex_lock, NULL);
}


typedef void* (*thread_body) (void*) ;

template<int N>
void* Thread_Body(void* arg)
{
unsigned long cycle = 1;
while ( ++cycle > 0)
{
cout << SL ;
cout << "Thread ## " << N << " in cycle ## " << cycle << " speaking with
id " << pthread_self() << endl ;
cout << SL;
if ( (random() % 111)==0 )
sleep( 1 );
}
}



int main(int argc, char** argv)
{
pthread_attr_t thread_attr;
pthread_attr_init(&thread_attr);
pthread_t thread_id;

thread_body tb_1 = Thread_Body<1>, tb_2 = Thread_Body<2> , tb_3 =
Thread_Body<3>;

pthread_create(&thread_id, &thread_attr, tb_1, NULL);
pthread_create(&thread_id, &thread_attr, tb_2, NULL);
pthread_create(&thread_id, &thread_attr, tb_1, NULL);
pthread_create(&thread_id, &thread_attr, tb_3, NULL);
pthread_create(&thread_id, &thread_attr, tb_2, NULL);
pthread_create(&thread_id, &thread_attr, Thread_Body<7>, NULL);

pthread_join(thread_id, NULL);
}
Not sure it is C++ question.. anyway, one thing that struck me
immediately (never had time to look through the whole code) is that the
code checks and sets the shared sl.locked outside of the mutex. Which
means, the threads will often be fooled to assume the mutex is not
locked when it is and other way around.. not that I advise to
sycnrhonize `locked' or even use it at all; I just believe it is one of
the problems in the code.

My advice would be:

Use thread-local or just local ostrstream (yes, the deprecated one, from
Appendix D, the new ostringstream is even slower than ostrstream) then
flush the log record onto the device in a single system call or under
mutex if you want to make sure it is written in full and you have to use
a loop of system calls for that on your system.

-Pavel
 
C

Christopher Pisz

Hansel Stroem said:
Good evening dear newsgroup (and sorry for cross-posting with .threads),

I am trying to synchronize output to ostreams such as cout and cerr for
threaded logging purposes. And it doesn't seem to work. The blurb of code
below is supposed to do following :

ostream& operator <<(ostream& out, StreamLocker& SL)
{

if (not_locked)
{
pthread_mutex_lock(this_lock);
not_lock = false;
}
else // if locked
{
pthread_mutex_unlock(this_lock);
not_lock = true;
}

return out;
}

Rest of code is debugging messages and attempts to get it to work with
flush() et al. Nothing works; output is as mingled as it could be. Please
help me kindly.

Thank you,
Not A Coder.
-----------------------------------------------
#include <iostream>
#include <pthread.h>
#include <unistd.h>


using namespace std;

static pthread_mutex_t stderr_lock;
static pthread_mutex_t stdout_lock;

class StreamLocker
{
public:
bool locked;
pthread_mutex_t my_lock;
StreamLocker();
void InitLock(pthread_mutex_t mutex_lock);
friend ostream& operator<<(ostream& out, StreamLocker &sl);
};

static StreamLocker SL;

StreamLocker::StreamLocker()
{
cout << "Creating StreamLocker" << endl;
this->locked = false;
pthread_mutex_init(&my_lock, NULL);
pthread_mutex_init(&stderr_lock, NULL);
pthread_mutex_init(&stdout_lock, NULL);
cout << "Created StreamLocker" << endl;
}

ostream& operator <<(ostream& out, StreamLocker &sl)
{
if (!sl.locked)
{
if (out == cerr)
{
out << "LK ER";
pthread_mutex_lock(&stderr_lock);
}
else if (out == cout)
{
out.flush();
out << "*LK OU*";
if ( pthread_mutex_lock(&stdout_lock) != 0 )
perror("Couldn't obtain lock");
}
else
{
out << "LK SE";
pthread_mutex_lock( &(sl.my_lock) );
}
}
else
{
out.flush();
if (out == cerr)
{
out << "ULK ER";
pthread_mutex_unlock(&stderr_lock);
}
else if (out == cout)
{
out << "*ULK OU*";
if ( pthread_mutex_unlock(&stdout_lock) != 0 )
perror("Couldn't unlock");
}
else
{
out << "ULK SE";
pthread_mutex_unlock(& (sl.my_lock)) ;
}
}
sl.locked = !sl.locked;
// out.flush();
return out;
}

void StreamLocker::InitLock(pthread_mutex_t mutex_lock)
{
pthread_mutex_init(&mutex_lock, NULL);
}


typedef void* (*thread_body) (void*) ;

template<int N>
void* Thread_Body(void* arg)
{
unsigned long cycle = 1;
while ( ++cycle > 0)
{
cout << SL ;
cout << "Thread ## " << N << " in cycle ## " << cycle << " speaking with
id " << pthread_self() << endl ;
cout << SL;
if ( (random() % 111)==0 )
sleep( 1 );
}
}



int main(int argc, char** argv)
{
pthread_attr_t thread_attr;
pthread_attr_init(&thread_attr);
pthread_t thread_id;

thread_body tb_1 = Thread_Body<1>, tb_2 = Thread_Body<2> , tb_3 =
Thread_Body<3>;

pthread_create(&thread_id, &thread_attr, tb_1, NULL);
pthread_create(&thread_id, &thread_attr, tb_2, NULL);
pthread_create(&thread_id, &thread_attr, tb_1, NULL);
pthread_create(&thread_id, &thread_attr, tb_3, NULL);
pthread_create(&thread_id, &thread_attr, tb_2, NULL);
pthread_create(&thread_id, &thread_attr, Thread_Body<7>, NULL);

pthread_join(thread_id, NULL);
}

What a coincidence that I am studying a very simular problem in a book as we
speak. The authors of "Standard C++ IoStreams and Locals" suggests that you
add the lock and unlock to the sentry object for the stream and use that
sentry object in your insertion and extraction functions. This was in the
context of making custom instertors and extractors for user defined types,
so I am not sure if it is applicable to your desire to implement the
behavior on every type. Unfortuantely, I have not gotten to the part where I
actually add the operation to the sentry object, but its worth mentioning so
you can also research in the meanwhile.

It was also said that, if you call a stream operator << or >> within a
function that defines a stream operator << or >> you invoke the prefix and
postfix operations (such as flushing, skipping of whitespace, flushing of
tied stream, and custom things like the suggested locking and unlocking) the
sentry governs anyway , and as a result create alot of unecesaary overhead,
as well as imho possible problems in your multithreaded context.
 
I

Ian Collins

Hansel said:
Good evening dear newsgroup (and sorry for cross-posting with .threads),
You haven't cross-posted, you have multi-posted, which is never a good
idea. c.p.t is the better group for this question.
 
J

James Kanze


[...]
What a coincidence that I am studying a very simular problem
in a book as we speak. The authors of "Standard C++ IoStreams
and Locals" suggests that you add the lock and unlock to the
sentry object for the stream and use that sentry object in
your insertion and extraction functions.

That doesn't solve the problem for things like:

logstream << "whatever = " << whatever << std::endl ;

Another thread can still intervene between the different <<
operators.

I use a wrapper for this, with a template operator<< which
forwards to the actual stream. The constructor of the wrapper
acquires the lock, and the destructor frees it. (Since the
wrapper will usually be used as a temporary, the lock will be
freed at the end of the full expression.)

Something like (off the top of my head):

class LockedStream
{
public:
LockedStream( ostream& dest, Mutex& mutex )
: myStream( dest )
, myMutex( mutex )
, myCounter( new int( 1 ) )
{
myMutex->lock() ;
}
LockedStream( LockedStream const& other )
: myStream( other.myStream )
, myMutex( other.myMutex )
, myCounter( other.myCounter )
{
++ (*myCounter) ;
}
~LockedStream()
{
-- (*myCounter) ;
if ( *myCounter == 0 ) {
myMutex->unlock() ;
delete myCounter ;
}
}

template< typename T >
LockedStream& operator<<( T const& obj )
{
myStream << obj ;
return *this ;
}

LockedStream& operator<<( std::ios& (*manip)( std::ios ))
{
myStream << manip ;
}

LockedStream& operator<<( std::eek:stream& (*manip)
( std::eek:stream ) )
{
myStream << manip ;
}

LockedStream& operator<<( char const* s )
{
myStream << s ;
}

private:
ostream& myStream ;
Mutex& myMutex ;
int* myCounter ;
} ;

You can then write something like:

LockedStream( myStream ) << "whatever = " << whatever <<
std::endl ;

and hold the lock for the entire duration.
 
J

James Kanze

You haven't cross-posted, you have multi-posted, which is
never a good idea. c.p.t is the better group for this
question.

Cross-posting would have been better, but I really don't think
it very relevant to c.p.t---they are interested in general
threading issues there, and not C++ specific code. His problem
involves the particularities of iostream, and probably requires
templates to solve. I don't think c.p.t will help him much
there, where as it's right on subject here.
 
I

Ian Collins

James said:
Cross-posting would have been better, but I really don't think
it very relevant to c.p.t---they are interested in general
threading issues there, and not C++ specific code. His problem
involves the particularities of iostream, and probably requires
templates to solve. I don't think c.p.t will help him much
there, where as it's right on subject here.
On the contrary, the example at the top of the post (the mish-mash of
tests and locks) is most relevant to c.p.t, which is why I answered it
there!
 
C

Chris Thomasson

[...]
What a coincidence that I am studying a very simular problem in a book
as we speak. The authors of "Standard C++ IoStreams and Locals"
suggests that you add the lock and unlock to the sentry object for the
stream and use that sentry object in your insertion and extraction
functions.

That doesn't solve the problem for thingshttp://groups.google.com/group/comp.programming.threads/browse_frm/thread/6b37f881199c5133 like: [...]
Another thread can still intervene between the different << operators.

I use a wrapper for this, with a template operator<< which forwards to
the actual stream. [...]
You can then write something like:

LockedStream( myStream ) << "whatever = " << whatever <<
std::endl ;

and hold the lock for the entire duration.

This is a much more scalable approach:

http://groups.google.com/group/comp.programming.threads/browse_frm/thread/6b37f881199c5133
 
A

Alf P. Steinbach

* Chris Thomasson:
news:[email protected]... [...]
What a coincidence that I am studying a very simular problem in a book
as we speak. The authors of "Standard C++ IoStreams and Locals"
suggests that you add the lock and unlock to the sentry object for the
stream and use that sentry object in your insertion and extraction
functions.
That doesn't solve the problem for thingshttp://groups.google.com/group/comp.programming.threads/browse_frm/thread/6b37f881199c5133 like: [...]
Another thread can still intervene between the different << operators.

I use a wrapper for this, with a template operator<< which forwards to
the actual stream. [...]
You can then write something like:

LockedStream( myStream ) << "whatever = " << whatever <<
std::endl ;

and hold the lock for the entire duration.

This is a much more scalable approach:

http://groups.google.com/group/comp.programming.threads/browse_frm/thread/6b37f881199c5133

AFAICS the difference is that this uses flip-flop like lock object that
when it's output via << locks or unlocks depending on current state, and
that it treats some particular streams specially (not scalable).

Much better to have that state explicit as with James' solution.

In particular for exception safety (guaranteed unlock via destructor,
not a missed unlocking <<-operation), but also just in general to have
the intended effect explicit in the code instead of implicit in some
magic that easily gets confused about its state.


Cheers,

- Alf
 
C

Chris Thomasson

* Chris Thomasson:
[...]
What a coincidence that I am studying a very simular problem in a
book as we speak. The authors of "Standard C++ IoStreams and Locals"
suggests that you add the lock and unlock to the sentry object for
the stream and use that sentry object in your insertion and
extraction functions.
That doesn't solve the problem for
thingshttp://groups.google.com/group/comp.programming.threads/ browse_frm/thread/6b37f881199c5133
like: [...]
Another thread can still intervene between the different << operators.

I use a wrapper for this, with a template operator<< which forwards to
the actual stream. [...]
You can then write something like:

LockedStream( myStream ) << "whatever = " << whatever <<
std::endl ;

and hold the lock for the entire duration.

This is a much more scalable approach:

http://groups.google.com/group/comp.programming.threads/browse_frm/
thread/6b37f881199c5133

AFAICS the difference is that this uses flip-flop like lock object that
when it's output via << locks or unlocks depending on current state, and
that it treats some particular streams specially (not scalable).
[...]

I described a distributed logging scheme which is scalable indeed. In
other words, N threads can create multiple concurrent log-entries with any
contention what so ever. The dedicated logging thread coalesces/sorts all
the entries into their respective outputs. I can show pseudo-code if you
want; perhaps I am misunderstanding you...
 

Ask a Question

Want to reply to this thread or ask your own question?

You'll need to choose a username for the site, which only take a couple of moments. After that, you can post your question and our members will help you out.

Ask a Question

Members online

No members online now.

Forum statistics

Threads
473,755
Messages
2,569,536
Members
45,013
Latest member
KatriceSwa

Latest Threads

Top