I'm a beginner when it comes to multi-threading. I'm trying to understand how Thread works in Ruby, and I came across this example on this website (Thread-safe Data Structures):
require 'thread'

queue = Queue.new

producer = Thread.new do
  10.times do
    queue.push(Time.now.to_i)
    sleep 1
  end
end

consumers = []
3.times do
  consumers << Thread.new do
    loop do
      unix_timestamp = queue.pop
      formatted_timestamp = unix_timestamp.to_s.reverse.
                            gsub(/(\d\d\d)/, '\1,').reverse
      puts "It's been #{formatted_timestamp} seconds since the epoch!"
    end
  end
end

producer.join
The part that I don't understand is the use of loop do inside the last Thread.new do:
consumers << Thread.new do
  loop do
    ...
  end
end
When I run the code above, I get:
ruby thread_safe.rb
It's been 1,680,277,028 seconds since the epoch!
It's been 1,680,277,029 seconds since the epoch!
It's been 1,680,277,030 seconds since the epoch!
It's been 1,680,277,031 seconds since the epoch!
It's been 1,680,277,032 seconds since the epoch!
It's been 1,680,277,033 seconds since the epoch!
It's been 1,680,277,034 seconds since the epoch!
It's been 1,680,277,035 seconds since the epoch!
If I remove the loop, I only see it printed 3x:
❯ ruby thread_safe.rb
It's been 1,680,277,037 seconds since the epoch!
It's been 1,680,277,038 seconds since the epoch!
It's been 1,680,277,039 seconds since the epoch!
I understand that Ruby's Queue implements a thread-safe data structure (akin to wrapping mutex.synchronize around an array), and that queue.pop waits until the queue has something in it before it returns.
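The "mutex around an array" picture covers mutual exclusion, but a blocking pop also needs a way to sleep until an item arrives. Here is a minimal sketch of that idea using Mutex together with ConditionVariable; SimpleQueue is a hypothetical name for illustration only, not Ruby's actual Queue implementation.

```ruby
require 'thread'

# Hypothetical minimal blocking queue: an Array guarded by a Mutex, plus a
# ConditionVariable so that pop can sleep until something is pushed.
class SimpleQueue
  def initialize
    @items = []
    @mutex = Mutex.new
    @not_empty = ConditionVariable.new
  end

  def push(item)
    @mutex.synchronize do
      @items.push(item)
      @not_empty.signal # wake one thread blocked in pop, if any
    end
  end

  def pop
    @mutex.synchronize do
      # Re-check after every wakeup: another thread may have taken the item
      # first, or the wakeup may be spurious. wait releases the mutex while sleeping.
      @not_empty.wait(@mutex) while @items.empty?
      @items.shift
    end
  end
end

q = SimpleQueue.new
consumer = Thread.new { q.pop } # blocks until the main thread pushes
sleep 0.1                       # give the consumer time to block (not needed for correctness)
q.push(42)
puts consumer.value             # => 42
```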
My initial expectation was to see the text "It's been X seconds since the epoch!" printed 3x, because we spawn 3 threads and each thread pops the queue once. But somehow, adding the loop makes it pop and print 10x.
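That expectation is exactly what happens without the loop: each thread's block runs once, pops one item, and the thread ends. A small sketch to confirm it; unlike the original, it fills the queue before starting the consumers so the result does not depend on timing.

```ruby
require 'thread'

queue = Queue.new
10.times { |i| queue.push(i) } # fill up front to keep the demo deterministic

# Without `loop`, each consumer pops exactly once and then finishes.
consumers = 3.times.map do
  Thread.new do
    item = queue.pop
    puts "popped #{item}"
  end
end
consumers.each(&:join)

puts "left in queue: #{queue.size}" # => left in queue: 7
```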
I have two questions:
1. Why is it that wrapping the consumer code in a loop block caused the queue to be popped a total of 10 times?
2. When dealing with a multi-threaded environment, is it the convention to use loop inside Thread.new? (I've seen the loop block used inside Thread.new in a few places, like Ruby Multithreading (TutorialsPoint), Thread loop in Ruby (Stack Overflow), and the code I referenced in this question, Thread-safe Data Structures, so I've begun to notice a pattern. If using loop is the convention in multi-threaded code, why is that?)

Why is it that wrapping the code in a loop block caused the queue to be popped a total of 10 times?
Each of the consumer threads executes an infinite loop, reading messages from the queue as they arrive.
They collectively read 10 messages because 10 messages were placed in the queue. They would collectively read 20 messages if 20 messages were placed in the queue.
How many messages each consumer thread receives is unknown. It depends on how fast each thread can process them compared to the rate at which they are produced. And if multiple threads are waiting for work (as constantly happens here), then one of them, effectively "at random", will receive the message.
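A sketch that keeps the infinite loop but counts how many messages each consumer handles. Because the consumers never exit on their own, this demo waits until the queue is empty and all consumers are blocked in pop again (using Queue#num_waiting), then kills them; that shutdown trick is just for the demo.

```ruby
require 'thread'

queue = Queue.new
tally = Hash.new(0)    # consumer index => number of messages it handled
tally_lock = Mutex.new # guards `tally`, which all consumers update

consumers = 3.times.map do |id|
  Thread.new do
    loop do
      queue.pop # blocks while the queue is empty
      tally_lock.synchronize { tally[id] += 1 }
    end
  end
end

10.times { |i| queue.push(i) }

# A consumer only calls pop again after recording its previous message, so
# "queue empty and all consumers waiting" means every message was counted.
sleep 0.01 until queue.empty? && queue.num_waiting == consumers.size
consumers.each(&:kill)

p tally               # per-consumer counts; the split varies between runs
puts tally.values.sum # => 10
```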
A more generic question: when dealing with a multi-threaded environment, is it the convention to use loop inside Thread.new?
This is called the worker model. It allows a single thread to process multiple jobs. This avoids the overhead of creating a new thread for each job, and it naturally caps the number of threads.
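A minimal sketch of the worker model: 20 jobs handled by a fixed pool of 3 threads. The :stop sentinel ("poison pill", one per worker) is an illustrative shutdown choice assumed for this demo, not something from the original code.

```ruby
require 'thread'

JOBS = 20
WORKERS = 3
STOP = :stop # hypothetical sentinel telling a worker to exit

queue = Queue.new
handled_by = Queue.new # thread-safe collection of [job, worker thread] pairs

workers = WORKERS.times.map do
  Thread.new do
    # Each worker handles jobs one after another until it sees the sentinel.
    while (job = queue.pop) != STOP
      handled_by.push([job, Thread.current])
    end
  end
end

JOBS.times { |i| queue.push(i) }
WORKERS.times { queue.push(STOP) } # sentinels come after all jobs (FIFO)
workers.each(&:join)

pairs = Array.new(handled_by.size) { handled_by.pop }
puts "#{pairs.size} jobs handled by #{pairs.map(&:last).uniq.size} thread(s)"
```

However many jobs are queued, no more than WORKERS threads are ever created.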
The code in the OP is missing one detail, however. The consumers need to exit when no jobs are available and none will ever become available again.
Right now, the program exits the moment the producer is done. This is a bug, as it can leave some jobs unprocessed or only partially processed. To fix this, you should also join the consumers. But for that to work, the consumers can't use an infinite loop. This is easy to solve by calling queue.close in the producer, after its loop, and handling the closed queue accordingly in the consumers.
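So three changes are needed:
1. Add queue.close to the producer.
2. Make the consumers exit their loop once the queue is closed and empty.
3. Join the consumers as well as the producer.
I don't know Ruby, so the implementation is left to you.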
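For reference, here is one way the changes might look in Ruby: the producer closes the queue when it is done, each consumer ends its loop when Queue#pop returns nil (which happens once the queue is closed and empty), and the main thread joins the consumers. This is a sketch; the sleep is shortened to 0.1 seconds and a `lines` queue is added only so the output can be checked.

```ruby
require 'thread'

queue = Queue.new

producer = Thread.new do
  10.times do
    queue.push(Time.now.to_i)
    sleep 0.1 # shortened from 1 second to keep the demo quick
  end
  queue.close # no more items will ever be pushed
end

lines = Queue.new # collects printed lines (only for checking the result)

consumers = 3.times.map do
  Thread.new do
    # pop returns nil once the queue is closed and empty, ending the loop
    # instead of blocking forever.
    while (unix_timestamp = queue.pop)
      formatted_timestamp = unix_timestamp.to_s.reverse.
                            gsub(/(\d\d\d)/, '\1,').reverse
      line = "It's been #{formatted_timestamp} seconds since the epoch!"
      lines.push(line)
      puts line
    end
  end
end

producer.join
consumers.each(&:join) # wait until the consumers have drained the queue
```

With this shape, `while (x = queue.pop)` replaces the bare `loop do`, so the same worker threads still process every job but also terminate cleanly.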