Wiki

Monitors and Wait Conditions in Qt (Part 2)

by Jasmin Blanchette

In the previous issue of Qt Quarterly, we saw how to synchronize threads using monitors, a high-level synchronization mechanism. We also reviewed the code of two examples, BankAccount and BoundedBuffer, that showed how to implement monitors using QMutex and QWaitCondition. In this issue, we will review a few fundamental techniques for developing more powerful monitors than those presented previously.

More precisely, we will study the following four techniques: covering conditions, "passing the condition", priority wait, and invariants. We will also compare monitors with semaphores and show how they relate to each other. Most of the techniques and examples are drawn from Gregory Andrews's textbook Multithreaded, Parallel, and Distributed Programming (2000).

Covering Conditions

In last quarter's article, we reviewed the following BoundedBuffer monitor, which relies on two wait conditions, bufferIsNotFull and bufferIsNotEmpty:

monitor BoundedBuffer
{
public:
    BoundedBuffer() { head = 0; tail = 0; }
 
    void put(char ch) {
        while (tail == head + N)
            wait(bufferIsNotFull);
        buffer[tail++ % N] = ch;
        signal(bufferIsNotEmpty);
    }
    char get() {
        while (head == tail)
            wait(bufferIsNotEmpty);
        char ch = buffer[head++ % N];
        signal(bufferIsNotFull);
        return ch;
    }
 
private:
    cond bufferIsNotFull;
    cond bufferIsNotEmpty;
    int head, tail;
    char buffer[N];
};

Recall that in our pseudo-code universe, the member functions of a monitor are implicitly executed with mutual exclusion. In Java, this would be implemented using the synchronized keyword. In Qt/C++, we must explicitly lock and unlock a QMutex:

class BoundedBuffer
{
public:
    BoundedBuffer() { head = 0; tail = 0; }
 
    void put(char ch) {
        QMutexLocker locker(&mutex);
        while (tail == head + N)
            bufferIsNotFull.wait(&mutex);
        buffer[tail++ % N] = ch;
        bufferIsNotEmpty.wakeOne();
    }
    char get() {
        QMutexLocker locker(&mutex);
        while (head == tail)
            bufferIsNotEmpty.wait(&mutex);
        char ch = buffer[head++ % N];
        bufferIsNotFull.wakeOne();
        return ch;
    }
 
private:
    QMutex mutex;
    QWaitCondition bufferIsNotFull;
    QWaitCondition bufferIsNotEmpty;
    int head, tail;
    char buffer[N];
};

A covering condition is a wait condition that is used for several purposes. For the BoundedBuffer monitor, we could for example merge the two wait conditions into a single bufferChanged condition, leading to the following pseudo-code:

monitor BoundedBuffer
{
public:
    BoundedBuffer() { head = 0; tail = 0; }
 
    void put(char ch) {
        while (tail == head + N)
            wait(bufferChanged);
        buffer[tail++ % N] = ch;
        signalAll(bufferChanged);
    }
    char get() {
        while (head == tail)
            wait(bufferChanged);
        char ch = buffer[head++ % N];
        signalAll(bufferChanged);
        return ch;
    }
 
private:
    cond bufferChanged;
    int head, tail;
    char buffer[N];
};

In the new version of the monitor, we signal all the waiting threads every time we read from or write to the buffer by calling signalAll(bufferChanged). The waiting threads then recheck the predicate on which they are waiting (tail == head + N or head == tail). One of them will proceed and the other ones will wait.

Both versions of the BoundedBuffer monitor are semantically equivalent, but the first version is faster in practice, because a call to get() or put() will wake at most one thread, instead of waking up all suspended threads.

Why use covering conditions then? In the BoundedBuffer example, we could do without because when n threads try to access the monitor simultaneously, they wait for one of two conditions to occur ("buffer is not empty" and "buffer is not full"). For some applications, using separate wait conditions would not be practical, because the n threads might be waiting for n different conditions. The BankAccount monitor below illustrates this:

monitor BankAccount
{
public:
    BankAccount() { balance = 0; }
 
    void withdraw(uint amount) {
        while (amount > balance)
            wait(moreMoney);
        balance -= amount;
    }
    void deposit(uint amount) {
        balance += amount;
        signalAll(moreMoney);
    }
 
private:
    cond moreMoney;
    int balance;
};

