Condition Variables

Condition variables are another mechanism for synchronizing threads. They let a thread enter the waiting state (stop running) until another thread signals/notifies it that some condition may have been fulfilled, at which point it can try to continue. The most common example used to illustrate this is a carpark. When the carpark is full, you have to wait until a car drives out. Once that happens, you want to be notified that the carpark is no longer full so you can enter it.

public class CarPark {
    private int spaces;
    public CarPark(int spaces) { this.spaces = spaces; }
    // Propagates InterruptedException instead of silently swallowing it
    public synchronized void enter() throws InterruptedException {
        while (spaces == 0) { // re-check the condition after every wakeup
            this.wait(); // releases the lock while waiting, reacquires it before returning
        }
        spaces--;
    }
    public synchronized void exit() {
        spaces++;
        this.notifyAll(); // wakes up all waiting threads, which then race for the lock
    }
}

The wait and notify/notifyAll methods must be called on the lock object, and only by a thread that currently holds that object's lock; otherwise they throw an IllegalMonitorStateException. Because every object can serve as a lock, these methods are defined in the Object class. Note also that calling wait releases the lock, so other threads can still do work. Once the waiting thread has been woken up and has reacquired the lock, it continues from where it was waiting.
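The ownership rule can be checked with a small experiment. The following is a minimal sketch; the class MonitorOwnershipDemo and its method names are made up for illustration and are not part of the carpark example.

```java
// Hypothetical demo class (illustration only).
final class MonitorOwnershipDemo {

    /** Calls notify() WITHOUT holding the lock; returns true if the JVM rejects the call. */
    static boolean notifyWithoutLock(Object lock) {
        try {
            lock.notify();
            return false;
        } catch (IllegalMonitorStateException e) {
            return true; // the current thread does not own the monitor
        }
    }

    /** Calls notify() while holding the lock; returns true if the call is accepted. */
    static boolean notifyWithLock(Object lock) {
        synchronized (lock) {
            try {
                lock.notify(); // no thread is waiting, so nothing happens
                return true;
            } catch (IllegalMonitorStateException e) {
                return false;
            }
        }
    }

    public static void main(String[] args) {
        Object lock = new Object();
        System.out.println("rejected without lock: " + notifyWithoutLock(lock));
        System.out.println("accepted with lock:    " + notifyWithLock(lock));
    }
}
```

Inside a synchronized method such as enter() or exit(), the lock on this is already held, which is why the carpark can call this.wait() and this.notifyAll() directly.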

Warning

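Always call wait() inside a while loop that re-checks the condition. A thread can wake up spuriously or because of an interrupt, and even after a real notification another thread may have taken the free space before the woken thread reacquires the lock.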

Notify vs NotifyAll

The notify() function wakes up one waiting thread. Which thread is chosen is arbitrary and up to the JVM, and the woken thread still has to compete for the lock before it can continue. If no threads are waiting, notify() has no effect and the notification is not remembered.

The notifyAll() function wakes up all the waiting threads which then must compete for the lock.

notify() can safely replace notifyAll() only if both of the following hold:

  • Uniform waiters: All waiters are equal (wait for the same condition)
  • One-in, one-out: A notification on the condition variable enables at most one thread to proceed

If either condition is not met, notify() may wake a thread that cannot proceed while a thread that could proceed keeps waiting. When in doubt, use notifyAll(): it is less efficient but much safer.
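A case where the one-in, one-out property does not hold is a gate that, once opened, lets every waiting thread through. The sketch below is only an illustration under that assumption; Gate, await, open and releaseAll are hypothetical names, not part of the notes above.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical one-shot gate: every waiter may proceed once it is opened.
final class Gate {
    private boolean open = false;

    synchronized void await() throws InterruptedException {
        while (!open) { // re-check the condition after every wakeup
            wait();
        }
    }

    synchronized void open() {
        open = true;
        notifyAll(); // a single notification has to release every waiter
    }

    /** Starts the given number of threads that wait at the gate, opens it, and returns how many got through. */
    static int releaseAll(int waiters) {
        Gate gate = new Gate();
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                try {
                    gate.await();
                    passed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        gate.open();
        try {
            for (Thread t : threads) {
                t.join(); // wait until each thread has passed the gate
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println(releaseAll(5)); // prints 5
    }
}
```

If open() used notify() here, only one of the threads already waiting would be woken. The others would stay blocked, because no further notification follows.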

BlockingQueue

A blocking queue is a queue that blocks the calling thread when it tries to dequeue from an empty queue or to enqueue an item into a queue that is already full.

Figure: A circular blocking queue.
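The Java standard library already ships blocking queues in java.util.concurrent. As a rough sketch of the blocking behaviour, assuming an ArrayBlockingQueue with a capacity of 2, a producer and a consumer can hand over values like this (BlockingQueueDemo and sumThroughQueue are hypothetical names chosen for the example):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class (illustration only).
final class BlockingQueueDemo {

    /** Sends the numbers 1..n through a small blocking queue and returns their sum. */
    static int sumThroughQueue(int n) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int sum = 0;
        try {
            for (int i = 0; i < n; i++) {
                sum += queue.take(); // blocks while the queue is empty
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(sumThroughQueue(100)); // prints 5050
    }
}
```

Because the capacity is much smaller than the number of items, the producer has to block repeatedly until the consumer has made room.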

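There are a few ways to implement a thread-safe blocking queue. With intrinsic locks, every object has only a single wait set, so both conditions (not empty, not full) have to share the queue's own monitor. It is tempting to use three locks, one for each condition and one for the synchronization, but calling notEmpty.wait() inside a synchronized method releases only the notEmpty monitor. The thread keeps holding the queue's lock while it waits, so no other thread can enter enqueue or dequeue and the program deadlocks. The version below therefore waits on the queue itself and uses notifyAll(), because producers and consumers wait on the same object:

public class Queue {
    private static final int SIZE = 10; // one slot stays free, so the capacity is SIZE - 1
    private final Object[] buf = new Object[SIZE];
    private int tail = 0, head = 0;

    public synchronized Object dequeue() throws InterruptedException {
        while (tail == head) { // while empty
            wait(); // releases the queue's lock while waiting
        }
        Object e = buf[head];
        buf[head] = null; // drop the reference so the element can be collected
        head = (head + 1) % SIZE;
        notifyAll(); // wake producers waiting for a free slot
        return e;
    }
    public synchronized void enqueue(Object c) throws InterruptedException {
        while ((tail + 1) % SIZE == head) { // while full
            wait();
        }
        buf[tail] = c;
        tail = (tail + 1) % SIZE;
        notifyAll(); // wake consumers waiting for an item
    }
}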

Alternatively, when working with the Lock interface, we can attach several conditions to one lock, so that each condition gets its own wait queue:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Queue {
    private static final int SIZE = 10; // one slot stays free, so the capacity is SIZE - 1
    private final Object[] buf = new Object[SIZE];
    private int tail = 0, head = 0;

    private final Lock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    public Object dequeue() throws InterruptedException {
        lock.lock();
        try {
            while (tail == head) { // while empty
                notEmpty.await(); // releases the lock while waiting
            }
            Object e = buf[head];
            head = (head + 1) % SIZE;
            notFull.signal(); // a slot is free again
            return e;
        } finally {
            lock.unlock();
        }
    }
    public void enqueue(Object c) throws InterruptedException {
        lock.lock();
        try {
            while ((tail + 1) % SIZE == head) { // while full
                notFull.await();
            }
            buf[tail] = c;
            tail = (tail + 1) % SIZE;
            notEmpty.signal(); // an item is available
        } finally {
            lock.unlock();
        }
    }
}