Query on simple C++ threadpool implementation
- by ticketman
Stack Overflow has been a tremendous help to me and I'd like to give something back to the community. I have been implementing a simple thread pool using the TinyThread++ portable C++ thread library, using what I have learnt from Stack Overflow. I am new to thread programming, so I am not yet comfortable with mutexes, condition variables, etc.
I have a question best asked after presenting the code (which runs quite well under Linux):
// ThreadPool.h
// A fixed-size pool of worker threads that executes jobs in batches.
//
// Usage, as implemented by the member functions:
//   1. CreateThreads(n)       - spawn n workers (only the first call has an effect)
//   2. SubmitJob(func, data)  - up to n times; each call fills the next worker slot
//   3. StartJobs()            - wake the workers that were given a job
//   4. WaitForJobsToComplete()- block until those workers have finished
// Steps 2-4 may then be repeated.
class ThreadPool
{
public:
    ThreadPool();
    ~ThreadPool();

    // Creates a pool of threads and gets them ready to be used.
    void CreateThreads(int numOfThreads);

    // Assigns a job to a thread in the pool, but doesn't start the job.
    // Each SubmitJob call will use up one thread of the pool.
    // This operation can only be undone by calling StartJobs and
    // then waiting for the jobs to complete. On completion,
    // new jobs may be submitted.
    void SubmitJob( void (*workFunc)(void *), void *workData );

    // Begins execution of all the jobs in the pool.
    void StartJobs();

    // Waits until all jobs have completed.
    // The wait will block the caller.
    // On completion, new jobs may be submitted.
    void WaitForJobsToComplete();

private:
    // What a woken worker is expected to do.
    enum typeOfWorkEnum { e_work, e_quit };

    // Per-worker slot: handshake flags plus the job assigned to the worker.
    class ThreadData
    {
    public:
        bool ready;         // thread has been created and is ready for work
        bool haveWorkToDo;  // set to wake the worker; cleared by the worker
        typeOfWorkEnum typeOfWork;
        // Pointer to the work function each thread has to call.
        void (*workFunc)(void *);
        // Pointer to work data
        void *workData;

        // Every member gets a defined value. In particular typeOfWork
        // defaults to e_work so that a worker never switches on an
        // indeterminate enum value.
        ThreadData()
            : ready(false), haveWorkToDo(false), typeOfWork(e_work),
              workFunc(0), workData(0)
        {
        }
    };

    // Argument bundle handed to ThreadFuncWrapper through its void * parameter.
    struct ThreadArgStruct
    {
        ThreadPool *threadPoolInstance;
        int threadId;
    };

    // Data for each thread (array allocated in CreateThreads)
    ThreadData *m_ThreadData;

    // Copying is disabled: both operations are declared private.
    ThreadPool(ThreadPool const&);            // copy ctor hidden
    ThreadPool& operator=(ThreadPool const&); // assign op. hidden

    // Static function that provides the function pointer that a thread can call.
    // By including the ThreadPool instance in the void * parameter,
    // we can use it to access other data and methods in the ThreadPool instance.
    static void ThreadFuncWrapper(void *arg)
    {
        ThreadArgStruct *threadArg = static_cast<ThreadArgStruct *>(arg);
        threadArg->threadPoolInstance->ThreadFunc(threadArg->threadId);
    }

    // The function each thread calls
    void ThreadFunc( int threadId );

    // Called by the thread pool destructor
    void DestroyThreadPool();

    // Total number of threads available
    // (fixed on creation of thread pool)
    int m_numOfThreads;
    // Number of started jobs that have not yet reported completion
    int m_NumOfThreadsDoingWork;
    // Number of jobs submitted since the last StartJobs call
    int m_NumOfThreadsGivenJobs;

    // List of threads
    std::vector<tthread::thread *> m_ThreadList;

    // Condition variable to signal each thread has been created and executing
    tthread::mutex m_ThreadReady_mutex;
    tthread::condition_variable m_ThreadReady_condvar;

    // Condition variable to signal each thread to start work
    tthread::mutex m_WorkToDo_mutex;
    tthread::condition_variable m_WorkToDo_condvar;

