ruby-on-rails, background-process, sidekiq

Change status of an object after a set of async jobs is complete using sidekiq


I'm using Sidekiq to handle async jobs, and now that some complexity has been added, I'm having difficulty keeping track of the state of the jobs.

Here's the deal:

I have a Batch model that enqueues an async job after it's committed:

# app/models/batch.rb
class Batch < ActiveRecord::Base

  after_commit :calculate, on: :create

  def calculate
    job_id = BatchWorker.perform_async(self.id)

    # update_column skips callbacks and validations!
    self.update_column(:job_id, job_id)
  end
end

The worker reads data from the model and enqueues an async job for each row, as follows:

# app/workers/batch_worker.rb
class BatchWorker
  include Sidekiq::Worker

  def perform(batch_id)
    batch = Batch.find(batch_id)

    ## read data to 'tab'

    tab.each do |ts|
      obj = batch.items.create(name: ts[0], data: ts[1])
      job_id = ItemWorker.perform_async(obj.id)
      obj.update_attribute(:job_id, job_id)
    end
  end

end

The problem is that those async jobs perform calculations, and the download-results link must not be available until they are finished. So I need to know when all the "child jobs" are done, in order to change a status attribute on the Batch model. In other words, I don't need to know whether all the jobs have been queued, but whether all the async jobs generated by ItemWorker have been performed and are now complete.

  • What would be the best way to achieve this? Does it make sense in the "parallel computation world"?

Note: I'm not sure about storing the job_id in the database, since it seems to be volatile.


Solution

  • Redis could be a good fit for this, since you already have it in your infrastructure and configured in your Rails app (Sidekiq depends on it).

    Redis has a built-in publish/subscribe engine, as well as atomic operations on keys (such as DECR), which makes it suitable for managing the kind of concurrency you are looking for.

    Maybe something roughly like this:

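    require "redis"
    require "sidekiq"

    class BatchWorker
      include Sidekiq::Worker

      def perform(batch_id)
        batch   = Batch.find(batch_id)
        tab     = [] # placeholder: read the data into 'tab' as in the question
        key     = "jobs_remaining_#{batch_id}"
        channel = "batch_task_complete.#{batch_id}"
        return if tab.empty? # no children: update the status directly instead

        # A subscribed connection can't run DECR/DEL, so use a second one.
        counter    = Redis.new
        subscriber = Redis.new
        counter.set(key, tab.count)

        # subscribe blocks this Sidekiq thread until #unsubscribe is called
        # (with a concurrency of 1 the ItemWorkers could never run).
        subscriber.subscribe(channel) do |on|
          # Enqueue only once the subscription is confirmed, so no
          # completion message can be published before we are listening.
          on.subscribe do |_channel, _subscriptions|
            tab.each do |ts|
              obj = batch.items.create(name: ts[0], data: ts[1])
              ItemWorker.perform_async(obj.id, batch_id)
            end
          end

          on.message do |_channel, _message|
            if counter.decr(key) < 1
              # UPDATE STATUS HERE
              counter.del(key)
              subscriber.unsubscribe(channel)
            end
          end
        end
      end
    end

    class ItemWorker
      include Sidekiq::Worker

      def perform(item_id, batch_id = nil)
        # DO STUFF
        # PUBLISH takes a channel and a message; the item id is the payload
        Redis.new.publish("batch_task_complete.#{batch_id}", item_id) if batch_id
      end
    end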
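The subscribe call in the code above keeps the BatchWorker's thread busy while it waits for messages. A variant that drops pub/sub (a swapped-in technique, not the answer as written) lets each ItemWorker decrement the counter itself: because DECR is atomic, exactly one worker sees the count reach zero and can update the Batch status. The sketch below is a minimal illustration under stated assumptions: `BatchCompletion` is a hypothetical helper, and `FakeRedis` is a hypothetical in-memory stand-in that uses a Mutex in place of Redis's atomicity so the example runs without a Redis server.

```ruby
# Hypothetical helper: the completion check an ItemWorker could run at the
# end of a successful #perform. It only needs a client with Redis-style
# #decr and #del (redis-rb provides both); it is not part of Sidekiq's API.
class BatchCompletion
  def initialize(redis)
    @redis = redis
  end

  # Yields exactly once per batch: to the caller whose DECR reaches zero.
  # Later calls see a negative count and do nothing.
  def item_done(batch_id)
    key = "jobs_remaining_#{batch_id}"
    return false unless @redis.decr(key).zero?

    @redis.del(key)
    yield
    true
  end
end

# Hypothetical in-memory stand-in for Redis so the sketch runs without a
# server. Redis executes DECR atomically; the Mutex gives the same guarantee
# inside one Ruby process. Missing keys count as 0, as in Redis.
class FakeRedis
  def initialize
    @data = Hash.new(0)
    @lock = Mutex.new
  end

  def set(key, value)
    @lock.synchronize { @data[key] = value.to_i }
  end

  def decr(key)
    @lock.synchronize { @data[key] -= 1 }
  end

  def del(key)
    @lock.synchronize { @data.delete(key) ? 1 : 0 }
  end
end

# Simulate one batch: the counter is set before any item is enqueued, then
# 20 "ItemWorker" threads finish in random order and report completion.
redis      = FakeRedis.new
completion = BatchCompletion.new(redis)
redis.set("jobs_remaining_42", 20)

status_updates = Queue.new
threads = Array.new(20) do
  Thread.new do
    sleep(rand * 0.01) # simulated work of varying length
    completion.item_done(42) { status_updates << :complete }
  end
end
threads.each(&:join)

puts status_updates.size # prints 1
```

In the app, BatchWorker would call `set` on a real redis-rb client before enqueuing the items, and ItemWorker would call `item_done` as the last step of a successful `perform`, passing a block that updates the Batch status. Decrementing only after success means a failed attempt that Sidekiq retries is not counted twice, and no thread sits blocked waiting for the batch to finish.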