ruby recursion hydra

Limiting asynchronous process execution


I have a program that uses Typhoeus's Hydra to request pages, and I want to limit the number of items in the download queue. Because the hydra.queue call is non-blocking, I need some way to trigger a new download when a previous one finishes processing. My current solution uses recursion: each completed response queues the next request.

def perform
  @hydra = Typhoeus::Hydra.new(:max_concurrency => 100)
  @target_concurrency.times {process_next_product}
end

def process_next_product
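  # Pop the next unscraped product id; nothing is queued once the list is empty.
  if id = $redis.lpop(REDIS_UNSCRAPED_PRODUCTS_KEY)
    # Keep the record itself: its url is read below and the product
    # is handed to the response handler.
    product = Product.find(id)
    req = Typhoeus::Request.new(product.url, {:follow_location => true})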

    req.on_complete do |resp|
      process_product_response(product, resp.body)
    end
    @hydra.queue(req)
  end
end

def process_product_response(product, response)
  # process product
  process_next_product
end

This solution eats memory as the call stack grows. I want a way to keep x items in the Hydra queue at all times without using recursion.


Solution

  • The solution I went with was to use signals to trigger processing of the next entry:

    def perform
      @hydra = Typhoeus::Hydra.new(:max_concurrency => 100)
      set_trap
      @target_concurrency.times { Process.kill("USR1", Process.pid) }
    end
    
    def set_trap
      Signal.trap('USR1') do
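        # Pop the next unscraped product id; nothing is queued once the list is empty.
        if id = $redis.lpop(REDIS_UNSCRAPED_PRODUCTS_KEY)
          # Keep the record itself: its url is read below.
          product = Product.find(id)
          req = Typhoeus::Request.new(product.url, {:follow_location => true})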
    
          req.on_complete do |resp|
            process_product_response(product, resp.body)
          end
          @hydra.queue(req)
        end
      end
    end
    
    def process_product_response(product, response)
      # process product
      Process.kill("USR1", Process.pid)
    end
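
For comparison, here is a minimal sketch of a different way to keep a fixed number of items in flight without recursion or signals. It is not the approach above: it swaps Hydra's callbacks for a small pool of worker threads fed from Ruby's built-in Queue. The name process_all and its concurrency: parameter are hypothetical, and the block stands in for whatever fetches and processes one product.

```ruby
# Hypothetical helper (not part of Typhoeus): runs the given block once per id
# with at most `concurrency` jobs in flight. Each worker is a plain loop, so
# the call stack does not grow with the number of items processed.
def process_all(ids, concurrency:, &handler)
  jobs = Queue.new
  ids.each { |id| jobs << id }
  jobs.close # once closed and drained, pop returns nil instead of blocking

  results = Queue.new
  workers = Array.new(concurrency) do
    Thread.new do
      # A worker picks up the next id as soon as its previous job finishes.
      while (id = jobs.pop)
        results << handler.call(id)
      end
    end
  end
  workers.each(&:join)

  # Collect results in completion order (not input order).
  Array.new(results.size) { results.pop }
end
```

In the scraper, the block would look up the product, issue a blocking request for its URL, and process the response body; the ids could also be popped from Redis inside the worker loop rather than loaded up front. Whether threads suit the workload better than Hydra's single-threaded multiplexing is a judgment call that depends on how heavy the per-response processing is.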