ruby-on-rails ruby message-queue sidekiq

Sidekiq: Ensure all jobs on the queue are unique


I have some update triggers which push jobs onto the Sidekiq queue. So in some cases, there can be multiple jobs to process the same object.

There are a couple of uniqueness plugins ("Middleware", Unique Jobs). They're not documented much, but they seem to be more like throttlers that prevent repeat processing. What I want is a throttler that prevents repeated creation of the same job. That way, an object will always be processed in its freshest state. Is there a plugin or technique for this?


Update: I didn't have time to make a middleware, but I ended up with a related cleanup function to ensure queues are unique: https://gist.github.com/mahemoff/bf419c568c525f0af903


Solution

  • My suggestion is to search for previously scheduled jobs that match some selection criteria and delete them before scheduling a new one. This has been useful for me when I want a single scheduled job for a particular object and/or one of its methods.

    Some example methods in this context:

    ##
    # find scheduled job(s) for a particular class and method
    #
    def self.find_jobs_for_object_by_method(klass, method)
    
      jobs = Sidekiq::ScheduledSet.new
    
      jobs.select { |job|
        job.klass == 'Sidekiq::Extensions::DelayedClass' &&
            ((job_klass, job_method, args) = YAML.load(job.args[0])) &&
            job_klass == klass &&
            job_method == method
      }
    
    end
    
    ##
    # delete scheduled job(s) for a particular class, method, and record
    # will only remove delayed jobs on that object for that method
    #
    def self.delete_jobs_for_object_by_method(klass, method, id)
    
      jobs = Sidekiq::ScheduledSet.new
      jobs.select do |job|
        job.klass == 'Sidekiq::Extensions::DelayedClass' &&
            ((job_klass, job_method, args) = YAML.load(job.args[0])) &&
            job_klass == klass &&
            job_method == method  &&
            args[0] == id
      end.map(&:delete)
    
    end
    
    ##
    # delete scheduled job(s) for a particular class and record
    # will remove all delayed jobs on that object, whatever the method
    #
    def self.delete_jobs_for_object(klass, id)
    
      jobs = Sidekiq::ScheduledSet.new
      jobs.select do |job|
        job.klass == 'Sidekiq::Extensions::DelayedClass' &&
            ((job_klass, job_method, args) = YAML.load(job.args[0])) &&
            job_klass == klass &&
            args[0] == id
      end.map(&:delete)
    
    end
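
The "delete prior jobs, then schedule a new one" flow described above can be sketched end to end without a running Sidekiq or Redis. This is only an illustration under stated assumptions: `FakeJob`, `FakeScheduledSet`, `matching_jobs` and `schedule_unique` are hypothetical names that are not part of Sidekiq, and the YAML payload stores the class and method as plain strings, which is a simplification of what the delayed extension actually serializes.

```ruby
require 'yaml'

# Hypothetical stand-in for one scheduled entry; NOT Sidekiq's API.
class FakeJob
  attr_reader :klass, :args

  def initialize(klass, args, owner)
    @klass = klass
    @args = args
    @owner = owner
  end

  # Remove this entry from the set that holds it.
  def delete
    @owner.remove(self)
  end
end

# Hypothetical in-memory stand-in for Sidekiq::ScheduledSet.
class FakeScheduledSet
  include Enumerable

  def initialize
    @jobs = []
  end

  # Iterate over a copy so entries can be deleted safely during a scan.
  def each(&block)
    @jobs.dup.each(&block)
  end

  def push(klass, args)
    job = FakeJob.new(klass, args, self)
    @jobs << job
    job
  end

  def remove(job)
    @jobs.delete_if { |j| j.equal?(job) }
  end
end

DELAYED = 'Sidekiq::Extensions::DelayedClass'

# Same selection criteria as the answer: class, method, and record id.
def matching_jobs(set, klass_name, method_name, id)
  set.select do |job|
    next false unless job.klass == DELAYED

    job_klass, job_method, args = YAML.load(job.args[0])
    job_klass == klass_name && job_method == method_name && args[0] == id
  end
end

# Delete any prior matching job, then schedule a fresh one.
def schedule_unique(set, klass_name, method_name, id)
  matching_jobs(set, klass_name, method_name, id).each(&:delete)
  set.push(DELAYED, [YAML.dump([klass_name, method_name, [id]])])
end

set = FakeScheduledSet.new
3.times { schedule_unique(set, 'Product', 'reindex', 42) }
schedule_unique(set, 'Product', 'reindex', 7)
puts set.count # prints 2: one job for record 42, one for record 7
```

Note that the find-then-delete step is not atomic, so two processes scheduling at the same moment could still leave a duplicate, and every call scans the whole set, which may become slow when many jobs are scheduled.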