Thread Management with Mutexes and Semaphores


One of the most common patterns I've run into while working on PiBox has been threads that process inbound data via a queue.  The pattern is simple enough.

  1. Initialize a thread to acquire data
    1. When data arrives, put it into a data structure
    2. Place the data structure on a queue
    3. Notify another thread to process queued data
  2. Initialize a thread to process data
    1. Wait for data on the queue
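The two threads in this pattern might be spawned like this.  This is just a sketch: the function names are hypothetical, not from the PiBox sources, and the real thread bodies would contain the loops described below.

```c
#include <pthread.h>
#include <stddef.h>

/* Hypothetical thread entry points -- illustrative names only.
 * Real bodies would loop acquiring and processing data as the
 * rest of this article describes. */
static void *acquireThread(void *arg) { (void)arg; return NULL; }
static void *processThread(void *arg) { (void)arg; return NULL; }

/* Spawn the acquire thread (step 1) and the process thread (step 2). */
int startThreads(pthread_t *acquire, pthread_t *process)
{
    if ( pthread_create(acquire, NULL, acquireThread, NULL) != 0 )
        return -1;
    if ( pthread_create(process, NULL, processThread, NULL) != 0 )
        return -1;
    return 0;
}
```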

Linux threads within a process can share data but can't, without help, guarantee data integrity.  This means that one thread may be in the middle of updating data while another is reading it.  Typically, data integrity of a shared data area is provided through the use of a mutex (mutual exclusion lock).  This is fine for many cases, but the lock only stops a thread from accessing the data while another thread accesses it.  What this doesn't handle is synchronization of data availability.

For example, if thread 1 locks access to a linked list to add a record and then releases the lock, that doesn't tell a separate thread how many links are there to be processed.  So the second thread spins trying to pull links from the list.  If the first thread has not put any links on the list then the second thread just spins again, very fast (unless you add some artificial delay).  This is wasteful because it's not synchronized.  The second thread only wants to process the list if there is something on it.  The first thread has to tell the second thread that there is work to be done.

There are clever ways to do this with just mutexes, where you lock both the list and a counter.  But there is a simpler way of dealing with the problem, one that doesn't even require checking the value of the counter directly: a semaphore.

A semaphore is a communications mechanism between threads.  One thread updating the semaphore (effectively adding one to its counter) is mutually exclusive to another thread reading the semaphore.  Reading the semaphore is effectively the second thread asking, "Is there any more data I need to handle?"  If not, the second thread can block until there is data, which happens when the first thread posts to the semaphore.

The semaphore is similar to polling on a file descriptor where the poll blocks until there is data to be read.  With a semaphore the blocking happens until the semaphore has a value greater than zero.  Breaking the block simply requires posting to the semaphore, which increases its value.
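The post/wait cycle can be sketched with POSIX semaphores.  This is a minimal illustration, not PiBox code; `sem_init`'s middle argument of 0 means the semaphore is shared between threads of one process, and the last argument is the initial count.

```c
#include <semaphore.h>

/* Demonstrate the counter semantics: post raises the count by one,
 * wait lowers it by one (blocking first if it is zero). */
int semaphoreDemo(void)
{
    sem_t sem;
    int   value;

    sem_init(&sem, 0, 0);       /* count starts at 0: a wait would block */

    sem_post(&sem);             /* count is now 1                        */
    sem_getvalue(&sem, &value); /* read the count for illustration       */

    sem_wait(&sem);             /* returns immediately, count back to 0  */

    sem_destroy(&sem);
    return value;               /* the count observed after the post     */
}
```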

All of this is easier to show with code.  First, we want code for adding a link to a list and code for removing one.  Think of the first as a thread reading data from the network.  It takes the inbound data, places it in a structure, and then adds that structure to the linked list.  This code locks access to the list and then adds data to it.

pthread_mutex_lock(queueMutex);
queueData(rawData);
pthread_mutex_unlock(queueMutex);

This code guarantees that adding the link will complete before another thread can check the linked list.  The second thread can get the link in a similar manner.

while( 1 ) {
    pthread_mutex_lock(queueMutex);
    data_link = popData();
    pthread_mutex_unlock(queueMutex);
}
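The snippets call queueData() and popData() without showing them.  A minimal sketch of what they might look like over a singly linked list follows; the node layout and the head/tail globals are assumptions for illustration, not the PiBox code, and the caller is assumed to hold queueMutex as shown above.

```c
#include <stdlib.h>

/* Hypothetical node layout -- the article does not show the real one. */
typedef struct dataLink {
    void            *payload;
    struct dataLink *next;
} dataLink_t;

static dataLink_t *queueHead = NULL;
static dataLink_t *queueTail = NULL;

/* Append inbound data to the tail of the list.
 * Caller holds queueMutex. */
void queueData(void *rawData)
{
    dataLink_t *link = malloc(sizeof(dataLink_t));
    if ( link == NULL )
        return;
    link->payload = rawData;
    link->next    = NULL;
    if ( queueTail != NULL )
        queueTail->next = link;
    else
        queueHead = link;
    queueTail = link;
}

/* Remove and return the head of the list, or NULL if it is empty.
 * Caller holds queueMutex. */
void *popData(void)
{
    dataLink_t *link = queueHead;
    void       *payload;

    if ( link == NULL )
        return NULL;
    queueHead = link->next;
    if ( queueHead == NULL )
        queueTail = NULL;
    payload = link->payload;
    free(link);
    return payload;
}
```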

The mutexes prevent the two threads from stepping on each other.  (In reality the lock/unlock would happen inside the popData() and queueData() functions, but doing it as shown here doesn't require writing those functions.)  But the second thread still needs to know when to try to pop data from the list.  Otherwise, especially when the list has no members, the second thread will spin very fast and eat up CPU cycles.  The first thread therefore needs to post a semaphore, which bumps the semaphore count by one.

pthread_mutex_lock(queueMutex);
queueData(rawData);
pthread_mutex_unlock(queueMutex);
sem_post(&dataSem);

Remember that semaphore operations are atomic, as if each call were wrapped by its own mutex, so two threads cannot update the semaphore at the same time.  The second thread now just needs to wait on the semaphore at the top of each loop.

while( 1 ) {
    sem_wait(&dataSem);
    pthread_mutex_lock(queueMutex);
    data_link = popData();
    pthread_mutex_unlock(queueMutex);
}

Now the second thread blocks on the semaphore until it has a value of at least 1.  When it does, the wait returns and decrements the semaphore by one.  Now there is synchronization between the two threads.  The first thread can queue up as many links as it wants and the second will drain the list as long as there is at least one link.  And the second thread doesn't take up a lot of CPU cycles by spinning really fast when there is no data on the list.  It only runs when there IS some data on the list!

Now, about that while(1) statement:  This code is inside the function spawned by the thread creation.  So how do we exit the loop?  Well, when the code that wants to shut down needs to kill off that thread, it can set a variable and post to the semaphore.  So now the loop looks more like this.

while( isActive() ) {
    sem_wait(&dataSem);
    if ( !isActive() )
        break;
    pthread_mutex_lock(queueMutex);
    data_link = popData();
    pthread_mutex_unlock(queueMutex);
}

The test of isActive is placed inside a thread-safe function that wraps reading and writing of the variable with a mutex as well.  And the code that kills the thread just looks something like this.

setActive(FALSE);
sem_post(&dataSem);
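A minimal sketch of what the isActive()/setActive() wrappers might look like follows.  The flag name, its type, and its initial value are assumptions for illustration; the mutex ensures a write in setActive() is seen whole by a concurrent isActive().

```c
#include <pthread.h>

/* Hypothetical thread-safe flag guarding the worker loop's lifetime. */
static int             activeFlag  = 1;   /* 1 == TRUE */
static pthread_mutex_t activeMutex = PTHREAD_MUTEX_INITIALIZER;

/* Read the flag under the mutex. */
int isActive(void)
{
    int value;
    pthread_mutex_lock(&activeMutex);
    value = activeFlag;
    pthread_mutex_unlock(&activeMutex);
    return value;
}

/* Write the flag under the mutex. */
void setActive(int value)
{
    pthread_mutex_lock(&activeMutex);
    activeFlag = value;
    pthread_mutex_unlock(&activeMutex);
}
```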

You might wonder why the same semaphore was used for the linked list and for the thread loop.  Aren't those unrelated?  Why yes, they are.  The semaphore doesn't have any association with the linked list.  It's just a counter we're using to keep track of adding and removing link entries.  But it's a special counter because it's thread-safe: adding to it and taking from it are guaranteed to be safe from any thread.  And it can be used to block a reader, one that wants to wait for some work to do.  The semaphore is a communication mechanism between the thread requesting work to be done and the thread that needs to do the work.

A more expansive example can be found in the piboxd daemon's queueProcessor and msgQueue modules.  The latter is used to manage linked lists of data while the former processes data queued on one of those lists.  The queueProcessor thread loop waits on a semaphore that is updated by the msgQueue functions when data is added to a list.

So you see, dealing with threads is not just about locking access to data with mutexes.  It's also about communicating the changes to data via semaphores.
