ruby-on-rails, mongodb, heroku, sidekiq, idempotent

Sidekiq handling re-queue when processing large data


See the updated question below.

Original question:

In my current Rails project, I need to parse a large XML/CSV data file and save it into MongoDB. Right now I use these steps:

  1. Receive the uploaded file from the user and store the raw data in MongoDB.
  2. Use Sidekiq to process the data in MongoDB asynchronously (sketched below).
  3. After processing finishes, delete the raw data.
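
Roughly, my setup looks something like this (simplified; UploadsController and RawUpload stand in for my real classes):

    class UploadsController < ApplicationController
      def create
        # Step 1: store the raw uploaded data in MongoDB
        upload = RawUpload.create!(user_id: current_user.id,
                                   data: params[:file].read)

        # Step 2: kick off asynchronous processing with Sidekiq
        Model.delay.parse_file(upload.id.to_s)

        head :accepted
      end
    end

    # Model.parse_file parses the stored rows and, when it finishes,
    # deletes the raw data (step 3).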

For small and medium data on localhost, the steps above run well. But on Heroku, I use HireFire to dynamically scale worker dynos up and down. When a worker is still processing the large data, HireFire sees an empty queue and scales the worker dyno down. This sends a kill signal to the process and leaves the job in an incomplete state.

I'm searching for a better way to do the parsing: one that allows the parsing process to be killed at any time (saving its current state when it receives the kill signal) and allows the job to be re-queued.

Right now I'm using Model.delay.parse_file and it doesn't get re-queued.

UPDATE

After reading the Sidekiq wiki, I found an article about job control. Can anyone explain the code, how it works, and how it preserves its state when receiving the SIGTERM signal so that the worker gets re-queued?

Is there any alternative way to handle job termination, save the current state, and continue right from the last position?

Thanks,


Solution

  • It might be easier to explain the process and the high-level steps, give a sample implementation (a stripped-down version of one that I use), and then talk about throw and catch:

    1. Insert the raw CSV rows with an incrementing index (so you can resume from a specific row/index later).
    2. Process the CSV, stopping after every 'chunk' of rows to check whether the fetcher is done, i.e. whether Sidekiq::Fetcher.done? returns true.
    3. When the fetcher is done?, store the index of the currently processed item on the user and return, so that the job completes and control is returned to Sidekiq.
    4. Note that if a job is still running after a short timeout (default 20s), it will be killed.
    5. Then, when the job runs again, simply start where you left off last time (or at 0).

    Example:

        class UserCSVImportWorker
          include Sidekiq::Worker
    
          def perform(user_id)
            user = User.find(user_id)
    
            items = user.raw_csv_items.where(:index => {'$gte' => user.last_csv_index.to_i})
            items.each_with_index do |item, i|
              # Check every 100 items whether Sidekiq has started shutting down
              if ((i + 1) % 100) == 0 && Sidekiq::Fetcher.done?
                # Checkpoint the current position so the next run can resume here
                user.update(last_csv_index: item.index)

                return
              end
    
              # Process the item as normal
            end
          end
        end
    

    The above class makes sure that every 100 items we check whether the fetcher is done (a proxy for whether shutdown has been started) and, if so, end execution of the job. Before execution ends, however, we update the user with the last index that has been processed, so that we can start where we left off next time.

    throw/catch is a way to implement the above functionality a little more cleanly (maybe), but it is a little like using Fibers: a nice concept that is hard to wrap your head around. Technically, throw/catch is closer to goto than most people are generally comfortable with.
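
    For what it's worth, a rough sketch of the same worker restructured with catch and throw (same hypothetical schema as the example above) might look like this:

        class UserCSVImportWorker
          include Sidekiq::Worker

          def perform(user_id)
            user = User.find(user_id)

            catch(:shutting_down) do
              items = user.raw_csv_items.where(:index => {'$gte' => user.last_csv_index.to_i})
              items.each_with_index do |item, i|
                if ((i + 1) % 100) == 0 && Sidekiq::Fetcher.done?
                  user.update(last_csv_index: item.index)
                  throw :shutting_down # unwind straight out of the loop
                end

                # Process the item as normal
              end
            end
          end
        end

    The throw unwinds directly to the enclosing catch block, so the job still ends normally; functionally it is equivalent to the early return above, just with the exit point named.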

    edit

    Also, you could skip the call to Sidekiq::Fetcher.done? and instead record the last_csv_index on each row, or on each chunk of rows processed. That way, if your worker is killed without having the opportunity to record the last_csv_index, you can still resume 'close' to where you left off.
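
    A sketch of that variation (same hypothetical models and field names as above) could look like this:

        class UserCSVImportWorker
          include Sidekiq::Worker

          def perform(user_id)
            user = User.find(user_id)

            items = user.raw_csv_items.where(:index => {'$gte' => user.last_csv_index.to_i})
            items.each_with_index do |item, i|
              # Process the item as normal

              # Checkpoint every 100 rows instead of asking Sidekiq about shutdown;
              # if the dyno is killed mid-chunk, the next run re-processes at most
              # the current chunk of rows.
              user.update(last_csv_index: item.index) if ((i + 1) % 100) == 0
            end
          end
        end

    Because a killed worker will re-process up to one chunk of rows on the next run, the per-row processing needs to be idempotent, which matches the idempotent tag on the question.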