ruby, redis, mutex, semaphore, sidekiq

How best to keep a job queue clean of retry/duplicate jobs (using sidekiq and redis-semaphore)


I have a rails app that fetches a lot of emails from multiple IMAP accounts.

  • I use sidekiq to handle the jobs.
  • I use sidetiq to schedule the jobs.
  • I use redis-semaphore to ensure that recurring jobs for the same user don't trip over each other.

Two issues, though:

  • 1: When a job hits "if s.lock", redis-semaphore puts it on hold until all previous jobs have finished. I need the job to be cancelled instead of queued.
  • 2: If an exception is raised during a job, resulting in a crash, sidekiq puts the job back into the queue for a retry. I need the job to be cancelled instead of retried. Adding "sidekiq_options :retry => false" to the code does not seem to make a difference.

My code:

class FetchMailsJobs
  include Sidekiq::Worker
  include Sidetiq::Schedulable

  tiq { hourly.minute_of_hour(0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55) }

  def perform(last_occurrence, current_occurrence)
    users = User.all
    users.each do |user|

      if user.imap_accounts.exists?
        ImapJob.perform_async(user._id.to_s)
      end
    end
  end
end

class ImapJob
  include Sidekiq::Worker

  def perform(user_id)
    s = Redis::Semaphore.new("fetch_imap_mails_for_#{user_id}".to_sym, connection: "localhost")
    if s.lock
      user = User.where(_id: user_id).first
      emails = ImapMails.receive_mails(user)
      s.unlock
    end
  end
end

Solution

  • 1. Create a Redis subclass and override blpop so that a timeout of -1 falls back to a non-blocking lpop.

    redis-semaphore calls @redis.blpop in Redis::Semaphore#lock. While you could override the lock method to use @redis.lpop instead, a much simpler approach is to pass a custom Redis instance to the semaphore.

    Place the following in the lib directory of your Rails app and require it in config/initializers/sidekiq.rb (or load the class however you prefer).

    class NonBlockingRedis < Redis
      # A timeout of -1 means "don't block": use lpop and mimic blpop's
      # [key, value] return value (or nil when the list is empty).
      def blpop(key, timeout)
        if timeout == -1
          result = lpop(key)
          return nil if result.nil?
          [key, result]
        else
          super(key, timeout)
        end
      end
    end
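
    As a quick sanity check of the subclass (the list name is hypothetical, and a Redis server on localhost is assumed):

    redis = NonBlockingRedis.new(host: "localhost")
    redis.rpush("demo_list", "token")

    redis.blpop("demo_list", -1)  # => ["demo_list", "token"], returned immediately via lpop
    redis.blpop("demo_list", -1)  # => nil, no blocking even though the list is now empty
    redis.blpop("demo_list", 1)   # any other timeout falls through to the real blpop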
    

    Whenever you call Redis::Semaphore.new, pass a :redis key with a new instance of the NonBlockingRedis class.

    Call s.lock with -1 as an argument to use lpop instead of blpop. If another job already holds the semaphore, lock(-1) returns false immediately and the worker simply ends, which gives you the cancel-instead-of-queue behaviour from issue 1.

    s = Redis::Semaphore.new("fetch_imap_mails_for_#{user_id}".to_sym, redis: NonBlockingRedis.new(host: "localhost"))
    if s.lock(-1)
      user = User.where(_id: user_id).first
      emails = ImapMails.receive_mails(user)
      s.unlock
    end
    

  • 2. Using sidekiq_options retry: false in your worker class should work; see below for an example.

    In your question, you didn't specify which worker's jobs were ending up in the retry queue. Since FetchMailsJobs enqueues ImapJob jobs, an exception in the former may make it appear that ImapJob is being re-queued.
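
    If it's unclear which worker is being retried, you can inspect the retry set from a Rails console; a short sketch using Sidekiq's sidekiq/api:

    require "sidekiq/api"

    # Print the worker class and job id of every job currently waiting to be retried.
    Sidekiq::RetrySet.new.each do |job|
      puts "#{job.klass} #{job.jid}"
    end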

    With your semaphore lock, it would also be a good idea to wrap the work in a begin/rescue/ensure block so that the semaphore is always released, even when an exception is raised.

    class FetchMailsJobs
      include Sidekiq::Worker
      include Sidetiq::Schedulable
    
      sidekiq_options retry: false
    
      tiq { hourly.minute_of_hour(0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55) }
    
      def perform(last_occurrence, current_occurrence)
        users = User.all
        users.each do |user|
    
          if user.imap_accounts.exists?
            ImapJob.perform_async(user._id.to_s)
          end
        end
      end
    end
    
    class ImapJob
      include Sidekiq::Worker
    
      sidekiq_options retry: false
    
      def perform(user_id)
        s = Redis::Semaphore.new("fetch_imap_mails_for_#{user_id}".to_sym, redis: NonBlockingRedis.new(host: "localhost"))
        if s.lock(-1)
          begin
            user = User.where(_id: user_id).first
            emails = ImapMails.receive_mails(user)
          rescue => e
            # swallow the exception so sidekiq never sees a failure and won't retry the job
          ensure
            s.unlock
          end
        end
      end
    end
    

    See sidekiq Advanced Options: workers for more information.
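
    For reference, retry: false can be combined with other per-worker options in the same sidekiq_options call; the queue name below is purely illustrative:

    class ImapJob
      include Sidekiq::Worker

      # retry: false     - never re-enqueue this job for a retry when it raises
      # queue: :mail     - push jobs to a custom queue (name here is illustrative)
      # backtrace: true  - save the error backtrace into the job payload
      sidekiq_options retry: false, queue: :mail, backtrace: true

      def perform(user_id)
        # ...
      end
    end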