Abstract

In this rather long mail I show two programs that demonstrate either a bug,
a feature, or my own misunderstanding of condition variables. At the end
I propose a few enhancements to the current implementation.

The full story

How should one use condition variables in concurrent programming? Below are
two examples where I don't understand what's going on; they probably
highlight my lack of concurrent programming knowledge.

1)
I thought one thread (B) could wait on the condition variable and another
thread (A) could signal when B may continue. In fact,
ConditionVariable#broadcast made me think there could be any number of
waiters, all of which would be signalled to continue like this:

Notation:
\w   thread
 *   creation of a thread
 |   execution of a thread
 =   wakeup of a thread
 #   termination of a thread
   

      A        B         C        D
               *         *        *
               |         |        |
            cv.wait      |        |
                      cv.wait     |
                               cv.wait
     ...
      *
      |
 cv.broadcast
      |        =         =        =
      |        |         |        |
      |        #         |        |
      |                  #        |
      |                           #
      #

I'm surprised, however, by the following program. It creates threads (B, ...)
that immediately start to wait. I'd expect all threads to wait until the
cv is signalled or until the program terminates.
Instead, the waiting ends unexpectedly for every second thread.

  require "thread"

  Thread.abort_on_exception = true

  def test_cv(th_count)
    cv = ConditionVariable.new()
    cm = Mutex.new
    cm.lock

    threads = []
    th_count.times do |i|
      th = Thread.start do
        i_local = i
        puts "#{i_local} waiting"
        cv.wait(cm)
        puts "#{i_local} waiting ends"
      end
      threads << th
    end

    puts "sleeping"
    sleep 2

    puts "cv.broadcast"
    cv.broadcast

    puts "final sleep"
    sleep 2

    puts "killing threads"
    threads.each { |th| th.exit }
    puts "threads killed", ""
  end

  10.times do |th_count| 
    test_cv(th_count)
  end

I won't show the entire log, only the first 4 iterations, which are enough
to show the point:

  ruby 1.6.1 (2000-10-02) [i686-linux]
  sleeping
  cv.broadcast
  final sleep
  killing threads
  threads killed

  0 waiting
  sleeping
  cv.broadcast
  0 waiting ends
  final sleep
  killing threads
  threads killed

  0 waiting
  1 waiting
  1 waiting ends
  sleeping
  cv.broadcast
  final sleep
  killing threads
  threads killed

  0 waiting
  1 waiting
  1 waiting ends
  2 waiting
  sleeping
  cv.broadcast
  0 waiting ends
  final sleep
  killing threads
  threads killed
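
For comparison, here is a minimal sketch of how I would expect the broadcast
case to be written conventionally: each waiter holds the mutex itself when it
calls cv.wait, and rechecks a flag in a loop. Note that in the program above
the main thread locks cm and the waiters call cv.wait(cm) without owning it,
which may be related to the odd wakeups, though I have not verified that.
The names broadcast_demo, ready and woken are made up for this illustration.

```ruby
require "thread"

# Hypothetical helper: starts th_count waiters, releases them all with one
# broadcast, and returns the sorted ids of the threads that woke up.
def broadcast_demo(th_count)
  mutex = Mutex.new
  cv    = ConditionVariable.new
  ready = false   # the condition the waiters are actually waiting for
  woken = []

  threads = Array.new(th_count) do |i|
    Thread.new do
      mutex.synchronize do
        # Recheck the flag in a loop, so an early or spurious wakeup is
        # harmless and a broadcast sent before we wait is not missed.
        cv.wait(mutex) until ready
        woken << i
      end
    end
  end

  mutex.synchronize do
    ready = true
    cv.broadcast   # wake every thread currently waiting on cv
  end

  threads.each(&:join)
  woken.sort
end

p broadcast_demo(3)   # => [0, 1, 2]
```

With the flag in place the result does not depend on whether a waiter
reaches cv.wait before or after the broadcast.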

2)
Then my second problem. I think the current ConditionVariable is inadequate
for anything but the most basic needs. For example, if one wants to use it
to create synchronization points, there will be surprises.

This example is borrowed from Dave & Andy's article about mutual exclusion at
http://dev.rubycentral.com/articles/mutex.html. I modified it a bit to let
the "feature" show up. The trick is to get thread B's signal to execute
before A waits. For that I added a couple of lines of code that give the
threads a chance to context switch, and I run the test repeatedly until it
eventually ends up in the deadlock.

 require 'thread'

 Thread.abort_on_exception = true

 def do_it
  mutex = Mutex.new
  cv = ConditionVariable.new


  a = Thread.new {
    c = rand(10)
    d = c
    mutex.synchronize {
      puts "A: I have critical section, but will wait for cv"
      cv.wait(mutex)
      puts "A: I have critical section again! I rule!"
    }
  }

  puts "(Later, back at the ranch...)"

  b = Thread.new {
    c = rand(10)
    d = c
    mutex.synchronize {
      puts "B: Now I am critical, but am done with cv"
      cv.signal
      puts "B: I am still critical, finishing up"
    }
  }

  a.join
  b.join
 end

 loop { puts ""; do_it }


The logs from different platforms:

  ruby 1.6.1 (2000-10-02) [i686-linux]
  ...
  (Later, back at the ranch...)
  B: Now I am critical, but am done with cv
  B: I am still critical, finishing up
  A: I have critical section, but will wait for cv
  deadlock 0x401775fc: 2:0  - /home/niemela/lib/ruby/1.6/thread.rb:114
  deadlock 0x401835a0: 2:8 (main) - cv_wait_deadlock.rb:32
  cv_wait_deadlock.rb:32:in `join': Thread: deadlock (fatal)
          from cv_wait_deadlock.rb:32:in `do_it'
          from cv_wait_deadlock.rb:36
          from cv_wait_deadlock.rb:36:in `loop'
          from cv_wait_deadlock.rb:36


  ruby 1.4.5 (2000-06-26) [i386-cygwin]
  ...
  (Later, back at the ranch...)
  B: Now I am critical, but am done with cv
  B: I am still critical, finishing up
  A: I have critical section, but will wait for cv
  lib/thread.rb:114:deadlock 0xa052f40: 1:0
  cv_wait_deadlock.rb:32:deadlock 0xa076028: 1:4 (main)
  cv_wait_deadlock.rb:32:in `do_it': Thread: deadlock (fatal)
          from cv_wait_deadlock.rb:36
          from cv_wait_deadlock.rb:36:in `loop'
          from cv_wait_deadlock.rb:36
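
As far as I can tell, the lost signal can be avoided without changing
ConditionVariable by pairing it with a flag that records that the signal has
already been sent. A minimal sketch, assuming that is acceptable for the use
case; do_it_guarded, done and log are names I made up here:

```ruby
require "thread"

# Hypothetical variant of do_it: the flag 'done' remembers the signal, so it
# does not matter whether B signals before or after A starts waiting.
def do_it_guarded
  mutex = Mutex.new
  cv    = ConditionVariable.new
  done  = false
  log   = []

  a = Thread.new do
    mutex.synchronize do
      # Wait only while the event has not happened yet.
      cv.wait(mutex) until done
      log << "A resumed"
    end
  end

  b = Thread.new do
    mutex.synchronize do
      done = true      # record the event under the mutex ...
      cv.signal        # ... then wake A if it is already waiting
      log << "B signalled"
    end
  end

  [a, b].each(&:join)
  log
end

p do_it_guarded
```

If B runs first, A finds done already set and never calls cv.wait, so there
is nothing to deadlock on.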

===
Lastly, I have a few ideas for future versions.

* I'd like to see a version of CV where signals are queued
  (optionally), so that a cv.signal followed by a later cv.wait
  would not get lost. This would solve the problem case 2 may suffer from.
* The first solution could be extended: the CV would count
  signals, and CV#wait_for(n) would be introduced. That would allow
  one to write code like the next example easily.

        A                B           C
    cv.wait_for(2)       |           |
                      cv.signal      |
                         |           |
        =                |        cv.signal
        |                |           |

   I'm not sure how CV#broadcast should behave. Maybe it should
   release all waiters.

   A second extension could be for CV#signal(n) to take an integer, with
   approximately the same semantics as n.times{cv.signal} (done atomically).
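
To make the proposal concrete, here is a rough sketch of the counting
behaviour built on top of the existing Mutex and ConditionVariable. The class
name CountingSignal and its methods are hypothetical and only illustrate the
semantics I have in mind; nothing like this exists in the thread library as
far as I know, and broadcast is left out because I'm unsure what it should
do.

```ruby
require "thread"

# Hypothetical helper, not part of Ruby's thread library.
class CountingSignal
  def initialize
    @mutex = Mutex.new
    @cv    = ConditionVariable.new
    @count = 0   # signals sent but not yet consumed by a waiter
  end

  # Record n signals in one atomic step and wake the waiters so that
  # each of them can recheck whether enough signals have arrived.
  def signal(n = 1)
    @mutex.synchronize do
      @count += n
      @cv.broadcast
    end
  end

  # Block until at least n signals are pending, then consume them.
  def wait_for(n = 1)
    @mutex.synchronize do
      @cv.wait(@mutex) while @count < n
      @count -= n
    end
  end
end

cs = CountingSignal.new
cs.signal        # sent before anybody waits, but remembered
cs.signal
cs.wait_for(2)   # returns at once, both signals were queued
```

Because the count is kept under the mutex, a signal sent before the wait is
simply consumed later instead of being lost.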


	- Aleksi