ruby multithreading synchronization mutex monitor

Ruby synchronisation: How to make threads work one after another in proper order?


My problem is that I don't know how to synchronise multiple threads in Ruby. The task is to create six threads and start them immediately. Each of them should do some work (for example, puts "Thread 1: Hi"), one after another, in the order I specify.

I've tried Mutex, Monitor and ConditionVariable, but with each of them the threads ran in random order. Could anybody explain how to achieve my goal?

After some time struggling with Mutex and ConditionVariable, I've achieved my goal. The code is a little messy, and I intentionally didn't use loops, to make each step easier to see.

cv = ConditionVariable.new
mutex = Mutex.new

mutex2 = Mutex.new
cv2 = ConditionVariable.new

mutex3 = Mutex.new
cv3 = ConditionVariable.new

mutex4 = Mutex.new
cv4 = ConditionVariable.new

mutex5 = Mutex.new
cv5 = ConditionVariable.new

mutex6 = Mutex.new
cv6 = ConditionVariable.new



Thread.new do
  mutex.synchronize {
    puts 'First: Hi'
    cv.wait(mutex)
    puts 'First: Bye'
    #cv.wait(mutex)
    cv.signal
    puts 'First: One more time'
  }

end

Thread.new do
  mutex.synchronize {
    puts 'Second: Hi'
    cv.signal
    cv.wait(mutex)
    puts 'Second:Bye'
    cv.signal
  }

  mutex2.synchronize {
    puts  'Second: Starting third'
    cv2.signal

  }
end

Thread.new do
  mutex2.synchronize {
    cv2.wait(mutex2)
    puts 'Third: Hi'
  }

  mutex3.synchronize {
    puts 'Third: Starting fourth'
    cv3.signal
  }
end

Thread.new do
  mutex3.synchronize {
    cv3.wait(mutex3)
    puts 'Fourth: Hi'
  }

  mutex4.synchronize {
    puts 'Fourth: Starting fifth'
    cv4.signal
  }
end

Thread.new do
  mutex4.synchronize {
    cv4.wait(mutex4)
    puts 'Fifth: Hi'
  }

  mutex5.synchronize {
    puts 'Fifth: Starting sixth'
    cv5.signal
  }
end

Thread.new {
  mutex5.synchronize {
    cv5.wait(mutex5)
    puts 'Sixth:Hi'
  }
}

sleep 2

Solution

  • Using Queue as a PV Semaphore

    You can abuse Queue, using it like a traditional PV semaphore (P waits, V signals). To do this, create an instance of Queue:

    require 'thread'
    ...
    sem = Queue.new
    

    When a thread needs to wait, it calls Queue#deq:

    # waiting thread
    sem.deq
    

    When some other thread wants to unblock the waiting thread, it pushes something (anything) onto the queue:

    # another thread that wants to unblock the waiting thread
    sem.enq :go
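
The wait/unblock pair above can be tried in isolation. The following is a minimal sketch, not part of the original answer: the method name `semaphore_demo` and the second queue `log` are hypothetical, introduced only to record the order of events in a thread-safe way.

```ruby
require 'thread'

# Minimal sketch: a Queue used as a one-shot signal between two threads.
# `semaphore_demo` and `log` are hypothetical names used only here.
def semaphore_demo
  sem = Queue.new   # starts empty, so #deq blocks
  log = Queue.new   # thread-safe record of what happened, in order

  waiter = Thread.new do
    sem.deq                   # blocks until something is pushed
    log.enq :waiter_resumed
  end

  log.enq :main_signals       # recorded before the waiter can resume
  sem.enq :go                 # unblocks the waiting thread
  waiter.join

  events = []
  events << log.deq until log.empty?
  events
end

p semaphore_demo
# => [:main_signals, :waiter_resumed]
```

The waiter cannot get past `sem.deq` until the main thread pushes `:go`, so the order of the two log entries is fixed regardless of how the threads are scheduled.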
    

    A Worker class

    Here's a worker class that uses Queue to synchronize its start and stop:

    class Worker
    
      def initialize(worker_number)  
        @start = Queue.new
        Thread.new do
          @start.deq
          puts "Thread #{worker_number}"
          @when_done.call
        end
      end
    
      def start
        @start.enq :start
      end
    
      def when_done(&block)
        @when_done = block
      end
    
    end
    

    When constructed, a worker creates a thread, but that thread then waits on the @start queue. Not until #start is called will the thread unblock.

    When done, the thread will execute the block that was passed to #when_done. We'll see how this is used in just a moment.
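
One thing to watch with this design: if #start is called on a worker whose #when_done was never set, `@when_done` is nil and the call raises NoMethodError. Below is a hedged sketch of a variant that tolerates this. `GuardedWorker`, its no-op default callback and its `#join` helper are assumptions added for illustration; they are not part of the original Worker.

```ruby
require 'thread'

# Hypothetical variant of Worker: same Queue-based start gate, plus a
# no-op default callback and a #join helper (both added for this sketch).
class GuardedWorker
  def initialize(worker_number)
    @start = Queue.new
    @when_done = -> {}            # default: do nothing when finished
    @thread = Thread.new do
      @start.deq                  # block until #start is called
      puts "Thread #{worker_number}"
      @when_done.call
    end
  end

  def start
    @start.enq :start
  end

  def when_done(&block)
    @when_done = block
  end

  # Wait for the worker's thread to finish.
  def join
    @thread.join
  end
end

finished = Queue.new
worker = GuardedWorker.new(1)
worker.when_done { finished.enq :done }
worker.start
worker.join
```

As in the original, the callback must be registered before #start is called; the default only prevents a crash when no callback was registered at all.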

    Creating workers

    First, let's make sure that if any threads raise an exception, we get to find out about it:

    Thread.abort_on_exception = true
    

    We'll need six workers:

    workers = (1..6).map { |i| Worker.new(i) }
    

    Telling each worker what to do when it's done

    Here's where #when_done comes into play:

    workers.each_cons(2) do |w1, w2|
      w1.when_done { w2.start }
    end
    

    This takes each pair of adjacent workers in turn. Each worker except the last is told that, when it finishes, it should start the worker after it. That just leaves the last worker. When it finishes, we want it to notify the main thread:

    all_done = Queue.new
    workers.last.when_done { all_done.enq :done }
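
To check which pairs each_cons(2) hands to the block, it can be run on plain numbers. This is a small illustrative sketch; the helper name `neighbour_pairs` is hypothetical.

```ruby
# Show which neighbouring pairs each_cons(2) yields.
# `neighbour_pairs` is a hypothetical helper for this example.
def neighbour_pairs(items)
  items.each_cons(2).to_a
end

p neighbour_pairs((1..6).to_a)
# => [[1, 2], [2, 3], [3, 4], [4, 5], [5, 6]]
```

Six items give five pairs, and the last item never appears as the first element of a pair, which is why the last worker needs its own #when_done block.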
    

    Let's Go!

    Now all that remains is to start the first thread:

    workers.first.start
    

    and wait for the last thread to finish:

    all_done.deq
    

    The output:

    Thread 1
    Thread 2
    Thread 3
    Thread 4
    Thread 5
    Thread 6
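
For comparison, the same ordering can also be obtained with the Mutex and ConditionVariable mentioned in the question. This is a different technique from the Queue approach above, shown only as a sketch: a single shared turn counter that every thread re-checks in a loop after waking. The names `run_in_turn`, `turn` and `order` are hypothetical.

```ruby
require 'thread'

# Sketch: run `count` threads' work strictly in index order using one
# Mutex, one ConditionVariable and a shared turn counter.
# `run_in_turn`, `turn` and `order` are hypothetical names.
def run_in_turn(count)
  mutex = Mutex.new
  cv    = ConditionVariable.new
  turn  = 1        # number of the thread allowed to work next
  order = []       # records who worked; only touched under the mutex

  threads = (1..count).map do |i|
    Thread.new do
      mutex.synchronize do
        # Re-check the condition in a loop: being woken up does not by
        # itself mean that it is this thread's turn.
        cv.wait(mutex) until turn == i
        puts "Thread #{i}"
        order << i
        turn += 1
        cv.broadcast   # wake all waiters; only the next in line proceeds
      end
    end
  end

  threads.each(&:join)
  order
end

run_in_turn(6)
```

Because the condition is tested before waiting, a signal sent before a thread starts waiting is not lost. That appears to be the usual reason that signal-only ConditionVariable code runs in an unpredictable order.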