I have a set of tasks that I want to execute sequentially in some background thread, with the result of each task being passed to the next, and with the chain failing if any link in the chain fails.
For the sake of argument, let's say each task is an object with an exec
method that returns a value, although they could equally well be procs or lambdas.
What I have now is something like:
promise = array_of_tasks.inject(nil) do |promise, task|
if promise
promise.then { |prev_result| task.exec(prev_result) }
else
Concurrent::Promise.new { task.exec }
end
end
promise.on_success { |last_result| log.info("Success: #{last_result} ")}
promise.rescue { |reason| log.error("Failure: #{reason}")}
Is there a more concise way to do this, either in the Promise
API or elsewhere in Concurrent Ruby? It seems like a fairly basic operation, but I'm not seeing an existing method that does it.
(Side note: if there isn't such a method, is there a well-known name for this pattern in the futures-and-promises world? I.e., if I write the method myself, is there some existing obvious name for it?)
It isn't shorter, but this structure might make it easier to add new functionality :
require 'concurrent'
class Task
def exec(x = 0)
sleep 0.1
p x + 1
end
alias call exec
def to_promise(*params)
Concurrent::Promise.new { exec(*params) }
end
end
module PromiseChains
refine Concurrent::Promise do
def chained_thens(callables)
callables.inject(self) do |promise, callable|
promise.then do |prev_result|
callable.call(prev_result)
end
end
end
end
end
It can be used this way :
using PromiseChains
array_of_tasks = Array.new(10) { Task.new }
array_of_tasks << ->(x) { p x * 2 }
array_of_tasks << proc { |x| p x * 3 }
first_task, *other_tasks = array_of_tasks
chain = first_task.to_promise.chained_thens(other_tasks)
chain.on_success { |last_result| puts "Success: #{last_result} " }
chain.rescue { |reason| puts "Failure: #{reason}" }
chain.execute
sleep(2)
It outputs :
1
2
3
4
5
6
7
8
9
10
20
60
Success: 60