I need to be able to find the queued, working and/or failed jobs for a model object. For example, when the model object is destroyed, we want to find all of its jobs and then, conditionally, either refuse the delete or destroy the jobs.
Is there a recommended way to do this before I reinvent the wheel?
Example:
Suppose you want a `before_destroy` callback that destroys all of the object's jobs (queued and failed) when the object is destroyed, but only allows the destroy if there are no working jobs.
Some pseudocode of what I am thinking of doing for this example use case:
Report model:
```ruby
class Report < ActiveRecord::Base
  before_destroy :check_if_working_jobs, :destroy_queued_and_failed_jobs

  def check_if_working_jobs
    # find all working jobs related to this report object
    working_jobs = ProcessReportWorker.find_working_jobs_by_report_id(self.id)
    return false unless working_jobs.empty?
  end

  def destroy_queued_and_failed_jobs
    # find all jobs related to this report object
    queued_jobs = ProcessReportWorker.find_queued_jobs_by_report_id(self.id)
    failed_jobs = ProcessReportWorker.find_failed_jobs_by_report_id(self.id)

    # destroy/remove all jobs found
    (queued_jobs + failed_jobs).each do |job|
      # destroy the job here ... commands?
    end
  end
end
```
Report processing worker class for Resque/Redis-backed jobs:
```ruby
class ProcessReportWorker
  # find the jobs by report id, which is one of the arguments for the job?
  # envisioned as separate methods so they can be used independently as needed

  def self.find_queued_jobs_by_report_id(id)
    # parse all jobs in all queues to find based on the report id argument?
  end

  def self.find_working_jobs_by_report_id(id)
    # parse all jobs in working queues to find based on the report id argument?
  end

  def self.find_failed_jobs_by_report_id(id)
    # parse all jobs in failed queue to find based on the report id argument?
  end
end
```
Is this approach on track with what needs to happen?
What are the missing pieces above to find the queued or working jobs by model object id and then destroy them?
Are there already methods in place to find and/or destroy by associated model object id that I have missed in the documentation or my searching?
Update: I revised the usage example to use working_jobs only as a check for whether the report should be deleted, rather than suggesting that working jobs will be deleted too (deleting working jobs is more involved than simply removing the Redis key entries).
It's been quiet here with no responses, so I spent the day tackling this myself, following the path I outlined in my question. There may be a better solution or other methods available, but this seems to get the job done so far. Feel free to comment if there are better options for the methods used or if it can be improved further.
The overall approach is to search through all jobs (queued, working, failed), keep only the jobs whose `class` and `queue` are relevant, and then match the object record id you are looking for at the correct index position of the `args` array. For example (after confirming that the `class` and `queue` match), if argument position 0 holds the object id, test whether `args[0]` matches the object id.
Essentially, a job is associated with the object id if (inside the worker class): `job_class == self.name && job_queue.to_s == @queue.to_s && job_args[OBJECT_ID_ARGS_INDEX].to_i == object_id`
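To see that association test in isolation, here is a minimal sketch that works on job fields that have already been decoded and extracted, with no Resque or Redis involved. `WORKER_CLASS`, `QUEUE`, `OBJECT_ID_ARGS_INDEX` and `job_for_record?` are hypothetical names for illustration. It compares queue names as strings, on the assumption that `@queue` is a symbol while the queue names read back from Redis are strings.

```ruby
# Hypothetical stand-ins for the worker class name, its queue and the args position.
WORKER_CLASS = "ProcessReportWorker"
QUEUE = :report_processing
OBJECT_ID_ARGS_INDEX = 0

# True when an already-decoded job belongs to the given record id.
def job_for_record?(job_class, job_queue, job_args, record_id)
  return false unless job_class == WORKER_CLASS
  # @queue is often a symbol, but queue names read back from Redis are strings
  return false unless job_queue.to_s == QUEUE.to_s
  return false if job_args.nil? || job_args.empty?
  # an id enqueued as a string stays a string after JSON decoding; to_i normalizes it
  job_args[OBJECT_ID_ARGS_INDEX].to_i == record_id
end

job_for_record?("ProcessReportWorker", "report_processing", [42, "pdf"], 42) # => true
job_for_record?("ProcessReportWorker", :report_processing, ["42"], 42)      # => true
job_for_record?("ProcessReportWorker", "other_queue", [42], 42)             # => false
job_for_record?("OtherWorker", "report_processing", [42], 42)               # => false
```

Checking the class and queue before touching `args` means jobs from other workers, whose arguments may mean something else entirely, are never compared against the id.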
Where to find each list of jobs:
- Queued jobs are in the Redis list `queue:#{@queue}`, where `@queue` is the name of the queue your worker class is using. Modify accordingly by looping through multiple queues if you are using multiple queues for a particular worker class: `Resque.redis.lrange("queue:#{@queue}",0,-1)`
- Failed jobs are in the list `failed` (unless you are using multiple failure queues or some setup other than the default): `Resque.redis.lrange("failed",0,-1)`
- Working jobs come from `Resque.workers`, which contains an array of all workers and the jobs they are running: `Resque.workers.map(&:job)`
- Each job is stored encoded; decode it with `Resque.decode(job)`. For queued jobs, the `class` and `args` keys are `job["class"]` and `job["args"]`. For failed and working jobs these are `job["payload"]["class"]` and `job["payload"]["args"]`.
- For failed and working jobs, the queue is `job["queue"]`.
- Before testing the args list for the object id, you only want jobs that match the `class` and `queue`. Your queued jobs list will already be limited to the queue you collected.

Below are the example worker class and model methods to find (and to remove) jobs that are associated with the example model object (report).
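Before the full class, here is a standalone sketch of just the normalization step described in the list: queued entries keep `class`/`args` at the top level, while failed and working entries nest them under `payload` and carry `queue` alongside it. It uses `JSON.parse` in place of `Resque.decode` (assuming, as Resque does, that jobs are stored as JSON) so it runs without Resque; `normalize_job` and the sample strings are made up for illustration.

```ruby
require "json"

# Hypothetical helper: extract class, args and queue regardless of job state.
# Queued entries do not record their queue, so the caller passes the list name.
def normalize_job(raw, state, queue: nil)
  job = JSON.parse(raw)
  source = state == :queued ? job : (job["payload"] || {})
  {
    klass: source["class"],
    args: source["args"],
    queue: state == :queued ? queue.to_s : job["queue"]
  }
end

queued = '{"class":"ProcessReportWorker","args":[42]}'
failed = '{"failed_at":"2014-01-01","payload":{"class":"ProcessReportWorker","args":[42]},"queue":"report_processing"}'

normalize_job(queued, :queued, queue: :report_processing)
normalize_job(failed, :failed)
# both calls yield klass "ProcessReportWorker", args [42], queue "report_processing"
```

Working jobs (from `worker.job`) have the same `payload`/`queue` shape as failed ones, so they would go through the non-queued branch as well.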
Report processing worker class for Resque/Redis-backed jobs:
```ruby
class ProcessReportWorker
  # queue name
  @queue = :report_processing

  # tell the worker class where the report id is in the arguments list
  REPORT_ID_ARGS_INDEX = 0

  # <snip> rest of class, not needed here for this answer

  # find jobs methods - find by report id (report is the 'associated' object)
  def self.find_queued_jobs_by_report_id(report_id)
    queued_jobs(@queue).select do |job|
      is_job_for_report?(:queued, job, report_id)
    end
  end

  def self.find_failed_jobs_by_report_id(report_id)
    failed_jobs.select do |job|
      is_job_for_report?(:failed, job, report_id)
    end
  end

  # returns [worker, job] pairs
  def self.find_working_jobs_by_report_id(report_id)
    working_jobs.select do |_worker, job|
      is_job_for_report?(:working, job, report_id)
    end
  end

  # association test method - determine if this job is associated with the report.
  # @queue is a symbol, but queues read back from Redis are strings, so compare as strings.
  def self.is_job_for_report?(state, job, report_id)
    attributes = job_attributes(state, job)
    attributes[:klass] == self.name &&
      attributes[:queue].to_s == @queue.to_s &&
      attributes[:args][REPORT_ID_ARGS_INDEX].to_i == report_id
  end

  # remove jobs methods
  def self.remove_failed_jobs_by_report_id(report_id)
    # remove from the highest index down: each removal shifts the
    # positions of all later entries in the "failed" list
    find_failed_jobs_by_report_id(report_id).reverse_each do |job|
      Resque::Failure.remove(job["index"])
    end
  end

  def self.remove_queued_jobs_by_report_id(report_id)
    find_queued_jobs_by_report_id(report_id).each do |job|
      Resque::Job.destroy(@queue, job["class"], *job["args"])
    end
  end

  # reusable methods - these methods could go elsewhere and be reusable across worker classes

  # job attributes method: queued jobs keep class/args at the top level,
  # failed and working jobs nest them under "payload"
  def self.job_attributes(state, job)
    if state == :queued && job["args"].present?
      args  = job["args"]
      klass = job["class"]
    elsif job["payload"] && job["payload"]["args"].present?
      args  = job["payload"]["args"]
      klass = job["payload"]["class"]
    else
      return { args: nil, klass: nil, queue: nil }
    end
    { args: args, klass: klass, queue: job["queue"] }
  end

  # jobs list methods
  def self.queued_jobs(queue)
    Resque.redis.lrange("queue:#{queue}", 0, -1).map do |job|
      job = Resque.decode(job)
      job["queue"] = queue # for consistency only
      job
    end
  end

  def self.failed_jobs
    Resque.redis.lrange("failed", 0, -1).each_with_index.map do |job, index|
      job = Resque.decode(job)
      job["index"] = index # required if removing
      job
    end
  end

  # [worker, job] pairs for workers that are currently processing a job
  def self.working_jobs
    Resque.workers
          .map { |worker| [worker, worker.job] }
          .reject { |worker, job| worker.idle? || job["queue"].nil? }
  end
end
```
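Why remove failed jobs highest index first: with the default Redis failure backend, `Resque::Failure.remove` takes a position in the `failed` list, and deleting one entry moves every later entry down by one. A minimal sketch of the effect, using a plain Array as a stand-in for that Redis list (an illustration only; no Resque calls are made):

```ruby
# A plain Array stands in for the Redis "failed" list; "x:42" means "a job for report 42".
failed = ["a:42", "b:7", "c:42", "d:9"]

# Positions of report 42's entries, in ascending order (the order the find method returns).
indexes = failed.each_index.select { |i| failed[i].end_with?(":42") }
# indexes == [0, 2]

# Ascending removal: after deleting position 0, "c:42" moves to 1 and "d:9" moves to 2.
ascending = failed.dup
indexes.each { |i| ascending.delete_at(i) }
# ascending == ["b:7", "c:42"]  ("c:42" survived, the unrelated "d:9" was removed)

# Descending removal: earlier positions are untouched by later deletions.
descending = failed.dup
indexes.reverse_each { |i| descending.delete_at(i) }
# descending == ["b:7", "d:9"]
```

The positions can also drift if other processes add or remove failed jobs between the lookup and the removal, so this is best treated as a best-effort cleanup.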
So the usage example for the Report model then becomes:
```ruby
class Report < ActiveRecord::Base
  before_destroy :check_if_working_jobs, :remove_queued_and_failed_jobs

  def check_if_working_jobs
    # find all working jobs related to this report object
    working_jobs = ProcessReportWorker.find_working_jobs_by_report_id(self.id)
    # returning false halts the destroy on Rails 4 and earlier;
    # on Rails 5+ use `throw(:abort)` instead
    return false unless working_jobs.empty?
  end

  def remove_queued_and_failed_jobs
    # find all jobs related to this report object
    queued_jobs = ProcessReportWorker.find_queued_jobs_by_report_id(self.id)
    failed_jobs = ProcessReportWorker.find_failed_jobs_by_report_id(self.id)

    # extra code and conditionals here for example only, as all that is really
    # needed is to call the remove methods without first finding or checking
    unless queued_jobs.empty?
      ProcessReportWorker.remove_queued_jobs_by_report_id(self.id)
    end

    unless failed_jobs.empty?
      ProcessReportWorker.remove_failed_jobs_by_report_id(self.id)
    end
  end
end
```
The solution needs to be modified if you use multiple queues for the worker class or if you have multiple failure queues. Also, it assumes the Redis failure backend; if a different failure backend is used, changes may be required.
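For the multiple-queue case, one possible shape is to loop over every queue the worker uses and tag each decoded job with the queue it came from. In this sketch the list reader is passed in as a callable so it runs without Redis; in an app it would presumably be something like `->(q) { Resque.redis.lrange("queue:#{q}", 0, -1) }`, with `Resque.decode` instead of `JSON.parse`. `queued_jobs_in`, `fake_redis` and the queue names are hypothetical.

```ruby
require "json"

# Hypothetical helper: collect decoded queued jobs from several queues.
# read_list is any callable returning the raw entries for one queue name.
def queued_jobs_in(queues, read_list)
  queues.flat_map do |queue|
    read_list.call(queue).map do |raw|
      job = JSON.parse(raw)
      job["queue"] = queue.to_s # remember which list the job came from
      job
    end
  end
end

# In-memory stand-in for the Redis lists, for illustration only.
fake_redis = {
  "report_processing"      => ['{"class":"ProcessReportWorker","args":[42]}'],
  "report_processing_slow" => ['{"class":"ProcessReportWorker","args":[7]}']
}
reader = ->(queue) { fake_redis.fetch(queue.to_s, []) }

jobs = queued_jobs_in([:report_processing, :report_processing_slow], reader)
matching = jobs.select { |job| job["args"][0].to_i == 42 }
```

Because each job keeps its own `"queue"`, the removal step would pass `job["queue"]` rather than `@queue` to `Resque::Job.destroy`.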