Covering conditions are very common in Java because, prior to version 1.5, Java supported only one wait condition per monitor.

Passing the Condition

The next technique we will review is called "passing the condition." Consider the following MemoryAllocator monitor:

monitor MemoryAllocator
{
public:
    MemoryAllocator() {
        for (int i = 0; i < N; ++i)
            isFree[i] = true;
        numFree = N;
    }
 
    char *allocBlock() {
        if (numFree == 0)
            wait(blockFreed);
        for (int i = 0; i < N; ++i) {
            if (isFree[i]) {
                isFree[i] = false;
                --numFree;
                return blocks[i];
            }
        }
        assert(false);
        return 0;
    }
 
    void freeBlock(char *block) {
        int i = (block - blocks[0]) / BlockSize;
        assert(i >= 0 && i < N && !isFree[i]);
        isFree[i] = true;
        ++numFree;
        signal(blockFreed);
    }
 
private:
    cond blockFreed;
    char blocks[N][BlockSize];
    bool isFree[N];
    int numFree;
};

The MemoryAllocator monitor manages N memory blocks of size BlockSize. The allocBlock() function returns a pointer to a previously unused block, and freeBlock() releases a used block. If allocBlock() is called when no blocks are available (numFree == 0), we wait until a block is freed. The next time freeBlock() is called, the blockFreed condition will be signaled, and allocBlock() will resume.

What essentially happens is that we free a block only to allocate it immediately afterwards. We can optimize the monitor a bit by handing over the newly released block directly to the thread that was trying to allocate it. Here's the pseudo-code:

monitor MemoryAllocator
{
public:
    MemoryAllocator() {
        for (int i = 0; i < N; ++i)
            isFree[i] = true;
        numFree = N;
        numDelayed = 0;
        lastFreed = 0;
    }
 
    char *allocBlock() {
        if (numFree == 0) {
            ++numDelayed;
            wait(blockFreed);
            --numDelayed;
            return lastFreed;
        } else {
            for (int i = 0; i < N; ++i) {
                if (isFree[i]) {
                    isFree[i] = false;
                    --numFree;
                    return blocks[i];
                }
            }
            assert(false);
            return 0;
        }
    }
 
    void freeBlock(char *block) {
        int i = (block - blocks[0]) / BlockSize;
        assert(i >= 0 && i < N && !isFree[i]);
        if (numDelayed > 0) {
            lastFreed = block;
            signal(blockFreed);
        } else {
            isFree[i] = true;
            ++numFree;
        }
    }
 
private:
    cond blockFreed;
    char blocks[N][BlockSize];
    bool isFree[N];
    int numFree;
    int numDelayed;
    char *lastFreed;
};

The numDelayed variable is used to count how many threads are waiting for a free block in allocBlock(). If numDelayed > 0 when freeBlock() is called, we simply signal blockFreed and store the freed block in lastFreed, where it can be picked up by allocBlock().

If allocBlock() is called when numFree == 0, we increment numDelayed and wait until we are signaled. When that happens, we can simply return lastFreed, instead of iterating through all the blocks.

Fairness and Priority Wait

One issue that constantly arises in multithreaded and distributed programs is that of fairness. Fairness is necessary to avoid starvation, which occurs when one thread is delayed indefinitely while other threads are accessing a resource.

Starvation can occur with the MemoryAllocator example above because signal() (or the Qt function QWaitCondition::wakeOne()) can wake up any thread that wakes on the condition---not necessarily the thread that has waited the longest.

With the MemoryAllocator example, starvation is unlikely because the threads are symmetric and real-world implementations of threads tend to be sufficiently random. A more realistic example where starvation rears its ugly head is that of a read-write lock, such as QReadWriteLock. Such a lock allows many readers to access the same data simultaneously, but only one writer. Furthermore, readers and writers may not be active at the same time.

The pseudo-code below shows one way to implement a ReadWriteLock monitor:

