I wasn’t expecting multithreading to be such a big part of the FYP, but it’s turned out to be something I find really exciting and I’ve been spending a lot of time on it. It’s looking like a multithreading library might fall out of this! Here’s there headers for the code. So far I have the following classes:
YkdLockManager
As I discussed before, this class looks after locking various objects. Since the post I wrote on it a few days ago, I’ve implemented shared locks, so objects don’t just have lock(), but instead have lockForRead() and lockForWrite(). This means that multiple objects can get a lock at the same time if they’re only reading, but when one requests write access it can get it exclusively.
YkdLockManager uses a shared lock internally too. When a mutex is requested, it requests read-only access on the queue and searches for it. If it finds it, then it unlocks and returns. If it doesn’t find it, it “upgrades” its lock to exclusive access for write (this might take a while if other threads have ownership). Once it attains write access, it creates a new mutex and adds it to the queue, then unlocks and returns.
boost::shared_mutex* YkdLockManager::getMutex(void* object) { // pointer to the mutex we'll return eventually. // I use this as "it->second" should be done while this thread // has read access, making "return it->second" impossible. boost::shared_mutex *mutex; // Request read-only access mLockMapLock.lock_shared(); // Search the map for the object std::map<void*, boost::shared_mutex*>::iterator it = mLockMap.find( object ); // If not found.. if ( it == mLockMap.end() ) { // upgrade the lock to an exclusive lock for write access mLockMapLock.lock_upgrade(); mutex = new boost::shared_mutex(); // perform write operation mLockMap[object] = mutex; // surrender ownership mLockMapLock.unlock_upgrade(); return mutex; } mutex = it->second; // surrender ownership mLockMapLock.unlock_shared(); return mutex; }
YkdJobQueue
The job queue is basically an std::deque (double-ended queue) of boost function pointers. This means that you can add any method call at all to the queue, so anything can become a job. If you’re calling a method but don’t need results from it right away, you can just put it into the job queue and a worker thread will pick it up. Just to be clear, boost::bind will wrap anything into a function pointer, so parameter passing and everything is ok (though it’s worth giving a moment’s thought to the fact that referenced objects might disappear before the thread gets started if you’re not careful).
typedef boost::function0<void> YkdJobPtr; // ... later ... std::deque< YkdJobPtr > mQueue;
YkdJobQueue::getJob() is blocking. When it’s called, it will use boost::condition::wait() to block until a job is available. The call is unblocked when YkdJobQueue::addJob() adds a job to the queue and calls boost::condition::notify_one(), which wakes up one of the threads waiting for a job to be available.
void YkdJobQueue::addJob( YkdJobPtr job ) { // Take ownership of mQueue boost::mutex::scoped_lock lock( mQueueMutex ); // Add new job to the queue mQueue.push_back( job ); // Notify some other thread that data is available mNotEmpty.notify_one(); } YkdJobPtr YkdJobQueue::getJob() { // Take ownership of mQueue boost::mutex::scoped_lock lock( mQueueMutex ); // wait for something to arrive while ( mQueue.empty() ) { mNotEmpty.wait( lock ); } // Return the front and pop it off YkdJobPtr job = mQueue.front(); mQueue.pop_front(); return job; }
YkdThreadPool
The thread pool just spawns worker threads and delete them when its destructor is called. Worker threads’ destructors interrupt and join them.
YkdWorkerThread
The worker threads don’t do much beyond grabbing a job, running it, and grabbing another job until it’s interrupted. Not much going on, but I like how interruption is disabled while the job is actually running and enabled while waiting for a job to be available.
void YkdWorkerThread::YkdWorkerThreadMain() { for ( ;; ) { // Grab a new job. This is a blocking call. YkdJobPtr job = YkdJobQueue::getSingletonPtr()->getJob(); // Don't allow interruption until the job is done. boost::this_thread::disable_interruption di; // Run the job job(); } }