    // Condition variable to signal the main thread that
    // all threads in the pool have completed their work
    tthread::mutex m_WorkCompleted_mutex;
    tthread::condition_variable m_WorkCompleted_condvar;
};
cpp file:
//
// ThreadPool.cpp
//
#include "ThreadPool.h"
// This is the thread function for each thread.
// All threads remain in this function until
// they are asked to quit, which only happens
// when terminating the thread pool.
void ThreadPool::ThreadFunc( int threadId )
{
ThreadData *myThreadData = &m_ThreadData[threadId];
std::cout << "Hello world: Thread " << threadId << std::endl;
// Signal that this thread is ready
m_ThreadReady_mutex.lock();
myThreadData->ready = true;
m_ThreadReady_condvar.notify_one(); // notify the main thread
m_ThreadReady_mutex.unlock();
while(true)
{
//tthread::lock_guard<tthread::mutex> guard(m);
m_WorkToDo_mutex.lock();
while(!myThreadData->haveWorkToDo) // check for work to do
m_WorkToDo_condvar.wait(m_WorkToDo_mutex); // if no work, wait here
myThreadData->haveWorkToDo = false; // need to do this before unlocking the mutex
m_WorkToDo_mutex.unlock();
// Do the work
switch(myThreadData->typeOfWork)
{
case e_work:
std::cout << "Thread " << threadId << ": Woken with work to do\n";
// Do work
myThreadData->workFunc(myThreadData->workData);
std::cout << "#Thread " << threadId << ": Work is completed\n";
break;
case e_quit:
std::cout << "Thread " << threadId << ": Asked to quit\n";
return; // ends the thread
}
// Now to signal the main thread that my work is completed
m_WorkCompleted_mutex.lock();
m_NumOfThreadsDoingWork--;
// Unsure if this 'if' would make the program more efficient
// if(NumOfThreadsDoingWork == 0)
m_WorkCompleted_condvar.notify_one(); // notify the main thread
m_WorkCompleted_mutex.unlock();
}
}
// Constructs an empty pool: no threads, no per-thread data, no pending jobs.
// m_ThreadData is explicitly nulled so the pointer never holds an
// indeterminate value before CreateThreads allocates the array.
ThreadPool::ThreadPool()
    : m_ThreadData(0),
      m_numOfThreads(0),
      m_NumOfThreadsDoingWork(0),
      m_NumOfThreadsGivenJobs(0)
{
}
// Shuts the pool down. When threads exist, they are asked to quit and
// joined via DestroyThreadPool, after which the per-thread data array
// is released.
ThreadPool::~ThreadPool()
{
    // No threads recorded: nothing to stop and nothing to free.
    if(m_numOfThreads == 0)
        return;

    DestroyThreadPool();
    delete [] m_ThreadData;
}
// Creates numOfThreads worker threads and blocks until each one has
// reported that it is ready.
//
// The call is ignored when a pool already exists or when numOfThreads
// is not positive (a negative count must never reach new[]).
void ThreadPool::CreateThreads(int numOfThreads)
{
    // Check a thread pool has already been created
    if(m_numOfThreads > 0)
        return;
    // Reject an empty or negative pool size
    if(numOfThreads <= 0)
        return;

    m_NumOfThreadsGivenJobs = 0;
    m_NumOfThreadsDoingWork = 0;
    m_numOfThreads = numOfThreads;
    m_ThreadData = new ThreadData[m_numOfThreads];
    m_ThreadList.reserve(m_numOfThreads);

    // A single argument object is reused for every thread. This is safe only
    // because of the ready handshake below: ThreadFuncWrapper copies threadId
    // out of it before ThreadFunc sets 'ready', and this loop does not modify
    // the object again until it has observed 'ready'.
    ThreadArgStruct threadArg;
    threadArg.threadPoolInstance = this;
    for(int i=0; i<m_numOfThreads; ++i)
    {
        threadArg.threadId = i;
        // Creates the thread and save in a list so we can destroy it later
        m_ThreadList.push_back( new tthread::thread( ThreadFuncWrapper, &threadArg ) );

        // It takes a little time for a thread to get established.
        // Best wait until it gets established before creating the next thread.
        m_ThreadReady_mutex.lock();
        while(!m_ThreadData[i].ready)                        // Check if thread is ready
            m_ThreadReady_condvar.wait(m_ThreadReady_mutex); // If not, wait here
        m_ThreadReady_mutex.unlock();
    }
}
// Adds a job to the batch, but doesn't start the job
void ThreadPool::SubmitJob(void (*workFunc)(void *), void *workData)
{
// Check that the thread pool has been created
if(!m_numOfThreads)
return;
if(m_NumOfThreadsGivenJobs >= m_numOfThreads)
return;
m_ThreadData[m_NumOfThreadsGivenJobs].workFunc = workFunc;
m_ThreadData[m_NumOfThreadsGivenJobs].workData = workData;
std::cout << "Submitted job " << m_NumOfThreadsGivenJobs << std::endl;
m_NumOfThreadsGivenJobs++;
}
void ThreadPool::StartJobs()
{
// Check that the thread pool has been created
// and some jobs have been assigned
if(!m_numOfThreads || !m_NumOfThreadsGivenJobs)
return;
// Set 'haveworkToDo' flag for all threads
m_WorkToDo_mutex.lock();
for(int i=0; i<m_NumOfThreadsGivenJobs; ++i)
m_ThreadData[i].haveWorkToDo = true;
m_NumOfThreadsDoingWork = m_NumOfThreadsGivenJobs;
// Reset this counter so we can resubmit jobs later
m_NumOfThreadsGivenJobs = 0;
// Notify all threads they have work to do
m_WorkToDo_condvar.notify_all();
m_WorkToDo_mutex.unlock();
}
void ThreadPool::WaitForJobsToComplete()
{
// Check that a thread pool has been created
if(!m_numOfThreads)
return;
m_WorkCompleted_mutex.lock();
while(m_NumOfThreadsDoingWork > 0) // Check if all threads have completed their work
m_WorkCompleted_condvar.wait(m_WorkCompleted_mutex); // If not, wait here
m_WorkCompleted_mutex.unlock();
}
void ThreadPool::DestroyThreadPool()
{
std::cout << "Ask threads to quit\n";
m_WorkToDo_mutex.lock();
for(int i=0; i<m_numOfThreads; ++i)
{
m_ThreadData[i].haveWorkToDo = true;
m_ThreadData[i].typeOfWork = e_quit;
}
m_WorkToDo_condvar.notify_all();
m_WorkToDo_mutex.unlock();
// As each thread terminates, catch them here
for(int i=0; i<m_numOfThreads; ++i)
{
tthread::thread *t = m_ThreadList[i];
// Wait for thread to complete
t->join();
}
m_numOfThreads = 0;
}
Example of usage:
(this calculates pi-squared/6)
// Input and output of one LongCalculation job.
struct CalculationDataStruct
{
    int inputVal;     // exclusive upper bound of the summation index
    double outputVal; // receives the computed partial sum
};

