I have a rails app that fetches a lot of emails from multiple IMAP accounts.
2 issues though:
My code:
class FetchMailsJobs
include Sidekiq::Worker
include Sidetiq::Schedulable
tiq { hourly.minute_of_hour(0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55) }
def perform(last_occurrence, current_occurrence)
users = User.all
users.each do |user|
if user.imap_accounts.exists?
ImapJob.perform_async(user._id.to_s)
end
end
end
end
class ImapJob
include Sidekiq::Worker
def perform(user_id)
s = Redis::Semaphore.new("fetch_imap_mails_for_#{user_id}".to_sym, connection: "localhost")
if s.lock
user = User.where(_id: user_id).first
emails = ImapMails.receive_mails(user)
s.unlock
end
end
end
Redis
subclass and overload blpop
to accept -1
for non-blocking use of lpop
.redis-semaphore calls @redis.blpop
in Redis::Semaphore#lock
. While you could overload the lock
method to use @redis.lpop
instead, a much simpler approach would be to pass a custom instance of Redis
to the semaphore.
Place the following in the lib
of your rails app and require it in your config/initializers/sidekiq.rb
(or do whatever your preference might be for loading the following class).
class NonBlockingRedis < Redis
def blpop(key, timeout)
if timeout == -1
result = lpop(key)
return result if result.nil?
return [key, result]
else
super(key, timeout)
end
end
end
Whenever you call Redis::Semaphore.new
, pass a :redis
key with a new instance of the NonBlockingRedis
class.
Call s.lock
with -1
as an argument to use lpop
instead of blpop
.
s = Redis::Semaphore.new("fetch_imap_mails_for_#{user_id}".to_sym, redis: NonBlockingRedis.new(connection: "localhost"))
if s.lock -1
user = User.where(_id: user_id).first
emails = ImapMails.receive_mails(user)
s.unlock
end
sidekiq_options retry: false
in your worker class should work, see below for an example.In your question, you didn't specify which worker you were having problems with jobs ending up in the retry queue. Since FetchMailsJobs
ends up enqueing ImapJob
jobs, an exception in the former may cause it to appear that the ImapJob
is being re-queued.
With your semaphore lock, it would also be a good idea to wrap your work in a begin rescue ensure
block.
class FetchMailsJobs
include Sidekiq::Worker
include Sidetiq::Schedulable
sidekiq_options retry: false
tiq { hourly.minute_of_hour(0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55) }
def perform(last_occurrence, current_occurrence)
users = User.all
users.each do |user|
if user.imap_accounts.exists?
ImapJob.perform_async(user._id.to_s)
end
end
end
end
class ImapJob
include Sidekiq::Worker
sidekiq_options retry: false
def perform(user_id)
s = Redis::Semaphore.new("fetch_imap_mails_for_#{user_id}".to_sym, redis: NonBlockingRedis.new(connection: "localhost"))
if s.lock - 1
begin
user = User.where(_id: user_id).first
emails = ImapMails.receive_mails(user)
rescue => e
# ignore; do nothing
ensure
s.unlock
end
end
end
end
See sidekiq Advanced Options: workers for more information.