I'm using Sidekiq to handle async jobs, and as the setup has grown more complex I'm having trouble keeping track of the state of the jobs.
Here's the deal:
I have a Batch model that calls an async method after it's committed:
# app/models/batch.rb
class Batch < ActiveRecord::Base
  after_commit :calculate, on: :create

  def calculate
    job_id = BatchWorker.perform_async(id)
    # update_column skips callbacks and validations!
    update_column(:job_id, job_id)
  end
end
The worker reads data from the model and enqueues an async job for each data item, as follows:
# app/workers/batch_worker.rb
class BatchWorker
  include Sidekiq::Worker

  def perform(batch_id)
    batch = Batch.find(batch_id)
    ## read data to 'tab'
    tab.each do |ts|
      obj = batch.item.create(name: ts[0], data: ts[1])
      job_id = ItemWorker.perform_async(obj.id)
      obj.update_attribute(:job_id, job_id)
    end
  end
end
The problem is: those async jobs perform calculations, and I can't allow the download-results link to be available before they are complete. So I need to know when all the child jobs are done, so that I can change a status attribute on the Batch model. In other words, I don't need to know whether all jobs have been queued, but whether all async jobs generated by ItemWorker have been performed and are now complete.
Note: I'm not sure about storing the job_id in the DB, since it seems to be volatile.
Perhaps using Redis for this could be a good fit, seeing as you already have it in your infrastructure and configured in your Rails app (due to Sidekiq).
Redis has a built-in publish/subscribe engine, as well as atomic operations on keys, which makes it suitable for managing the type of concurrency you are looking for.
Maybe something roughly like this:
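class BatchWorker
  include Sidekiq::Worker

  def perform(batch_id)
    batch = Batch.find(batch_id)
    ## read data to 'tab'
    redis = Redis.new       # used for the counter
    subscriber = Redis.new  # a subscribed connection cannot run other commands
    counter_key = "jobs_remaining_#{batch_id}"
    redis.set counter_key, tab.count

    # NOTE: subscribe blocks this worker until unsubscribe is called
    subscriber.subscribe("batch_task_complete.#{batch_id}") do |on|
      # Enqueue the children only once the subscription is active,
      # so that no completion message can be missed
      on.subscribe do |_channel, _subscriptions|
        tab.each do |ts|
          obj = batch.item.create(name: ts[0], data: ts[1])
          ItemWorker.perform_async(obj.id, batch_id)
        end
      end

      on.message do |_channel, _data|
        if redis.decr(counter_key) < 1
          # UPDATE STATUS HERE
          redis.del counter_key
          subscriber.unsubscribe
        end
      end
    end
  end
end

class ItemWorker
  include Sidekiq::Worker

  def perform(item_id, batch_id = nil)
    # DO STUFF
    if batch_id
      # publish takes a channel and a message
      Redis.new.publish "batch_task_complete.#{batch_id}", item_id
    end
  end
end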
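A simpler variation on the same idea might be to skip publish/subscribe and rely only on the atomic counter: each ItemWorker decrements the counter when it finishes, and whichever job brings it to zero updates the batch status. The sketch below is only an illustration under stated assumptions. `FakeRedis` is a hypothetical in-memory stand-in for the `set`, `decr` and `del` commands so that it runs without a Redis server, `finish_item` and `run_batch_demo` are made-up helper names, and plain threads stand in for Sidekiq jobs.

```ruby
# Hypothetical in-memory stand-in for the Redis commands used below,
# so the sketch runs without a server. In a real app this would be a
# Redis client, where DECR is atomic on the server side.
class FakeRedis
  def initialize
    @data = {}
    @lock = Mutex.new
  end

  def set(key, value)
    @lock.synchronize { @data[key] = value.to_i }
  end

  # Like Redis DECR: subtracts 1 atomically and returns the new value.
  def decr(key)
    @lock.synchronize { @data[key] = @data.fetch(key, 0) - 1 }
  end

  def del(key)
    @lock.synchronize { @data.delete(key) }
  end
end

# Hypothetical helper, meant to be called at the end of ItemWorker#perform.
# Returns true only for the job that brings the counter down to zero.
def finish_item(redis, batch_id)
  key = "jobs_remaining_#{batch_id}"
  return false unless redis.decr(key).zero?

  redis.del(key)
  true
end

# Simulates one batch: sets the counter, runs one thread per item, and
# returns how many times the "batch complete" branch was taken.
def run_batch_demo(item_count, batch_id = 1)
  redis = FakeRedis.new
  redis.set("jobs_remaining_#{batch_id}", item_count)
  completions = Queue.new

  workers = Array.new(item_count) do
    Thread.new do
      # DO STUFF (the per-item calculation) would go here
      completions << :complete if finish_item(redis, batch_id) # UPDATE STATUS HERE
    end
  end
  workers.each(&:join)
  completions.size
end
```

With this shape the BatchWorker never has to block waiting for messages. The counter does have to be set before the first child job is enqueued, otherwise an early job could decrement a key that does not exist yet.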