
What's the use of having a loop block inside Thread.new in ruby?


I am a beginner when it comes to multi-threading. I am trying to understand how Thread works in Ruby, and I found this example on a website (Thread-safe Data Structures):

require 'thread'

queue = Queue.new

producer = Thread.new do
  10.times do
    queue.push(Time.now.to_i)
    sleep 1
  end
end

consumers = []

3.times do
  consumers << Thread.new do
    loop do
      unix_timestamp = queue.pop
      formatted_timestamp = unix_timestamp.to_s.reverse.
                            gsub(/(\d\d\d)/, '\1,').reverse

      puts "It's been #{formatted_timestamp} seconds since the epoch!"
    end
  end
end

producer.join

The part that I don't understand is the use of loop do inside the last Thread.new do:

consumers << Thread.new do
  loop do
    ...
  end
end

When I run the code above, I get

ruby thread_safe.rb
It's been 1,680,277,028 seconds since the epoch!
It's been 1,680,277,029 seconds since the epoch!
It's been 1,680,277,030 seconds since the epoch!
It's been 1,680,277,031 seconds since the epoch!
It's been 1,680,277,032 seconds since the epoch!
It's been 1,680,277,033 seconds since the epoch!
It's been 1,680,277,034 seconds since the epoch!
It's been 1,680,277,035 seconds since the epoch!

If I remove the loop, I only see it printed 3x:

❯ ruby thread_safe.rb
It's been 1,680,277,037 seconds since the epoch!
It's been 1,680,277,038 seconds since the epoch!
It's been 1,680,277,039 seconds since the epoch!

I understand that Ruby's Queue implements a thread-safe data structure (akin to wrapping a mutex.synchronize around an array), and that queue will wait until it has something before it pops it.

My initial expectation is to see the text "It's been X seconds since the epoch!" printed 3x, because we are spawning 3 threads and in each thread we pop the queue once. But somehow adding the loop pops and prints it 10x.

I have two questions:

  • A question specific to this code example: why is it that wrapping the loop block caused the queue to be popped a total of 10 times?
  • A more generic question: when dealing with a multi-threaded environment, is it the convention to use loop inside Thread.new? I've seen the loop block used inside Thread.new in enough places, such as Ruby Multithreading (TutorialsPoint), Thread loop in Ruby (Stack Overflow), and the code referenced in this question (Thread-safe Data Structures), that I am beginning to notice a pattern. If using loop is the convention in multi-threaded code, why is that?

Solution

  • why is it that wrapping the loop block caused the queue to be popped a total of 10 times?

    Each of the consumer threads executes an infinite loop, reading messages from the queue as they arrive.

    They collectively read 10 messages because 10 messages were placed in the queue. They would collectively read 20 messages if 20 messages were placed in the queue.

    How many messages each consumer thread receives is unknown. It depends on how fast the thread can process them compared to the rate at which they are produced. And if multiple threads are waiting for work (as constantly happens here), then one of the waiting threads, effectively chosen at random, will receive the message.
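
    To make the difference concrete, here is a minimal sketch (not the code from the question). It assumes the queue is pre-filled with 10 placeholder integers, and it uses a non-blocking pop so that the looping threads can stop once the queue is empty. Variable names such as single_poppers and per_thread are made up for illustration.

```ruby
require 'thread'

queue = Queue.new
10.times { |i| queue.push(i) } # pre-fill with 10 placeholder jobs

# Without a loop: each of the 3 threads pops exactly once, then finishes.
single_poppers = 3.times.map { Thread.new { queue.pop } }
single_poppers.each(&:join)
remaining_after_single_pops = queue.size # 10 pushed - 3 popped

# With a loop: each thread keeps popping until nothing is left.
loopers = 3.times.map do
  Thread.new do
    handled = 0
    loop do
      begin
        queue.pop(true) # non-blocking pop raises ThreadError when empty
      rescue ThreadError
        break
      end
      handled += 1
    end
    handled # becomes the thread's value
  end
end

per_thread = loopers.map(&:value) # Thread#value joins and returns the block result
puts "left after single pops: #{remaining_after_single_pops}"
puts "handled per looping thread: #{per_thread.inspect} (total #{per_thread.sum})"
```

    The per-thread split can differ from run to run, but the total always equals the number of items that were still in the queue.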


  • A more generic question: when dealing with multi-threaded environment, is it the convention to use loop inside Thread.new?

    This is called the worker model. It allows a single thread to be used to process multiple jobs. This avoids the overhead of creating a new thread for each job. It also naturally caps the number of threads.

    The code in the OP is missing one detail however. It needs to exit if no jobs are available and no jobs will ever become available again.

    Right now, the program exits the moment the producer is done. This is a bug, as it can leave some jobs unprocessed or incompletely processed. To fix this, you should also join the consumers. But for that to work, the consumers can't loop forever. This is easy to solve by calling queue.close in the producer after its loop finishes, and handling the closed queue accordingly in the consumers.

    So three changes are needed:

    • Add queue.close to the producer.
    • Handle a closed queue in the consumers.
    • Wait for the consumers to complete by joining them.

    I don't know Ruby, so the implementation is left to you.
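
    For illustration only, one way the three changes might be sketched in Ruby. This assumes Ruby 2.3 or later, where Queue#close exists and Queue#pop returns nil once the queue is closed and empty. The integer jobs, the doubling "work", and the results queue are placeholders, not part of the original example.

```ruby
require 'thread'

queue   = Queue.new
results = Queue.new # thread-safe place to collect output (placeholder)

producer = Thread.new do
  5.times { |i| queue.push(i) }
  queue.close # change 1: signal that no more jobs will ever arrive
end

consumers = 3.times.map do
  Thread.new do
    # change 2: pop returns nil once the queue is closed and drained,
    # which ends the loop. (A nil or false job would also end it, so
    # this sketch assumes jobs are never nil or false.)
    while (job = queue.pop)
      results.push(job * 2) # stand-in for real work
    end
  end
end

producer.join
consumers.each(&:join) # change 3: wait for the workers to finish

processed = []
processed << results.pop until results.empty?
puts "processed: #{processed.sort.inspect}"
```

    Because the consumers are joined after the queue is closed, every job that was pushed is processed before the program exits.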