Search code examples
ruby-on-railsrubymultithreadinggoogle-ads-apisidekiq

Synchronise a method different Sidekiq threads and wait


Problem: I have several sidekiq threads and a function that may only be called once at the time from any of the threads.

Reason: We are querying the AdWords API to get some data. They are quite restrictive when it comes to rate limits. Only one of the threads may call the function to get data at a time.

Now some code:

# Public: Get estimates for a set of keywords. If there is an error, retry
# several times. If not successful, raise an error
#
# keywords: The keyword objects to get estimates for.
# save: Boolean to indicate whether the keyword objects should be saved to
# the database
#
def repeatedly_try_get_estimates(keywords: [], save: true, sleep_delay: 150)
  return keywords if keywords.empty?
  func = -> { get_estimates(keywords, !save) }
  retry_operation(function: func, max_tries: 15, sleep_delay: sleep_delay)
end
  • As you can see, right now I have a huge sleep_delay work around the problem.
  • The code calls the retry_operation function with the get_estimates function as parameter. It will then retry the get_estimates function several times until there is an API exception.

The retry_function:

# Private: Retry a function X times and wait X seconds. If it does not work X times,
# raise an error. If successful return the functions results.
#
# - max_tries: The maximum tries to repeat the function
# - sleep_delay: The seconds to wait between each iteration.
# - function: The lambda function to call each iteration
#
def retry_operation(max_tries: 5, sleep_delay: 30, function: nil, current_try: 0, result: nil)

  # Can't call, no function
  if function.nil?
    return
  end

  # Abort, tried too frequently.
  if current_try > max_tries
    raise "Failed function too often"
  end

  # Check if there is an exception
  exception = true
  begin
    result = function.call
    exception = false
  rescue => e
    Rails.logger.info "Received error when repeatedly calling function #{e.message.to_s}"
  end

  if exception
    sleep sleep_delay if sleep_delay > 0
    retry_operation(max_tries: max_tries, sleep_delay: sleep_delay, function: function, current_try: current_try + 1)
  else
    result
  end
end

The get_estimates_function is here: https://gist.github.com/a14868d939ef0e34ef9f. It is too long, just in case.

I guess I need to do the following:

  1. Adjust the code in the repeatedly_try_get_estimates function.
  2. Use a mutex in the class.
  3. Rescue the exception if the mutex is in use.
  4. Only if the mutex is free, run the rety_operation, else sleep some time

Thanks for your help :)


Solution

  • Here we go, got it to work:

    # Public: Get estimates for a set of keywords. If there is an error, retry
    # several times. If not successful, raise an error
    #
    # keywords: The keyword objects to get estimates for.
    # save: Boolean to indicate whether the keyword objects should be saved to
    # the database
    #
    def repeatedly_try_get_estimates(keywords: [], save: true, sleep_delay: 40)
      return keywords if keywords.empty?
      func = -> { get_estimates(keywords, save_keywords: true) }
      exception = nil
      result = nil
      initial_sleep = 0
    
      estimates_mutex.synchronize do
        since_last_request = Time.now.to_i - last_adwords_api_request
        if since_last_request <= 30
          Rails.logger.info "AdWords: Last request was only few seconds ago - sleeping #{since_last_request}."
          initial_sleep = since_last_request
        end
        begin
          result = retry_operation(function: func, max_tries: 15, sleep_delay: sleep_delay, initial_sleep: initial_sleep)
        rescue => e
          exception = e
        end
        @@last_adwords_api_request = Time.now.to_i
      end
      if exception
        raise exception
      end
      result
    end