Jim Langston said:
I once had a similar situation where I needed to pass information to a
thread. I wound up wrapping a std::queue<std::string> in a class with a
named lock on pushes and pops. Unfortunately, I have since lost that
code or I would post it here. It's not that difficult though.
Oh, you're lucky. I looked a little deeper and found the code I was working
on to test this. It was never put into production and is not fully tested. The
following code is not guaranteed to work correctly or be exactly what you want,
and it may be Windows-specific, which you may need to fix.
#include <string>
#include <queue>
#include <iostream>
#include <vector>
#include <sstream>
#include <conio.h>
#include <windows.h>
#include <process.h>
// Converts a value of type F to type T by round-tripping it through a
// std::stringstream: `from` is written with operator<< and the result is read
// back with operator>>.
//
// The result starts out value-initialised (T()), so that is what the caller
// gets when nothing can be extracted.  Extraction follows normal stream
// rules, e.g. reading into a std::string stops at the first whitespace.
template<typename T, typename F > T StrmConvert( F from )
{
    std::stringstream buffer;
    buffer << from;

    T converted = T();
    buffer >> converted;
    return converted;
}
class ThreadQueue
{
public:
unsigned int ThreadID;
ThreadQueue(unsigned int limit): Shutdown( false ), ThreadID( 0 )
{
Limit = limit; // Only saved for copy and assignment constructors
which aren't used cause can't figure out good way
handles[SemaphoreIndex] = ::CreateSemaphore(NULL, // no security
attributes
0, // initial count
limit, // max count
NULL); // anonymous
::InitializeCriticalSection(&lock);
}
~ThreadQueue()
{
::EnterCriticalSection(&lock);
Shutdown = true;
while ( ! MsgQueue.empty() )
{
delete MsgQueue.front();
MsgQueue.pop();
}
::LeaveCriticalSection(&lock);
::CloseHandle(handles[SemaphoreIndex]);
:
eleteCriticalSection(&lock);
}
bool AddTail(std::string* pmessage)
{
if ( Shutdown )
return false;
bool result;
::EnterCriticalSection(&lock);
MsgQueue.push(pmessage);
result = ::ReleaseSemaphore(handles[SemaphoreIndex], 1, NULL) != 0;
if (!result)
{
// caller can use ::GetLastError to determine what went wrong
MsgQueue.pop();
}
::LeaveCriticalSection(&lock);
return result;
}
std::string* RemoveHead( const unsigned long Milliseconds = 0 ) //
Milliseconds = INFINITE for blocking
{
if ( Shutdown )
return NULL;
std::string* result;
switch
:WaitForMultipleObjects(1, handles, FALSE, Milliseconds))
{
case SemaphoreIndex: // semaphore
::EnterCriticalSection(&lock);
result = MsgQueue.front();
MsgQueue.pop();
::LeaveCriticalSection(&lock);
return result;
case WAIT_TIMEOUT:
return NULL;
default:
throw "Unknown WaitForMultipleObjects value";
}
}
// Terminate sets shutdown and flushes queue
void Terminate()
{
::EnterCriticalSection(&lock);
Shutdown = true;
while ( ! MsgQueue.empty() )
{
delete MsgQueue.front();
MsgQueue.pop();
}
::LeaveCriticalSection(&lock);
}
void ShutItDown()
{
::EnterCriticalSection(&lock);
Shutdown = true;
::LeaveCriticalSection(&lock);
}
bool isShutDown()
{
::EnterCriticalSection(&lock);
bool ReturnStatus = Shutdown;
::LeaveCriticalSection(&lock);
return ReturnStatus;
}
protected:
enum {SemaphoreIndex};
HANDLE handles[1];
CRITICAL_SECTION lock;
std::queue< std::string* > MsgQueue;
unsigned int Limit; // Need for copy and assignment constructors
volatile bool Shutdown;
private:
// Copy Constructor - Doesn't work correctly
ThreadQueue(const ThreadQueue& Queue): Shutdown( false ), ThreadID( 0 )
{
Limit = Queue.Limit; // Only saved for copy and assignment
constructors
handles[SemaphoreIndex] = ::CreateSemaphore(NULL, // no security
attributes
0, // initial count
Limit, // max count
NULL); // anonymous
::InitializeCriticalSection(&lock);
}
// Assignment Constructor - Doesn't work correctly
ThreadQueue& operator=(const ThreadQueue& Queue)
{
if (this == &Queue)
return *this;
ThreadID = 0;
Shutdown = false;
Limit = Queue.Limit; // Only saved for copy and assignment
constructors
handles[SemaphoreIndex] = ::CreateSemaphore(NULL, // no security
attributes
0, // initial count
Limit, // max count
NULL); // anonymous
::InitializeCriticalSection(&lock);
return *this;
}
};
// Producer thread entry point, with the signature required by _beginthreadex.
//
// `a` must point to a ThreadQueue that outlives the thread.  The thread
// pushes the decimal strings "0" through "99" onto that queue, stopping early
// as soon as the queue reports that it has been shut down.  Always returns 0.
unsigned __stdcall Connection(void* a)
{
    ThreadQueue* Interface = static_cast< ThreadQueue* >( a );
    for ( int i = 0; i < 100; ++i )
    {
        if ( Interface->isShutDown() )
            return 0;

        std::string* Message = new std::string( StrmConvert<std::string>( i ) );
        if ( ! Interface->AddTail( Message ) )
        {
            // AddTail() refused the message (e.g. the queue was shut down
            // between the check above and the call), so nothing else will
            // free it: release it here instead of leaking it.  If the queue
            // is shut down the next iteration's check ends the thread.
            delete Message;
        }
    }
    return 0;
}
// Bookkeeping for one thread started with _beginthreadex.
struct ThreadInfo
{
// Thread identifier written by _beginthreadex through its last argument.
unsigned int ThreadID;
// Value returned by _beginthreadex, cast to HANDLE; it is waited on and then
// closed when the program shuts down.
HANDLE Handle;
};
int ThreadQueuemain()
{
const static int NumOfQueues = 5000;
std::vector< ThreadInfo > Threads;
std::vector< ThreadQueue* > Interfaces;
std::cout << "Initializing Threads..." << std::endl;
for ( int i = 0; i < NumOfQueues; ++i )
{
std::vector< ThreadQueue* >::iterator TQit = Interfaces.insert(
Interfaces.end(), new ThreadQueue( 32767 ) );
std::vector< ThreadInfo >::iterator TIit = Threads.insert(
Threads.end(), ThreadInfo() );
(*TIit).Handle = reinterpret_cast<HANDLE>(_beginthreadex(0, 0,
Connection, (void*) *TQit, 0, &(*TIit).ThreadID));
}
std::string* Message = NULL;
while ( ! _kbhit() )
{
for ( std::vector<ThreadQueue*>::iterator it = Interfaces.begin();
it != Interfaces.end(); ++it )
{
Message = (*it)->RemoveHead();
if ( Message != NULL )
{
std::cout << *Message << " ";
delete Message;
}
}
}
for ( std::vector<ThreadQueue*>::iterator it = Interfaces.begin(); it !=
Interfaces.end(); ++it )
(*it)->Terminate();
// Wait for all threads to terminate before ending
for ( std::vector< ThreadInfo >::iterator it = Threads.begin(); it !=
Threads.end(); ++it )
{
WaitForSingleObject( (*it).Handle, INFINITE );
CloseHandle( (*it).Handle );
}
for ( std::vector<ThreadQueue*>::iterator it = Interfaces.begin(); it !=
Interfaces.end(); ++it )
delete (*it);
std::cout << "\nPress Return..." << std::flush;
std::string wait;
std::getline( std::cin, wait );
return 0;
}