Tags: ruby-on-rails, ruby, redis, resque

How to find associated Resque jobs for an ActiveRecord model object?


I need to find the queued, working, and failed jobs associated with a model object. For example, when the model object is destroyed, we want to find all of its jobs and then either block the deletion or destroy the jobs, depending on their state.

Is there a recommended way to do this before I reinvent the wheel?

Example:

Suppose you want a before_destroy callback that removes all queued and failed jobs when the object is destroyed, but only allows the destroy if there are no working jobs.

Some pseudocode for what I have in mind for this use case:

Report model

class Report < ActiveRecord::Base
  before_destroy :check_if_working_jobs, :destroy_queued_and_failed_jobs

  def check_if_working_jobs
    # find all working jobs related to this report object
    working_jobs = ProcessReportWorker.find_working_jobs_by_report_id(self.id) 
    return false unless working_jobs.empty?
  end

  def destroy_queued_and_failed_jobs
    # find all jobs related to this report object
    queued_jobs = ProcessReportWorker.find_queued_jobs_by_report_id(self.id) 
    failed_jobs = ProcessReportWorker.find_failed_jobs_by_report_id(self.id) 

    # destroy/remove all jobs found
    (queued_jobs + failed_jobs).each do |job| 
       # destroy the job here ... commands?
    end

  end
end

Report processing worker class for resque / redis backed jobs

class ProcessReportWorker

  # find the jobs by report id which is one of the arguments for the job?
  # envisioned as separate methods so they can be used independently as needed

  def self.find_queued_jobs_by_report_id(id)
     # parse all jobs in all queues to find based on the report id argument?
  end 

  def self.find_working_jobs_by_report_id(id)
     # parse all jobs in working queues to find based on the report id argument? 
  end 

  def self.find_failed_jobs_by_report_id(id)
     # parse all jobs in failed queue to find based on the report id argument? 
  end 
end

Is this approach on track with what needs to happen?

What are the missing pieces above to find the queued or working jobs by model object id and then destroy it?

Are there already methods in place to find and/or destroy by associated model object id that I have missed in the documentation or my searching?

Update: Revised the usage example so that working_jobs is only used to decide whether we should delete, rather than suggesting we also try to delete working jobs (deleting working jobs is more involved than simply removing the Redis key entries).


Solution

  • It's been quiet here with no responses, so I spent the day tackling this myself, following the path I outlined in my question. There may be a better solution or other methods available, but this seems to get the job done so far. Feel free to comment if there are better options for the methods used or if it can be improved further.

    The overall approach is to search through all jobs (queued, working, failed) and keep only those that belong to the relevant class and queue and that have the object record id you are looking for at the correct index position of the args array. For example, if the object id is at argument position 0, then (after confirming the class and queue match) you can test whether args[0] matches the object id.

    Essentially, a job is associated with the object id if: job_class == class.name && job_queue == @queue && job_args[OBJECT_ID_ARGS_INDEX].to_i == object_id

    • Queued Jobs: To find all queued jobs, read the Redis list stored under the key queue:#{@queue}, where @queue is the name of the queue your worker class is using. If a worker class uses multiple queues, loop through each of them. Resque.redis.lrange("queue:#{@queue}",0,-1)
    • Failed Jobs: To find all failed jobs, read the Redis list stored under the key failed (unless you are using multiple failure queues or some other non-default setup). Resque.redis.lrange("failed",0,-1)
    • Working Jobs: To find all working jobs, use Resque.workers, which returns an array of all workers; calling job on each worker gives the job it is currently running. Resque.workers.map(&:job)
    • Job: Each entry in the queued and failed lists is an encoded hash. You can decode it into a Ruby hash using Resque.decode(job). The hashes returned by a worker's job method are already decoded.
    • Class and args: For queued jobs, the class and args keys are job["class"] and job["args"]. For failed and working jobs these are job["payload"]["class"] and job["payload"]["args"].
    • Queue: For each of the failed and working jobs found, the queue will be job["queue"]. It is a string in the decoded hash, so compare it against the string form of @queue if @queue is a symbol. Before testing the args list for the object id, you only want jobs that match the class and queue. Your queued jobs list will already be limited to the queue you collected.
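
    To make the two payload shapes concrete, here is a minimal sketch of the association test run against hand-written JSON strings rather than a live Redis. The helper names (job_signature, job_for_report?), the WORKER_CLASS and WORKER_QUEUE constants, and the sample payloads are assumptions made up for illustration; real failed entries carry more keys than shown.

```ruby
require "json"

# Hypothetical constants for illustration; match them to your worker class.
WORKER_CLASS = "ProcessReportWorker"
WORKER_QUEUE = "report_processing"
REPORT_ID_ARGS_INDEX = 0

# Normalize a decoded job hash to [class, queue, args], whichever list it came from.
# Queued entries have no "queue" key, so the caller passes the queue they were read from.
def job_signature(job, queue_read_from = nil)
  payload = job["payload"] || job # failed/working entries nest class and args under "payload"
  [payload["class"], (job["queue"] || queue_read_from).to_s, payload["args"] || []]
end

def job_for_report?(job, report_id, queue_read_from = nil)
  klass, queue, args = job_signature(job, queue_read_from)
  klass == WORKER_CLASS &&
    queue == WORKER_QUEUE &&
    args[REPORT_ID_ARGS_INDEX].to_i == report_id
end

# Hand-written JSON strings standing in for what Redis would return.
queued_raw = '{"class":"ProcessReportWorker","args":[42,"pdf"]}'
failed_raw = '{"queue":"report_processing","payload":{"class":"ProcessReportWorker","args":[42,"pdf"]},"error":"boom"}'
other_raw  = '{"class":"ProcessReportWorker","args":[7,"pdf"]}'

queued = JSON.parse(queued_raw)
failed = JSON.parse(failed_raw)
other  = JSON.parse(other_raw)

puts job_for_report?(queued, 42, :report_processing) # true
puts job_for_report?(failed, 42)                     # true
puts job_for_report?(other, 42, :report_processing)  # false
```

    Converting the queue name with to_s on both sides means a symbol passed by the caller and a string read from the decoded hash compare equal.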

    Below are the example worker class and model methods to find (and to remove) jobs that are associated to the example model object (report).

    Report processing worker class for resque / redis backed jobs

    class ProcessReportWorker
      # queue name
      @queue = :report_processing
      # tell the worker class where the report id is in the arguments list
      REPORT_ID_ARGS_INDEX = 0 
    
      # <snip> rest of class, not needed here for this answer
    
      # find jobs methods - find by report id (report is the 'associated' object)
    
      def self.find_queued_jobs_by_report_id report_id
        queued_jobs(@queue).select do |job|
          is_job_for_report? :queued, job, report_id
        end
      end
    
      def self.find_failed_jobs_by_report_id report_id
        failed_jobs.select do |job|
          is_job_for_report? :failed, job, report_id
        end
      end
    
      def self.find_working_jobs_by_report_id report_id
        working_jobs.select do |worker,job|
          is_job_for_report? :working, job, report_id
        end
      end
    
      # association test method - determine if this job is associated      
    
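      def self.is_job_for_report? state, job, report_id
        attributes = job_attributes(state, job)
        # decoded failed/working jobs carry the queue as a String while @queue
        # is a Symbol, so compare the string forms
        attributes[:klass] == self.name &&
          attributes[:queue].to_s == @queue.to_s &&
            attributes[:args][REPORT_ID_ARGS_INDEX].to_i == report_id
      end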
    
      # remove jobs methods
    
      def self.remove_failed_jobs_by_report_id report_id
        # remove the highest index first: each removal shifts later entries down by one
        find_failed_jobs_by_report_id(report_id).reverse_each do |job|
          Resque::Failure.remove(job["index"])
        end
      end
    
      def self.remove_queued_jobs_by_report_id report_id
        find_queued_jobs_by_report_id(report_id).each do |job|
          Resque::Job.destroy(@queue,job["class"],*job["args"])
        end
      end
    
      # reusable methods - these methods could go elsewhere and be reusable across worker classes
    
      # job attributes method
    
      def self.job_attributes(state, job)
        if state == :queued && job["args"].present?
          args = job["args"]
          klass = job["class"]
        elsif job["payload"] && job["payload"]["args"].present?
          args = job["payload"]["args"]
          klass = job["payload"]["class"] 
        else
          return {args: nil, klass: nil, queue: nil}
        end
        {args: args, klass: klass, queue: job["queue"]}
      end
    
      # jobs list methods
    
      def self.queued_jobs queue
        Resque.redis.lrange("queue:#{queue}", 0, -1)
          .collect do |job| 
            job = Resque.decode(job)
            job["queue"] = queue # for consistency only
            job
          end
      end
    
      def self.failed_jobs
        Resque.redis.lrange("failed", 0, -1)
          .each_with_index.collect do |job,index|
            job = Resque.decode(job)
            job["index"] = index # required if removing
            job
          end
      end
    
      def self.working_jobs
        # read the worker list once so each worker is paired with its own job
        Resque.workers.map { |worker| [worker, worker.job] }
          .reject { |w, j| w.idle? || j['queue'].nil? }
      end
    
    end
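
    Removing failed jobs by a previously captured index is order-sensitive, because each removal shifts the entries after it down by one position. The sketch below uses a plain Ruby Array as a stand-in for the Redis failed list (an assumption for illustration, no Resque involved) to show why working from the highest index down is the safe order.

```ruby
# A plain Array stands in for the Redis "failed" list.
# remove_at mimics removal by position: entries after the removed one shift down by one.
def remove_at(list, index)
  list.delete_at(index)
end

failed = %w[keep_a drop_1 keep_b drop_2 keep_c]
indexes_to_remove = [1, 3] # positions captured before any removal

# Ascending order: after removing index 1, "drop_2" sits at index 2,
# so the stale index 3 removes "keep_c" instead.
ascending = failed.dup
indexes_to_remove.each { |i| remove_at(ascending, i) }

# Descending order: earlier positions are unaffected by later removals.
descending = failed.dup
indexes_to_remove.sort.reverse_each { |i| remove_at(descending, i) }

p ascending  # ["keep_a", "keep_b", "drop_2"]
p descending # ["keep_a", "keep_b", "keep_c"]
```

    New failures that arrive while this runs are a separate concern; the sketch only covers index drift caused by your own removals.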
    

    The usage example for the Report model then becomes:

    class Report < ActiveRecord::Base
      before_destroy :check_if_working_jobs, :remove_queued_and_failed_jobs
    
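      def check_if_working_jobs
        # find all working jobs related to this report object
        working_jobs = ProcessReportWorker.find_working_jobs_by_report_id(self.id)
        # returning false halts the destroy in Rails 4 and earlier;
        # on Rails 5+ use `throw :abort` instead
        return false unless working_jobs.empty?
      end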
    
      def remove_queued_and_failed_jobs
        # find all jobs related to this report object
        queued_jobs = ProcessReportWorker.find_queued_jobs_by_report_id(self.id) 
        failed_jobs = ProcessReportWorker.find_failed_jobs_by_report_id(self.id) 
    
        # extra code and conditionals here for example only as all that is really 
        # needed is to call the remove methods without first finding or checking
    
        unless queued_jobs.empty?
          ProcessReportWorker.remove_queued_jobs_by_report_id(self.id)
        end
    
        unless failed_jobs.empty?
          ProcessReportWorker.remove_failed_jobs_by_report_id(self.id)
        end
    
      end
    end
    

    The solution needs to be modified if you use multiple queues for the worker class or if you have multiple failure queues. Also, the Redis failure backend was used here; with a different failure backend, changes may be required.
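
    As a rough sketch of the multiple-queue case, the queued-jobs lookup can loop over a list of queue names and tag each decoded job with the queue it came from. Everything named here is an assumption for illustration: the QUEUES list, the queued_jobs_across helper, and the fetch_raw callable, which stands in for something like ->(key) { Resque.redis.lrange(key, 0, -1) }. An in-memory hash replaces Redis so the example runs on its own.

```ruby
require "json"

# Hypothetical list of queues one worker class might be enqueued on.
QUEUES = %w[report_processing report_processing_low]

# fetch_raw is any callable that returns the raw JSON entries stored under a key.
def queued_jobs_across(queues, fetch_raw)
  queues.flat_map do |queue|
    fetch_raw.call("queue:#{queue}").map do |raw|
      JSON.parse(raw).merge("queue" => queue) # tag each job with its source queue
    end
  end
end

# In-memory stand-in for Redis so the sketch runs without a server.
fake_redis = {
  "queue:report_processing"     => ['{"class":"ProcessReportWorker","args":[42]}'],
  "queue:report_processing_low" => ['{"class":"ProcessReportWorker","args":[42]}',
                                    '{"class":"ProcessReportWorker","args":[7]}']
}
fetch = ->(key) { fake_redis.fetch(key, []) }

jobs = queued_jobs_across(QUEUES, fetch)
matches = jobs.select { |job| job["args"][0].to_i == 42 }
p matches.map { |job| job["queue"] } # ["report_processing", "report_processing_low"]
```

    Keeping the source queue on each job matters for removal, since the queue name is needed again when destroying the job.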