ruby, asynchronous, concurrent-ruby

Concurrent-ruby async method calling problem


Ruby version: ruby-3.2.1

I need to read a CSV file that contains 50k rows. For each row, I need to call 7 APIs one by one, in order, to create an entry in a third-party application.

I am trying to use the async feature of the concurrent-ruby gem. Below is sample code for this.

require 'csv'
require "json"
require 'byebug'
require 'concurrent'

class Processor
  include Concurrent::Async

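  # c_row is assumed to be the 1-based row number in this sample.
  def consume(c_row)
    ## Inside consume, the 7 APIs are called in sequence for each row
    puts "processing!!!! -> #{c_row}"
    # 0 is truthy in Ruby, so the remainder must be compared explicitly.
    sleep 1 if (c_row % 2).zero?
    sleep 5 if (c_row % 3).zero?
  end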
end


class ImporterAsync
  def self.perform(master_file)
    begin
      ## CSV has 50k entries ##
      CSV.foreach(master_file, headers: true).with_index(1) do |_row, index|
        # The sample consume works on the row number; real code would pass the row itself.
        Processor.new.async.consume(index)
      end
    rescue StandardError => e
      puts "Main script failed: #{e.message}"
    ensure
      puts 'closing'
    end
  end
end

ImporterAsync.perform('master_file.csv')

I get no output from the code when I invoke the processor through async, as in Processor.new.async.consume(...).

The code runs fine without async, as in Processor.new.consume(...).

I believe running this asynchronously will help my code handle this load better. I am also open to suggestions.

Thanks in advance - Ajith


Solution

  • Your program effectively exits immediately after queuing the async calls. You need to wait for them, or they are terminated along with the rest of the program. Each async call returns a Concurrent::IVar that you can use to track that call: #wait blocks until it has finished, and #value returns its result.

    class ImporterAsync
      def self.perform(master_file)
        futures = []
        begin
          CSV.foreach(master_file, headers: true).with_index(1) do |_row, index|
            # Pass the row number, which is what the sample consume works on.
            future = Processor.new.async.consume(index)
            futures.push(future)
          end
        rescue StandardError => e
          puts "Main script failed: #{e.message}"
        ensure
          puts 'closing'
        end
    
        futures.each do |t|
          t.wait
          puts "finished (#{t.value.to_s[0...40].strip}) #{Time.now}"
        end
        puts "ALL DONE"
      end
    end
    

    This altered version of your class collects each IVar into an array. After starting them all, it waits for each one to finish, so the program waits for ALL of them before exiting. (Note that "finished" can also mean an exception was raised; it doesn't imply a "good" finish. In that case #value returns nil and #reason holds the exception.)
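
Since the question also asks for suggestions, here is a minimal sketch of the same wait-before-exit idea using only Ruby's standard library (Thread and Queue) rather than concurrent-ruby. It is a swapped-in technique, not the gem's API. The names consume and process_all, the worker count, and the rule that every fifth row fails are all hypothetical and exist only for illustration. It also records whether each row succeeded or raised, which is the distinction noted above.

```ruby
# Hypothetical stand-in for the seven sequential API calls made per row.
# Every fifth row raises, purely to demonstrate failure handling.
def consume(row_number)
  raise ArgumentError, "bad row #{row_number}" if (row_number % 5).zero?
  sleep 0.01 # simulated network latency
  "row #{row_number} done"
end

# Runs consume for every item on a fixed number of worker threads and
# returns one [item, status, payload] triple per item, sorted by item.
def process_all(items, workers: 4)
  queue = Queue.new
  items.each { |item| queue << item }
  queue.close # once closed and empty, pop returns nil and workers stop

  results = Queue.new
  threads = Array.new(workers) do
    Thread.new do
      while (item = queue.pop)
        begin
          results << [item, :ok, consume(item)]
        rescue StandardError => e
          results << [item, :failed, e.message]
        end
      end
    end
  end

  threads.each(&:join) # without this, the program could exit mid-work
  Array.new(results.size) { results.pop }.sort_by(&:first)
end

outcomes = process_all((1..10).to_a, workers: 4)
failed = outcomes.select { |_, status, _| status == :failed }
puts "#{outcomes.size} rows processed, #{failed.size} failed"
# prints "10 rows processed, 2 failed"
```

With 50k rows, a fixed worker count like this also keeps the number of simultaneous API calls bounded, which may matter if the third-party application rate-limits requests.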