Search code examples
rubymultithreadingconcurrencythreadpool

Learning Ruby threading - trigger an event when thread finishes


I'm new to multi-threading and I'm looking for some help understanding the idiomatic way of doing something when a thread is finished, such as updating a progress bar. In the following example, I have several lists of items and routines to do some "parsing" of each item. I plan to have a progress bar for each list so I'd like to be able to have each list's parsing routine update the percentage of items completed. The only "trigger" point I see is at the puts statement at the end of an item's sleepy method (the method being threaded). What's the generally accepted strategy for capturing the completion, especially when the scope of the action is outside the method running in the thread?

Thanks!

# frozen_string_literal: true

require 'concurrent'

$stdout.sync = true

class TheList
  attr_reader :items

  def initialize(list_id, n_items)
    @id = list_id
    @items = []
    n_items.times { |n| @items << Item.new(@id, n) }
  end

  def parse_list(pool)
    @items.each do |item|
      pool.post { item.sleepy(rand(3..8)) }
    end
  end
end

class Item
  attr_reader :id

  def initialize (list_id, item_id)
    @id = item_id
    @list_id = list_id
  end

  def sleepy(seconds)
    sleep(seconds)
    # This puts statement signifies the end of the method threaded
    puts "List ID: #{@list_id} item ID:#{@id} slept for #{seconds} seconds"
  end
end

lists = []
5.times do |i|
  lists << TheList.new(i, rand(5..10))
end

pool = Concurrent::FixedThreadPool.new(Concurrent.processor_count)

lists.each do |list|
  list.parse_list(pool)
end
pool.shutdown
pool.wait_for_termination

Solution

  • The issue isn't really about "knowing when the thread finished", but rather, how can you update a shared progress bar without race conditions.

    To explain the problem: say you had a central ThreadList#progress_var variable, and as the last line of each thread, you incremented it with +=. This would introduce a race condition because two threads can perform the operation at the same time (and could overwrite each other's results).

    To get around this, the typical approach is to use a Mutex which is an essential concept to understand if you're learning multithreading.

    The actual implementation isn't that difficult:

    require 'mutex'
    
    class ThreadList
      def initialize
        @semaphore = Mutex.new   
        @progress_bar = 0
      end
      def increment_progress_bar(amount)
        @semaphore.synchronize do
          @progress_bar += amount
        end
      end 
    end
    

    Because of that @semaphore.synchronize block, you can now safely call this increment_progress_bar method from threads, without the risk of race condition.