// Job function with the void(*)(void *) signature the thread pool expects.
//
// theSums must point to a CalculationDataStruct. Adds 1/(i*i) for
// i = 1 .. inputVal-1 and stores the result in outputVal; the partial sums
// of this series approach pi-squared/6. For inputVal <= 1 the result is 0.
void LongCalculation( void *theSums )
{
    CalculationDataStruct *sums = static_cast<CalculationDataStruct *>(theSums);
    const int terms = sums->inputVal;

    // The accumulator must start at zero; leaving it uninitialized makes
    // the result indeterminate.
    double sum = 0.0;
    for(int i=1; i<terms; i++)
        sum += 1.0/( double(i)*double(i) );

    sums->outputVal = sum;
}
// Example driver: runs one LongCalculation job per pool thread and prints
// each result once the whole batch has completed.
int main(int argc, char** argv)
{
    // Must be a compile-time constant: it is used as an array bound below,
    // and a run-time bound (variable-length array) is not standard C++.
    const int numThreads = 10;

    // Create pool
    ThreadPool threadPool;
    threadPool.CreateThreads(numThreads);

    // Create thread workspace (zero-initialized so that a result slot never
    // holds an indeterminate value)
    CalculationDataStruct sums[numThreads] = {};

    // Set up jobs
    for(int i=0; i<numThreads; i++)
    {
        sums[i].inputVal = 3000*(i+1);
        threadPool.SubmitJob(LongCalculation, &sums[i]);
    }

    // Run the jobs
    threadPool.StartJobs();
    threadPool.WaitForJobsToComplete();

    // Print results
    for(int i=0; i<numThreads; i++)
        std::cout << "Sum of " << sums[i].inputVal << " terms is " << sums[i].outputVal << std::endl;
    return 0;
}
Question:
In the ThreadPool::ThreadFunc method, would better performance be obtained if the following if statement
if(NumOfThreadsDoingWork == 0)
was included?
Also, I'd be grateful for any criticism and suggestions on how to improve the code. At the same time, I hope the code is of use to others.