Search code examples
rubyconcurrencypromiseconcurrent-ruby

Chain an array of tasks in Concurrent Ruby


I have a set of tasks that I want to execute sequentially in some background thread, with the result of each task being passed to the next, and with the chain failing if any link in the chain fails.

For the sake of argument, let's say each task is an object with an exec method that returns a value, although they could equally well be procs or lambdas.

What I have now is something like:

promise = array_of_tasks.inject(nil) do |promise, task|
            if promise
              promise.then { |prev_result| task.exec(prev_result) }
            else
              Concurrent::Promise.new { task.exec }
            end
          end

promise.on_success { |last_result| log.info("Success: #{last_result} ")}
promise.rescue { |reason| log.error("Failure: #{reason}")}

Is there a more concise way to do this, either in the Promise API or elsewhere in Concurrent Ruby? It seems like a fairly basic operation, but I'm not seeing an existing method that does it.

(Side note: if there isn't such a method, is there a well-known name for this pattern in the futures-and-promises world? I.e., if I write the method myself, is there some existing obvious name for it?)


Solution

  • It isn't shorter, but this structure might make it easier to add new functionality :

    require 'concurrent'
    
    class Task
      def exec(x = 0)
        sleep 0.1
        p x + 1
      end
    
      alias call exec
    
      def to_promise(*params)
        Concurrent::Promise.new { exec(*params) }
      end
    end
    
    module PromiseChains
      refine Concurrent::Promise do
        def chained_thens(callables)
          callables.inject(self) do |promise, callable|
            promise.then do |prev_result|
              callable.call(prev_result)
            end
          end
        end
      end
    end
    

    It can be used this way :

    using PromiseChains
    
    array_of_tasks = Array.new(10) { Task.new } 
    
    array_of_tasks << ->(x) { p x * 2 } 
    array_of_tasks << proc { |x| p x * 3 }
    
    first_task, *other_tasks = array_of_tasks
    
    chain = first_task.to_promise.chained_thens(other_tasks)
    
    chain.on_success { |last_result| puts "Success: #{last_result} " }
    chain.rescue { |reason| puts "Failure: #{reason}" }
    
    chain.execute
    sleep(2)
    

    It outputs :

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    20
    60
    Success: 60