Ruby version: ruby-3.2.1
I need to read a CSV file that contains 50k rows. For each row I then need to call 7 APIs, one by one and in order, to create an entry in a third-party application.
I am trying to use the async feature of the concurrent-ruby gem. Below is sample code for this.
require 'csv'
require "json"
require 'byebug'
require 'concurrent'

class Processor
  include Concurrent::Async

  def consume(c_row)
    ## Inside consume methods 7 APIS are called in sequence for each row
    puts "processing!!!! -> #{c_row}"
    sleep 1 if c_row % 2
    sleep 5 if c_row % 3
  end
end

class ImporterAsync
  def self.perform(master_file)
    begin
      ## CSV has 50k entries ##
      CSV.foreach(master_file, headers: true).with_index(1) do |row|
        Processor.new.async.consume(row)
      end
    rescue StandardError => e
      puts "Main script failed: #{e.message}"
    ensure
      puts 'closing'
    end
  end
end

ImporterAsync.perform('master_file.csv')
I get no response from the code when I invoke the processor through async: Processor.new.async.consume(row)
But the code executes fine without async: Processor.new.consume(row)
I believe running this asynchronously will help my code handle this load better. I am also open to suggestions.
Thanks in advance - Ajith
Your program effectively exits immediately after creating the async threads. You need to wait for them or they'll terminate with the rest of the program.
Using async like this actually returns a Concurrent::IVar for each call, which you can use to track that call's work and read its result via #value.
class ImporterAsync
  def self.perform(master_file)
    futures = []
    begin
      CSV.foreach(master_file, headers: true).with_index(1) do |row|
        # Each async call returns an IVar; keep it so we can wait on it later
        future = Processor.new.async.consume(row)
        futures.push(future)
      end
    rescue StandardError => e
      puts "Main script failed: #{e.message}"
    ensure
      puts 'closing'
    end

    # Block until every queued call has finished
    futures.each do |t|
      t.wait
      puts "finished (#{t.value.to_s[0...40].strip}) #{Time.now}"
    end
    puts "ALL DONE"
  end
end
This is an altered version of your class that collects each IVar into an array. After starting all the calls, it waits for each one to finish, so the program waits for ALL of them before exiting. (Note that "finished" can also mean an exception was raised; it doesn't imply a "good" finish.)