Feb
10
2009

FYP Technology: YkdMultithreading

I wasn’t expecting multithreading to be such a big part of the FYP, but it’s turned out to be something I find really exciting and I’ve been spending a lot of time on it. It’s looking like a multithreading library might fall out of this! Here’s there headers for the code. So far I have the following classes:

YkdLockManager

As I discussed before, this class looks after locking various objects. Since the post I wrote on it a few days ago, I’ve implemented shared locks, so objects don’t just have lock(), but instead have lockForRead() and lockForWrite(). This means that multiple objects can get a lock at the same time if they’re only reading, but when one requests write access it can get it exclusively.

YkdLockManager uses a shared lock internally too. When a mutex is requested, it requests read-only access on the queue and searches for it. If it finds it, then it unlocks and returns. If it doesn’t find it, it “upgrades” its lock to exclusive access for write (this might take a while if other threads have ownership). Once it attains write access, it creates a new mutex and adds it to the queue, then unlocks and returns.

boost::shared_mutex* YkdLockManager::getMutex(void* object)
{
	// pointer to the mutex we'll return eventually.
	// I use this as "it->second" should be done while this thread 
	//     has read access, making "return it->second" impossible.
	boost::shared_mutex *mutex;
 
	// Request read-only access
	mLockMapLock.lock_shared();
	// Search the map for the object
	std::map<void*, boost::shared_mutex*>::iterator it = mLockMap.find( object );
 
	// If not found..
	if ( it == mLockMap.end() )
	{
		// upgrade the lock to an exclusive lock for write access
		mLockMapLock.lock_upgrade();
		mutex = new boost::shared_mutex();
		// perform write operation
		mLockMap[object] = mutex;
		// surrender ownership
		mLockMapLock.unlock_upgrade();
 
		return mutex;
	}
 
	mutex = it->second;
 
	// surrender ownership
	mLockMapLock.unlock_shared();
	return mutex;
}

YkdJobQueue

The job queue is basically an std::deque (double-ended queue) of boost function pointers. This means that you can add any method call at all to the queue, so anything can become a job. If you’re calling a method but don’t need results from it right away, you can just put it into the job queue and a worker thread will pick it up. Just to be clear, boost::bind will wrap anything into a function pointer, so parameter passing and everything is ok (though it’s worth giving a moment’s thought to the fact that referenced objects might disappear before the thread gets started if you’re not careful).

typedef boost::function0<void> YkdJobPtr;
// ... later ...
std::deque< YkdJobPtr > mQueue;

YkdJobQueue::getJob() is blocking. When it’s called, it will use boost::condition::wait() to block until a job is available. The call is unblocked when YkdJobQueue::addJob() adds a job to the queue and calls boost::condition::notify_one(), which wakes up one of the threads waiting for a job to be available.

void YkdJobQueue::addJob( YkdJobPtr job )
{
	// Take ownership of mQueue
	boost::mutex::scoped_lock lock( mQueueMutex );
 
	// Add new job to the queue
	mQueue.push_back( job );
 
	// Notify some other thread that data is available
	mNotEmpty.notify_one();
}
 
YkdJobPtr YkdJobQueue::getJob()
{
	// Take ownership of mQueue
	boost::mutex::scoped_lock lock( mQueueMutex );
 
	// wait for something to arrive
	while ( mQueue.empty() )
	{
		mNotEmpty.wait( lock );
	}
 
	// Return the front and pop it off
	YkdJobPtr job = mQueue.front();
	mQueue.pop_front();
 
	return job;
}

YkdThreadPool

The thread pool just spawns worker threads and delete them when its destructor is called. Worker threads’ destructors interrupt and join them.

YkdWorkerThread

The worker threads don’t do much beyond grabbing a job, running it, and grabbing another job until it’s interrupted. Not much going on, but I like how interruption is disabled while the job is actually running and enabled while waiting for a job to be available.

void YkdWorkerThread::YkdWorkerThreadMain()
{
	for ( ;; )
	{
		// Grab a new job. This is a blocking call.
		YkdJobPtr job = YkdJobQueue::getSingletonPtr()->getJob();
 
		// Don't allow interruption until the job is done.
		boost::this_thread::disable_interruption di;
 
		// Run the job
		job();
	}
}
Written by ダニエル氏 in: University |

No Comments »

RSS feed for comments on this post. TrackBack URL


Leave a Reply

Powered by WordPress | Theme: Aeros 2.0 by TheBuckmaker.com