Search code examples
rubymultithreadingconcurrencyconcurrent-ruby

Handle exceptions in concurrent-ruby thread pool


How to handle exceptions in concurrent-ruby thread pools (http://ruby-concurrency.github.io/concurrent-ruby/file.thread_pools.html)?

Example:

pool = Concurrent::FixedThreadPool.new(5) 
pool.post do
  raise 'something goes wrong'
end

# how to rescue this exception here

Update:

Here is simplified version of my code:

def process
  pool = Concurrent::FixedThreadPool.new(5)

  products.each do |product|
    new_product = generate_new_product

    pool.post do
      store_in_db(new_product) # here exception is raised, e.g. connection to db failed
    end
  end

  pool.shutdown
  pool.wait_for_terminaton
end

So what I want to achive, is to stop processing (break loop) in case of any exception.

This exception is also rescued at higher level of application and there are executed some cleaning jobs (like setting state of model to failure and sending some notifications).


Solution

  • The following answer is from jdantonio from here https://github.com/ruby-concurrency/concurrent-ruby/issues/616

    " Most applications should not use thread pools directly. Thread pools are a low-level abstraction meant for internal use. All of the high-level abstractions in this library (Promise, Actor, etc.) all post jobs to the global thread pool and all provide exception handling. Simply pick the abstraction that best fits your use case and use it.

    If you feel the need to configure your own thread pool rather than use the global thread pool, you can still use the high-level abstractions. They all support an :executor option which allows you to inject your custom thread pool. You can then use the exception handling provided by the high-level abstraction.

    If you absolutely insist on posting jobs directly to a thread pool rather than using our high-level abstractions (which I strongly discourage) then just create a job wrapper. You can find examples of job wrappers in all our high-level abstractions, Rails ActiveJob, Sucker Punch, and other libraries which use our thread pools."

    So how about an implementation with Promises ? http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/Promise.html In your case it would look something like this:

    promises = []
    products.each do |product|
      new_product = generate_new_prodcut
    
      promises << Concurrent::Promise.execute do 
        store_in_db(new_product)
      end
    end
    
    # .value will wait for the Thread to finish.
    # The ! means, that all exceptions will be propagated to the main thread
    # .zip will make one Promise which contains all other promises.
    Concurrent::Promise.zip(*promises).value!