monitor ReadWriteLock
{
public:
    ReadWriteLock() { numReaders = 0; numWriters = 0; }
 
    void lockForRead() {
        while (numWriters > 0)
            wait(unlocked);
        ++numReaders;
    }
    void lockForWrite() {
        while (numReaders + numWriters > 0)
            wait(unlocked);
        ++numWriters;
    }
 
    void unlockAfterRead() {
        assert(numReaders > 0);
        --numReaders;
        signalAll(unlocked);
    }
    void unlockAfterWrite() {
        assert(numWriters > 0);
        --numWriters;
        signalAll(unlocked);
    }
 
private:
    cond unlocked;
    int numReaders;
    int numWriters;
};

This monitor can be used to synchronize threads that access a shared resource. For example, a reader thread would call lockForRead() before reading data from the resource and unlockAfterRead() when it is done.

The problem with the above code is that writers are likely to starve if there are many reader threads. Writers can only seize control of the lock when numReaders is 0. It's a bit like being a tourist in Paris in July and expecting to find the Chвteau de Versailles totally empty (and open!).

One standard way to solve this problem (thread starvation, not Versailles) is to let the readers and writers alternate. When a writer is waiting, we don't let any new reader start:

void lockForRead() {
    if (numWriters + delayedWriters > 0) {
        ++delayedReaders;
        do {
            readersMayProceed.wait(&mutex);
        } while (numWriters > 0);
        --delayedReaders;
    }
    ++numReaders;
}

And when other threads are active, we don't let a new writer start:

void lockForWrite() {
    while (numReaders + numWriters > 0) {
        ++delayedWriters;
        checkInvariant();
        oneWriterMayProceed.wait(&mutex);
        checkInvariant();
        --delayedWriters;
    }
    ++numWriters;
}

The unlockAfter...() functions must also be adapted:

void unlockAfterRead() {
    assert(numReaders > 0);
    --numReaders;
    if (numReaders == 0 && delayedWriters > 0)
        signal(oneWriterMayProceed);
}
void unlockAfterWrite() {
    assert(numWriters > 0);
    --numWriters;
    if (delayedReaders > 0) {
        signalAll(readersMayProceed);
    } else if (delayedWriters > 0) {
        signal(oneWriterMayProceed);
    }
}

The new version of the ReadWriteLock monitor ensures that neither the readers nor the writers can monopolize the shared resource. Another way to solve this issue would be to use a priority queue and to enforce a strict "first come, first serve" policy. The queue can be stored explicitly as a QQueue of thread IDs, or implicitly by having the threads take a number, just as you would do when you go to the bakery (at least in Norway). Here's an implementation of the BankAccount monitor using the "take a number" approach:

monitor BankAccount
{
public:
    BankAccount() {
        balance = 0; firstNo = 0; nextNo = 0;
    }
 
    void withdraw(uint amount) {
        int myNo = nextNo++;
        while (firstNo != myNo || amount > balance)
            wait(moreMoney);
        balance -= amount;
        ++firstNo;
    }
    void deposit(uint amount) {
        balance += amount;
        signalAll(moreMoney);
    }
 
private:
    cond moreMoney;
    int balance;
    int firstNo;
    int nextNo;
};

With the "first come, first served" policy, if there is $500 in the account, a thread that requests only $20 might have to wait for one that requested $1000. This might seem unfortunate, but it is often the price to pay to avoid starvation. Using a different data structure, we could implement a different scheduling policy, such as "shortest job next" or "round robin".

Assumptions and Invariants

Multithreaded programming is difficult because there is always the danger that several threads might access the same data structure simultaneously and leave it in an undesirable state. Monitors, with their implicit mutual exclusion, make this less likely.

It may still pay off to add explicit checks in the code to ensure that the data structure is in a consistent state whenever a thread enters or leaves a monitor (including before and after calls to wait()). The concept of a consistent state is typically formulated as a Boolean expression, called an invariant, that should be true whenever a thread enters or leaves the monitor.

Here's the BankAccount code, with calls to the checkInvariant() private function in the appropriate places:

BankAccount() {
    balance = 0;
    §checkInvariant();
}
 
void withdraw(uint amount) {
    §checkInvariant();
    while (amount > balance) {
        §checkInvariant();
        wait(moreMoney);
        §checkInvariant();
    }
    balance -= amount;
    §checkInvariant();
}
 
void deposit(uint amount) {
    §checkInvariant();
    balance += amount;
    signalAll(moreMoney);
    §checkInvariant();
}
 
void checkInvariant() const {
    assert(balance >= 0);
}

