Open main menu
Home
Random
Recent changes
Special pages
Community portal
Preferences
About Wikipedia
Disclaimers
Incubator escapee wiki
Search
User menu
Talk
Dark mode
Contributions
Create account
Log in
Editing
Monitor (synchronization)
(section)
Warning:
You are not logged in. Your IP address will be publicly visible if you make any edits. If you
log in
or
create an account
, your edits will be attributed to your username, along with other benefits.
Anti-spam check. Do
not
fill this in!
====Solving the bounded producer/consumer problem==== {{technical|section|date=January 2014}} Having introduced the usage of condition variables, let us use it to revisit and solve the classic bounded producer/consumer problem. The classic solution is to use two monitors, comprising two condition variables sharing one lock on the queue: <syntaxhighlight lang="cpp"> global volatile RingBuffer queue; // A thread-unsafe ring-buffer of tasks. global Lock queueLock; // A mutex for the ring-buffer of tasks. (Not a spin-lock.) global CV queueEmptyCV; // A condition variable for consumer threads waiting for the queue to // become non-empty. Its associated lock is "queueLock". global CV queueFullCV; // A condition variable for producer threads waiting for the queue to // become non-full. Its associated lock is also "queueLock". // Method representing each producer thread's behavior: public method producer() { while (true) { // Producer makes some new task to be added. task myTask = ...; // Acquire "queueLock" for the initial predicate check. queueLock.acquire(); // Critical section that checks if the queue is non-full. while (queue.isFull()) { // Release "queueLock", enqueue this thread onto "queueFullCV" and sleep this thread. wait(queueLock, queueFullCV); // When this thread is awoken, re-acquire "queueLock" for the next predicate check. } // Critical section that adds the task to the queue (note that we are holding "queueLock"). queue.enqueue(myTask); // Wake up one or all consumer threads that are waiting for the queue to be non-empty // now that it is guaranteed, so that a consumer thread will take the task. signal(queueEmptyCV); // Or: broadcast(queueEmptyCV); // End of critical sections. // Release "queueLock" until we need it again to add the next task. queueLock.release(); } } // Method representing each consumer thread's behavior: public method consumer() { while (true) { // Acquire "queueLock" for the initial predicate check. queueLock.acquire(); // Critical section that checks if the queue is non-empty. while (queue.isEmpty()) { // Release "queueLock", enqueue this thread onto "queueEmptyCV" and sleep this thread. wait(queueLock, queueEmptyCV); // When this thread is awoken, re-acquire "queueLock" for the next predicate check. } // Critical section that takes a task off of the queue (note that we are holding "queueLock"). myTask = queue.dequeue(); // Wake up one or all producer threads that are waiting for the queue to be non-full // now that it is guaranteed, so that a producer thread will add a task. signal(queueFullCV); // Or: broadcast(queueFullCV); // End of critical sections. // Release "queueLock" until we need it again to take the next task. queueLock.release(); // Go off and do something with the task. doStuff(myTask); } } </syntaxhighlight> This ensures concurrency between the producer and consumer threads sharing the task queue, and blocks the threads that have nothing to do rather than busy-waiting as shown in the aforementioned approach using spin-locks. A variant of this solution could use a single condition variable for both producers and consumers, perhaps named "queueFullOrEmptyCV" or "queueSizeChangedCV". In this case, more than one condition is associated with the condition variable, such that the condition variable represents a weaker condition than the conditions being checked by individual threads. The condition variable represents threads that are waiting for the queue to be non-full ''and'' ones waiting for it to be non-empty. However, doing this would require using ''broadcast'' in all the threads using the condition variable and cannot use a regular ''signal''. This is because the regular ''signal'' might wake up a thread of the wrong type whose condition has not yet been met, and that thread would go back to sleep without a thread of the correct type getting signaled. For example, a producer might make the queue full and wake up another producer instead of a consumer, and the woken producer would go back to sleep. In the complementary case, a consumer might make the queue empty and wake up another consumer instead of a producer, and the consumer would go back to sleep. Using ''broadcast'' ensures that some thread of the right type will proceed as expected by the problem statement. Here is the variant using only one condition variable and broadcast: <syntaxhighlight lang="cpp"> global volatile RingBuffer queue; // A thread-unsafe ring-buffer of tasks. global Lock queueLock; // A mutex for the ring-buffer of tasks. (Not a spin-lock.) global CV queueFullOrEmptyCV; // A single condition variable for when the queue is not ready for any thread // i.e. for producer threads waiting for the queue to become non-full // and consumer threads waiting for the queue to become non-empty. // Its associated lock is "queueLock". // Not safe to use regular "signal" because it is associated with // multiple predicate conditions (assertions). // Method representing each producer thread's behavior: public method producer() { while (true) { // Producer makes some new task to be added. task myTask = ...; // Acquire "queueLock" for the initial predicate check. queueLock.acquire(); // Critical section that checks if the queue is non-full. while (queue.isFull()) { // Release "queueLock", enqueue this thread onto "queueFullOrEmptyCV" and sleep this thread. wait(queueLock, queueFullOrEmptyCV); // When this thread is awoken, re-acquire "queueLock" for the next predicate check. } // Critical section that adds the task to the queue (note that we are holding "queueLock"). queue.enqueue(myTask); // Wake up all producer and consumer threads that are waiting for the queue to be respectively // non-full and non-empty now that the latter is guaranteed, so that a consumer thread will take the task. broadcast(queueFullOrEmptyCV); // Do not use "signal" (as it might wake up another producer thread only). // End of critical sections. // Release "queueLock" until we need it again to add the next task. queueLock.release(); } } // Method representing each consumer thread's behavior: public method consumer() { while (true) { // Acquire "queueLock" for the initial predicate check. queueLock.acquire(); // Critical section that checks if the queue is non-empty. while (queue.isEmpty()) { // Release "queueLock", enqueue this thread onto "queueFullOrEmptyCV" and sleep this thread. wait(queueLock, queueFullOrEmptyCV); // When this thread is awoken, re-acquire "queueLock" for the next predicate check. } // Critical section that takes a task off of the queue (note that we are holding "queueLock"). myTask = queue.dequeue(); // Wake up all producer and consumer threads that are waiting for the queue to be respectively // non-full and non-empty now that the former is guaranteed, so that a producer thread will add a task. broadcast(queueFullOrEmptyCV); // Do not use "signal" (as it might wake up another consumer thread only). // End of critical sections. // Release "queueLock" until we need it again to take the next task. queueLock.release(); // Go off and do something with the task. doStuff(myTask); } } </syntaxhighlight>
Edit summary
(Briefly describe your changes)
By publishing changes, you agree to the
Terms of Use
, and you irrevocably agree to release your contribution under the
CC BY-SA 4.0 License
and the
GFDL
. You agree that a hyperlink or URL is sufficient attribution under the Creative Commons license.
Cancel
Editing help
(opens in new window)