ruby mutex deadlock race-condition producer-consumer

How to use condition variables?


There aren't many resources on condition variables in Ruby, and most of them are wrong. The ruby-doc example, the tutorial here, and the post here all suffer from a possible deadlock.

We could work around the problem by starting the threads in a given order, perhaps with a sleep in between to enforce synchronization. But that only postpones the real problem.

I rewrote the code into a classical producer-consumer problem:

require 'thread'
queue = []
mutex = Mutex.new
resource = ConditionVariable.new
threads = []

threads << Thread.new do
  5.times do |i|
    mutex.synchronize do 
      resource.wait(mutex)
      value = queue.shift
      print "consumed #{value}\n"
    end
  end
end

threads << Thread.new do
  5.times do |i|
    mutex.synchronize do
      queue << i
      print "#{i} produced\n"
      resource.signal
    end
    sleep(1) #simulate expense
  end
end

threads.each(&:join)

Sometimes you will get this (but not always):

0 produced
1 produced
consumed 0
2 produced
consumed 1
3 produced
consumed 2
4 produced
consumed 3
producer-consumer.rb:30:in `join': deadlock detected (fatal)
        from producer-consumer.rb:30:in `each'
        from producer-consumer.rb:30:in `<main>'
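
One plausible reading of this trace is a lost wakeup: ConditionVariable#signal only wakes a thread that is already waiting, and a signal sent while nobody waits is not remembered. If the producer signals before the consumer reaches its first wait, the consumer ends up one signal short and blocks forever on its last iteration. The sketch below isolates that behaviour in a single thread. The helper name is made up for illustration, and the timeout exists only so the demo terminates.

```ruby
# Hypothetical helper: signals first, waits second, and returns how long the wait lasted.
def seconds_waited_after_early_signal(timeout)
  mutex = Mutex.new
  cond = ConditionVariable.new

  # Nobody is waiting yet, so this signal has no effect and is not stored.
  mutex.synchronize { cond.signal }

  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  # The timeout is only there so the demo ends instead of blocking forever.
  mutex.synchronize { cond.wait(mutex, timeout) }
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
end

puts format('waited %.2f s', seconds_waited_after_early_signal(0.2))
```

The wait runs for roughly the full timeout even though a signal was sent just before it, which is the situation the consumer above can end up in.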

What is the correct solution?


Solution

  • A more robust solution, with multiple consumers and producers, uses MonitorMixin. The condition variable it creates with new_cond has wait_while() and wait_until() methods, which re-check the condition after every wakeup.

    require 'monitor'

    queue = []
    queue.extend(MonitorMixin)
    cond = queue.new_cond
    done = false # set to true once every producer has finished
    consumers, producers = [], []

    6.times do |i|
      consumers << Thread.start(i) do |n|
        print "consumer start #{n}\n"
        loop do
          item = queue.synchronize do
            # Sleep until there is work or the producers are finished
            cond.wait_while { queue.empty? && !done }
            queue.shift # nil when the queue is drained and done is set
          end
          break if item.nil?
          print "consumer #{n}: #{item}\n"
          sleep(0.2) # simulate expense
        end
      end
    end

    4.times do |i|
      producers << Thread.start(i) do |n|
        id = (65 + n).chr # "A", "B", ...
        11.times do |j|
          queue.synchronize do
            item = "#{j} #{id}"
            queue << item
            print "producer #{id}: produced #{item}\n"
            cond.broadcast
          end
          sleep(0.1) # simulate expense
        end
      end
    end

    producers.each(&:join)
    # Tell waiting consumers that no more items will arrive
    queue.synchronize do
      done = true
      cond.broadcast
    end
    consumers.each(&:join)

    print "queue size #{queue.size}\n"
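
For comparison, the single-producer, single-consumer code from the question can probably be repaired with a plain Mutex and ConditionVariable as well. The sketch below assumes the only problem is that the consumer waits unconditionally, so it wraps the wait in a loop on the actual condition (an empty queue). The helper name and the consumed array are made up here so the result can be inspected, and the sleep from the question is left out to keep the run short.

```ruby
require 'thread'

# Hypothetical helper: runs one producer and one consumer, returns the consumed values.
def run_producer_consumer(count)
  queue = []
  mutex = Mutex.new
  resource = ConditionVariable.new
  consumed = []

  consumer = Thread.new do
    count.times do
      mutex.synchronize do
        # Wait only while there is nothing to take. The loop covers both
        # spurious wakeups and signals sent before this thread started waiting.
        resource.wait(mutex) while queue.empty?
        consumed << queue.shift
      end
    end
  end

  producer = Thread.new do
    count.times do |i|
      mutex.synchronize do
        queue << i
        resource.signal
      end
    end
  end

  [producer, consumer].each(&:join)
  consumed
end

p run_producer_consumer(5) # prints [0, 1, 2, 3, 4]
```

Because the consumer checks the queue before it waits, an item produced before the first wait is simply taken without waiting, so a missed signal no longer matters.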