For BankAccount, the invariant is trivial, and it is easy to check that it holds.[1] For ReadWriteLock, the invariant is slightly more complex:

void checkInvariant() const {
    assert((numReaders == 0 || numWriters == 0)
           && numWriters <= 1);
}

For MemoryAllocator, we can check that numFree is in sync with the isFree array by iterating over isFree:

void checkInvariant() const {
    int actualNumFree = 0;
    for (int i = 0; i < N; ++i) {
        if (isFree[i])
            ++actualNumFree;
    }
    assert(numFree == actualNumFree);
}

The last invariant is fairly expensive to compute, so it's probably better to leave it out of production code, or to reorganize the data structure so that numFree is no longer necessary.

In a well-written monitor, the invariant should hold whenever a thread enters or leaves the monitor, no matter which monitor functions are called, and when. To avoid entering bad states as a result of calling the methods in the wrong order, it is a good idea to detect these cases and abort. We already used this approach in the ReadWriteLock monitor's unlockAfterRead() and unlockAfterWrite() functions to guard against spurious calls. Aborting may sound like a drastic measure, but it is always preferable to producing incorrect results or corrupting a database.

Monitors versus Semaphores

A semaphore is a low-level synchronization tool that provides two operations: acquire() and release(). A special case of a semaphore is that of a binary semaphore, often called a mutex. The mutex operation lock() corresponds to an acquire(), and unlock() corresponds to release(). General (non-binary) semaphores can be used to guard n identical resources.

For all the problems we have studied so far, we could have come up with a solution that uses only QSemaphore or QMutex. However, by using the higher-level monitor construct, we could focus on the business logic and spend little time thinking about thread synchronization.

To illustrate the fundamental differences between monitors and semaphores, we will study the code of a Semaphore monitor:

monitor Semaphore
{
public:
    Semaphore() { value = 0; }
 
    void release() {
        ++value;
        signal(released);
    }
    void acquire() {
        if (value == 0)
            wait(released);
        --value;
    }
 
private:
    cond released;
    int value;
};

As the example illustrates, there is a strong connection between a semaphore release() and a monitor signal() on the one hand, and between a semaphore acquire() and a monitor wait() on the other. There are, however, two important differences:

  • Calls to release() are "remembered", even when no thread is waiting to acquire the semaphore. In contrast, signal() is effectively a no-op if no thread is waiting on the condition at the time when it is signaled.
  • Calls to acquire() only block if the semaphore's value is 0. In contrast, wait() always blocks.

Incidentally, the above monitor can be rewritten to use the "pass the condition" idiom:

monitor Semaphore
{
public:
    Semaphore() { value = 0; delayed = 0; }
 
    void release() {
        if (delayed == 0)
            ++value;
        else
            signal(released);
    }
    void acquire() {
        if (value == 0) {
            ++delayed;
            wait(released);
            --delayed;
        } else {
            --value;
        }
    }
 
private:
    cond released;
    int value;
    int delayed;
};

On platforms that guarantee a FIFO (first in, first out) ordering when signaling wait conditions, the above monitor implements a FIFO semaphore. Alternatively, we could use an implicit or explicit priority queue to order the pending acquire() calls.

Conclusion

The monitor mechanism was proposed in the early 1970s as a structured solution to the problem of synchronizing threads in an operating system. It went out of fashion during the 1980s and was granted a second life by Java's designers.

Many Qt programs use QMutex to avoid race conditions when accessing shared data. By hiding the data inside a class and throwing a QWaitCondition into the mix, we can obtain finer control over the order in which threads are executed, as we saw with the ReadWriteLock example and the "take a number" version of the BankAccount monitor. And once we have all the code manipulating the shared data structure in a centralized place, it is easy to add run-time checks to ensure that the data structure never enters an undesirable state.

Despite this, it is often a matter of taste whether one should use QSemaphore or QWaitCondition to solve a particular problem. For example, the QReadWriteLock class is implemented in terms of QWaitCondition, whereas the ReadWriteMutex class described in Implementing a Read/Write Mutex by Volker Hilsheimer (Qt Quarterly issue 11) uses a QSemaphore.


[1] Unless someone kindly deposits $2,147,483,648 in our bank account, resulting in an integer wrap around.


Copyright © 2007 Trolltech Trademarks