Semaphores and Producer-Consumer
[!NOTE] This module explores the core principles of Semaphores and Producer-Consumer, deriving solutions from first principles and hardware constraints to build world-class, production-ready expertise.
1. Beyond Mutual Exclusion
Mutexes are great for protecting critical sections, but they are limited. What if we want to coordinate threads? For example:
- “Thread A must finish task X before Thread B starts task Y.”
- “Allow at most 5 threads to access the database.”
Enter the Semaphore, invented by Edsger W. Dijkstra.
The Definition
A semaphore is an integer variable that, apart from initialization, is accessed only through two standard atomic operations: P and V.
- P(s) (Proberen / Test / Wait):
  - If s > 0, decrement s and continue.
  - If s == 0, wait (block) until s > 0, then decrement s and continue.
- V(s) (Verhogen / Increment / Signal):
  - Increment s.
  - If any threads were blocked on P, wake one up.
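The P and V rules above can be sketched on top of a Java monitor. This is a minimal illustration, not how java.util.concurrent.Semaphore is implemented; the names CountingSemaphore, p, v, and value are made up for this example.

```java
// A minimal sketch of a semaphore built on a Java monitor.
// CountingSemaphore, p(), v(), and value() are illustrative names, not a library API.
class CountingSemaphore {
    private int s;

    CountingSemaphore(int initial) {
        if (initial < 0) {
            throw new IllegalArgumentException("initial value must be >= 0");
        }
        s = initial;
    }

    // P(s): block while s == 0, then decrement.
    synchronized void p() throws InterruptedException {
        while (s == 0) {   // loop, not if: guards against spurious wakeups
            wait();
        }
        s--;
    }

    // V(s): increment, then wake one thread blocked in p(), if any.
    synchronized void v() {
        s++;
        notify();
    }

    synchronized int value() {
        return s;
    }
}

class CountingSemaphoreDemo {
    public static void main(String[] args) throws InterruptedException {
        CountingSemaphore sem = new CountingSemaphore(0);
        Thread waiter = new Thread(() -> {
            try {
                sem.p();   // cannot pass until some thread calls v()
                System.out.println("waiter passed P");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        sem.v();           // makes one permit available
        waiter.join();
        System.out.println("final value: " + sem.value());
    }
}
```

The synchronized keyword is what makes each operation atomic here: the check of s and the decrement happen under the same lock, so two threads cannot both see s == 1 and both decrement.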
Binary vs Counting
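- Binary Semaphore (initialized to 1): Behaves like a Mutex when used for locking, with one difference: a semaphore has no owner, so any thread may release it, not only the thread that acquired it.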
- Counting Semaphore (initialized to N): Used to control access to a finite resource pool (e.g., Connection Pool).
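As a rough illustration of the counting case, the sketch below caps the number of threads "using the database" at the same time and records the highest concurrency it observed. ConnectionLimiter and run are hypothetical names, and the Thread.sleep call is only a stand-in for real work.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example: limit concurrent access to a shared resource.
class ConnectionLimiter {
    // Starts `workers` threads; at most `limit` may be inside the guarded
    // region at once. Returns the peak number of threads seen inside.
    static int run(int workers, int limit) throws InterruptedException {
        Semaphore permits = new Semaphore(limit);
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        Thread[] threads = new Thread[workers];

        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    permits.acquire();                 // P: take a permit or block
                    try {
                        int now = inside.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max);
                        Thread.sleep(10);              // stand-in for a query
                        inside.decrementAndGet();
                    } finally {
                        permits.release();             // V: return the permit
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // The reported peak never exceeds the limit of 5.
        System.out.println("peak concurrency: " + run(20, 5));
    }
}
```

Releasing in a finally block means a permit is returned even if the guarded work throws, which keeps the pool from slowly shrinking.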
2. The Producer-Consumer Problem
This is the classic concurrency problem. We have a fixed-size buffer.
- Producer: Generates data and puts it into the buffer. Must wait if buffer is Full.
- Consumer: Takes data out of the buffer. Must wait if buffer is Empty.
Solution with Semaphores
We need three semaphores:
- empty: Initialized to N (size of buffer). Tracks empty slots.
- full: Initialized to 0. Tracks filled slots.
- mutex: Initialized to 1. Protects buffer access.
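To see how the two counters move, here is a small single-threaded sketch that applies only the bookkeeping steps of producing and consuming, leaving out the buffer and mutex. SlotCounters and counts are hypothetical names; tryAcquire is used instead of acquire so the example fails loudly where a real thread would block.

```java
import java.util.concurrent.Semaphore;

// Hypothetical example: track the empty/full permit counts only.
class SlotCounters {
    // Returns {empty permits, full permits} after the given number of
    // produce steps followed by the given number of consume steps.
    static int[] counts(int n, int produced, int consumed) {
        Semaphore empty = new Semaphore(n);
        Semaphore full = new Semaphore(0);

        for (int i = 0; i < produced; i++) {
            if (!empty.tryAcquire()) {
                throw new IllegalStateException("buffer full: a producer would block here");
            }
            full.release();    // one more filled slot
        }
        for (int i = 0; i < consumed; i++) {
            if (!full.tryAcquire()) {
                throw new IllegalStateException("buffer empty: a consumer would block here");
            }
            empty.release();   // one more free slot
        }
        return new int[] { empty.availablePermits(), full.availablePermits() };
    }

    public static void main(String[] args) {
        int[] afterProduce = counts(3, 2, 0);
        System.out.println(afterProduce[0] + " empty, " + afterProduce[1] + " full"); // 1 empty, 2 full

        int[] afterConsume = counts(3, 2, 1);
        System.out.println(afterConsume[0] + " empty, " + afterConsume[1] + " full"); // 2 empty, 1 full
    }
}
```

In this sketch the two counts always sum to N once each step completes, which is the invariant the empty and full semaphores maintain between them.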
3. Interactive: Ring Buffer Visualizer
Visualize how a Producer and Consumer coordinate over a fixed-size buffer (Size = 5). Notice how the Producer blocks when the buffer is full, and the Consumer blocks when it is empty.
4. Code Example: Producer-Consumer
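Two versions follow. The Java version implements the three-semaphore solution directly with java.util.concurrent.Semaphore. In Go, we don’t usually use raw semaphores. We use Buffered Channels, which behave like a semaphore-protected buffer: a send blocks when the channel is full, and a receive blocks when it is empty.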
Java (explicit semaphores):

    import java.util.concurrent.Semaphore;

    class BoundedBuffer {
        final Semaphore empty;
        final Semaphore full;
        final Semaphore mutex;
        final Object[] items;
        int in = 0, out = 0;

        public BoundedBuffer(int size) {
            empty = new Semaphore(size);
            full = new Semaphore(0);
            mutex = new Semaphore(1);
            items = new Object[size];
        }

        public void produce(Object item) throws InterruptedException {
            empty.acquire(); // Wait for empty slot
            mutex.acquire(); // Enter Critical Section
            items[in] = item;
            in = (in + 1) % items.length;
            mutex.release(); // Exit Critical Section
            full.release();  // Signal consumer
        }

        public Object consume() throws InterruptedException {
            full.acquire();  // Wait for item
            mutex.acquire();
            Object item = items[out];
            out = (out + 1) % items.length;
            mutex.release();
            empty.release(); // Signal producer
            return item;
        }
    }
Go (buffered channel):

    package main

    import "fmt"

    func main() {
    	// A buffered channel acts as the buffer + semaphores!
    	// Capacity = 5 (Semaphore Initial Value)
    	buffer := make(chan int, 5)

    	// Producer
    	go func() {
    		for i := 0; i < 10; i++ {
    			buffer <- i // Blocks if buffer is FULL
    			fmt.Println("Produced:", i)
    		}
    		close(buffer)
    	}()

    	// Consumer
    	for item := range buffer { // Blocks if buffer is EMPTY
    		fmt.Println("Consumed:", item)
    	}
    }
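For comparison, Java's standard library has a rough counterpart to the buffered channel: java.util.concurrent.ArrayBlockingQueue, whose put and take block when the queue is full or empty. The sketch below mirrors the Go program under one assumption: Java queues have no close operation, so a made-up sentinel value (DONE) signals the end of the stream. QueuePipeline and run are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical example: producer-consumer over a bounded blocking queue.
class QueuePipeline {
    // Sentinel standing in for Go's close(buffer); an assumption of this sketch.
    private static final int DONE = -1;

    // Produces 0..count-1 on one thread and collects them on the caller's thread.
    static List<Integer> run(int capacity, int count) throws InterruptedException {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    buffer.put(i);      // blocks while the queue is full
                }
                buffer.put(DONE);       // tell the consumer to stop
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        List<Integer> consumed = new ArrayList<>();
        while (true) {
            int item = buffer.take();   // blocks while the queue is empty
            if (item == DONE) {
                break;
            }
            consumed.add(item);
        }
        producer.join();
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(5, 10)); // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```

With a single producer and a single consumer the queue preserves FIFO order, so the consumer sees the items in the order they were produced.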