ruby lambda callback rails-activejob

Passing success and failure handlers to an ActiveJob


I have an ActiveJob that's supposed to load a piece of data from an external system over HTTP. When that job completes, I want to queue a second job that does some postprocessing and then submits the data to a different external system.

I don't want the first job to know about the second job, because

  1. encapsulation
  2. reusability
  3. it's none of the first job's business, basically

Likewise, I don't want the first job to care what happens next if the data-loading fails -- maybe the user gets notified, maybe we retry after a timeout, maybe we just log it and throw up our hands -- again it could vary based on the details of the exception, and there's no need for the job to include the logic for that or the connections to other systems to handle it.

In Java (which is where I have the most experience), I could use something like Guava's ListenableFuture to add success and failure callbacks after the fact:

MyDataLoader loader = new MyDataLoader(someDataSource);
ListenableFuture<Data> future = executor.submit(loader);
Futures.addCallback(future, new FutureCallback<Data>() {
    public void onSuccess(Data result) {
        processData(result);
    }
    public void onFailure(Throwable t) {
        handleFailure(t);
    }
});

ActiveJob, though, doesn't seem to provide this sort of external callback mechanism -- as best I can make out from the relevant sections in "Active Job Basics", after_perform and rescue_from are only meant to be called from within the job class. And after_perform isn't meant to distinguish between success and failure.

So the best I've been able to come up with (and I'm not claiming it's very good) is to pass a couple of lambdas into the job's perform method, thus:

class MyRecordLoader < ActiveJob::Base

  # Loads data expensively (hopefully on a background queue) and passes
  # the result, or any exception, to the appropriate specified lambda.
  #
  # @param data_source [String] the URL to load data from
  # @param on_success [-> (String)] A lambda that will be passed the record
  #   data, if it's loaded successfully
  # @param on_failure [-> (Exception)] A lambda that will be passed any
  #   exception, if there is one
  def perform(data_source, on_success, on_failure)
    begin
      result = load_data_expensively_from data_source
      on_success.call(result)
    rescue => exception
      on_failure.call(exception)
    end
  end

end

(Side note: I have no idea what the yardoc syntax is for declaring lambdas as parameters. Does this look correct, or, failing that, plausible?)

The caller would then have to pass these in:

MyRecordLoader.perform_later(
  some_data_source,
  method(:process_data),
  method(:handle_failure)
)

That's not terrible, at least on the calling side, but it seems clunky, and I can't help but suspect there's a common pattern for this that I'm just not finding. And I'm somewhat concerned that, as a Ruby/Rails novice, I'm just bending ActiveJob to do something it was never meant to do in the first place. All the ActiveJob examples I'm finding are 'fire and forget' -- asynchronously "returning" a result doesn't seem to be an ActiveJob use case.

Also, it's not clear to me that this will work at all in the case of a back-end like Resque that runs the jobs in a separate process.

What's "the Ruby way" to do this?


Update: As hinted at by dre-hh, ActiveJob turned out not to be the right tool here. It was also unreliable, and overcomplicated for the situation. I switched to Concurrent Ruby instead, which fits the use case better, and which, since the tasks are mostly IO-bound, is fast enough even on MRI, despite the GIL.

Update (2023-04-19): For what it's worth to anyone else running across this question, I ran into similar requirements in a situation where I needed long-running background jobs that could pick up again after a server redeploy or similar, and since we were using PostgreSQL as our database back end, I was able to achieve this with ActiveJob+GoodJob's batch support.


Solution

  • ActiveJob is not an async library like a future or promise.

    It is just an interface for performing tasks in the background. The current thread/process does not receive the result of the operation.

    For example, when Sidekiq is the ActiveJob queue backend, the arguments of the perform method are serialized into the Redis store. A separate daemon process, running within the context of your Rails app, watches the Redis queue and instantiates your worker with the deserialized data.

    So passing callbacks might be all right, but why have them as methods on another class? Passing callbacks makes sense if they are dynamic (changing from one invocation to the next). Since you have them implemented on the calling class, consider just moving those methods into your job worker class.
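One caveat is worth checking against your own queue adapter: because the arguments are serialized before the worker sees them, anything that cannot be serialized will not make the trip. The sketch below is plain Ruby with no Rails involved. It uses Marshal purely as a stand-in for the adapter's serialization (ActiveJob has its own argument serializer and, as far as I know, raises ActiveJob::SerializationError for unsupported types), and the names ProcessData, HandleFailure and serializable? are hypothetical, made up for illustration. It shows that a lambda or Method object cannot be dumped, while a handler's class name can, and how a worker could look the handler up by name.

```ruby
# Plain-Ruby sketch; no Rails or ActiveJob required.

# Returns true if Ruby can serialize the value with Marshal. Marshal is only
# a stand-in here for whatever the queue adapter does to the arguments.
def serializable?(value)
  Marshal.dump(value)
  true
rescue TypeError
  false
end

# Hypothetical handlers, looked up by name on the worker side.
class ProcessData
  def self.call(result)
    "processed #{result}"
  end
end

class HandleFailure
  def self.call(error_message)
    "failed: #{error_message}"
  end
end

# Hypothetical stand-in for a job's perform method. It receives only strings,
# so its arguments could survive a trip through a queue.
def perform(data_source, on_success_name, on_failure_name)
  raise ArgumentError, "no data source given" if data_source.empty?

  result = "data from #{data_source}" # placeholder for the expensive load
  Object.const_get(on_success_name).call(result)
rescue StandardError => e
  Object.const_get(on_failure_name).call(e.message)
end

puts serializable?(->(x) { x })    # a lambda cannot be dumped
puts serializable?(method(:puts))  # neither can a Method object
puts serializable?("ProcessData")  # a class name is just a string
puts perform("http://example.com/record", "ProcessData", "HandleFailure")
puts perform("", "ProcessData", "HandleFailure")
```

Passing names instead of callables keeps the loader ignorant of what happens next, at the cost of the handlers having to be constants that the worker process can load. Note that, as in the original perform, an exception raised by the success handler would also be routed to the failure handler.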