I am struggling to deduplicate Sidekiq jobs and limit concurrency in the same worker without introducing a race condition that causes the worker to miss updates. Below are the solutions I have tried and the reasons why they do not satisfy me. Can you suggest an improvement to my solutions or another way to solve this problem?
### `unique_until: :start` with `Sidekiq::Limiter.concurrent`

Currently, the worker uses `unique_until: :start` and a `Sidekiq::Limiter.concurrent` lock.
The downside of this solution is that the interaction between these two Sidekiq features causes many duplicate jobs in the queue. Here is a sequence of events that produces duplicate jobs:

1. `Worker.perform_async(1)` enqueues job A1.
2. `Worker.perform_async(2)` enqueues job B1.
3. Job A1 starts. Because of `unique_until: :start`, this releases the unique lock for `Worker` with argument 1. A1 acquires the limiter lock and begins its work.
4. Job B1 starts. It cannot acquire the limiter lock (A1 holds it), so it reschedules itself. Its start has already released the unique lock for `Worker` with argument 2.
5. `Worker.perform_async(2)` enqueues job B2. I would like this to be a no-op, but it enqueues another job because we released the unique lock in step 4.
6. `Worker.perform_async(2)` enqueues job B3... and so on.

### `unique_until: :success` with `Sidekiq::Limiter.concurrent`
I can fix the duplicate jobs issue if I switch to `unique_until: :success` (the default behavior if `unique_until` is not specified).
The downside of this solution is that it opens up a race condition where the worker misses updates that happen while a job is running.
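For reference, here is a minimal sketch of that variant, assuming only the `sidekiq_options` line changes from the full worker shown further down:

```ruby
class ExpensiveWorker
  include Sidekiq::Worker

  # unique_until: :success is the default, so `unique_until` could be omitted.
  # The unique lock is now held until the job completes successfully.
  sidekiq_options unique_for: 30.minutes, unique_until: :success

  def perform(id)
    # ...same limiter-wrapped expensive work as in the original worker...
  end
end
```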
I can fix the duplicate jobs issue and avoid the race condition if I stop using `Sidekiq::Limiter.concurrent` and instead route these jobs to a queue that is handled by a Sidekiq process with only one thread.
The downside of this solution is that our hardware is sadly resource-constrained, so adding a second Sidekiq process has a real cost.
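For concreteness, here is a sketch of that setup; the queue name `expensive` is my own placeholder:

```ruby
# Run a second, single-threaded Sidekiq process dedicated to this queue:
#   bundle exec sidekiq -q expensive -c 1
class ExpensiveWorker
  include Sidekiq::Worker

  # Route the job to the dedicated queue. The single-threaded process ensures
  # only one of these jobs runs at a time, so no limiter is needed, and the
  # unique lock can still be released at start.
  sidekiq_options queue: 'expensive', unique_for: 30.minutes, unique_until: :start

  def perform(id)
    Rails.logger.info "Processing #{id}..."
    sleep 10 # placeholder for the real expensive work
  end
end
```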
Here is how I am using the uniqueness and limiter features:
```ruby
class ExpensiveWorker
  include Sidekiq::Worker

  # Release the unique lock as soon as the job starts, so a new job for the
  # same ID can be enqueued while this one is running.
  sidekiq_options unique_for: 30.minutes, unique_until: :start

  # Allow only one expensive job to run at a time.
  EXPENSIVE_JOB_LIMITER = Sidekiq::Limiter.concurrent('expensive_job',
                                                      1,
                                                      wait_timeout: 5.seconds,
                                                      lock_timeout: 15.minutes)

  def perform(id)
    EXPENSIVE_JOB_LIMITER.within_limit do
      Rails.logger.info "Processing #{id}..."
      sleep 10 # stand-in for the real expensive work
    end
  end
end
```
For the sake of simplicity, I am going to describe the data we are working with as author models that each have many books. We have `RebuildAuthorImagesWorker` and `ClassifyAuthorGenreWorker`, which both take an author ID as their only argument.
Both of these workers perform CPU- and RAM-intensive calculations on an author and on the author's books. We use `Sidekiq::Limiter.concurrent` to ensure that only one of these workers has an active job at any given time. We do this to avoid impacting our puny servers. (We also have many other workers that do not need to be limited in this way.)
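To make the shared limit concrete, here is a rough sketch of how the two workers can share one named limiter; the module name is my own invention:

```ruby
# Both workers reference a limiter with the same name, so Sidekiq enforces a
# single running job across RebuildAuthorImagesWorker and
# ClassifyAuthorGenreWorker combined.
module ExpensiveAuthorJobLimiter
  LIMITER = Sidekiq::Limiter.concurrent('expensive_author_job',
                                        1,
                                        wait_timeout: 5.seconds,
                                        lock_timeout: 15.minutes)
end

class RebuildAuthorImagesWorker
  include Sidekiq::Worker
  sidekiq_options unique_for: 30.minutes, unique_until: :start

  def perform(author_id)
    ExpensiveAuthorJobLimiter::LIMITER.within_limit do
      # rebuild the author's images here
    end
  end
end

class ClassifyAuthorGenreWorker
  include Sidekiq::Worker
  sidekiq_options unique_for: 30.minutes, unique_until: :start

  def perform(author_id)
    ExpensiveAuthorJobLimiter::LIMITER.within_limit do
      # classify the author's genre here
    end
  end
end
```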
It is common for many updates to happen on the same author or on that author's books during a short period of time, because of multiple active users or because one user updated multiple books by the same author. We use `unique_for` to prevent `RebuildAuthorImagesWorker` from being enqueued multiple times for the same author. Ditto for `ClassifyAuthorGenreWorker`. We want to avoid the duplicate jobs because of the system overhead associated with running them. The jobs are idempotent, so duplicate jobs do not cause data issues. (It is OK and normal for one job of each worker to be enqueued for the same author.)
If `RebuildAuthorImagesWorker` is actively running on author A, and then user X makes an update to author A before the `RebuildAuthorImagesWorker` job finishes, then we do want to enqueue a second `RebuildAuthorImagesWorker` job for author A, so we do not miss incorporating data from user X's update in the image. That is why we use `unique_until: :start`.
One idea:
When a user wants to change Author A, I would enqueue a scheduled, unique `UpdateAuthorJob` for Author A that updates their info 10 minutes from now. That way, the user can make lots of changes to the author, and the system will wait for that 10-minute cooldown period before performing the actual update work, ensuring that all of the updates are picked up as one group.
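A minimal sketch of that debounce approach, assuming Sidekiq Enterprise unique jobs; `UpdateAuthorJob`, the `Author` lookup, and the 10-minute window are illustrative choices:

```ruby
class UpdateAuthorJob
  include Sidekiq::Worker

  # Hold the unique lock for the length of the cooldown window, so repeated
  # enqueues for the same author during that window should be no-ops.
  sidekiq_options unique_for: 10.minutes

  def perform(author_id)
    author = Author.find(author_id)
    # Do the expensive rebuild/classification for `author` here. Because the
    # job reads the author's current state when it finally runs, it picks up
    # every change made during the cooldown window.
  end
end

# Call this wherever the author (or one of the author's books) changes:
UpdateAuthorJob.perform_in(10.minutes, author.id)
```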