Recently, I was given the task of building a REST API request that sends a message to Kafka's inbound channel and then waits for the output from an outbound one. All went well until I ran into a problem with waiting for that particular message.
It's worth pointing out that, once a message arrives, it is written to the global message holder, which is just a Ruby hash under the hood. Below is the function that monitors the hash until it is filled with a value for the given key.
def monitor_payment_hash(key)
  while @uuid.payment_create.get_message(key).nil?
    next
  end
  @uuid.payment_create.get_message(key)
end
Is it even appropriate to implement it that way? What should I attempt at this point?
Note: the Kafka consumer runs in a separate thread.
I think you need a timeout and a way to force the polling process to stop. You may also want an abstraction around it so that you can improve it later.
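class Poller
  # Polls from_source until a message for key arrives or the timeout elapses.
  # Yields the message to the block (if given) and returns it; returns nil on timeout.
  def self.poll(key:, from_source:, options: {})
    timeout = options.fetch(:timeout, 60)     # seconds; the default is an assumption
    interval = options.fetch(:interval, 0.1)  # pause between checks; also an assumption
    start_time = Time.now
    catch(:stop_polling) do
      loop do
        message = from_source.get_message(key)
        if message.nil?
          wait_time = Time.now - start_time
          throw :stop_polling if wait_time > timeout
          sleep interval # avoid spinning the CPU while waiting
        else
          yield(message) if block_given?
          throw :stop_polling, message
        end
      end
    end
  end
end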
def monitor_payment_hash(key)
  Poller.poll key: key, from_source: @uuid.payment_create, options: {timeout: 60} do |message|
    # write to the global message holders
    # or handle message by block
    yield(message) if block_given?
  end
end
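The Poller above stops only when a message arrives or the timeout elapses. If you also want to stop it on demand from another thread, one possible approach is a shared stop flag. The following is only a rough sketch: `CancellableWait`, `stop!`, and the `interval` parameter are names I made up for illustration, and the source is assumed to be any object that responds to `get_message(key)`.

```ruby
# Hypothetical cancellable wait: polls until a message arrives, the timeout
# elapses, or another thread calls stop!.
class CancellableWait
  def initialize(source, interval: 0.05)
    @source = source      # any object responding to get_message(key)
    @interval = interval  # seconds to sleep between checks (assumed value)
    @stopped = false
  end

  # Ask the waiting thread to give up at its next check.
  def stop!
    @stopped = true
  end

  # Returns the message, or nil if stopped or timed out.
  def wait(key, timeout:)
    deadline = now + timeout
    until @stopped || now >= deadline
      message = @source.get_message(key)
      return message unless message.nil?
      sleep @interval
    end
    nil
  end

  private

  # Monotonic clock, so wall-clock adjustments do not affect the timeout.
  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end
```

With something like this, the request handler would call `wait(key, timeout: 60)` while, for example, a shutdown hook calls `stop!`. The flag is a plain instance variable here, which I believe is adequate for a simple boolean on MRI, but treat that as an assumption and guard it with a Mutex if in doubt.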
You may also need to add more logic, such as retrying on timeout, polling a list of keys, logging, and so on. I recommend studying how long polling is built in this source: https://github.com/aws/aws-sdk-ruby/blob/version-3/gems/aws-sdk-sqs/lib/aws-sdk-sqs/queue_poller.rb
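To give an idea of what retrying on timeout and polling a list of keys could look like, here is a minimal sketch. It does not depend on the Poller class; `MessageHolder`, `wait_for_message`, `wait_with_retry`, and `wait_for_keys` are all hypothetical names, and `MessageHolder` is only a stand-in for your global hash, so adapt it to whatever your consumer thread really writes to.

```ruby
# Hypothetical stand-in for the global message holder: a Hash guarded by a Mutex,
# written by the consumer thread and read by the request thread.
class MessageHolder
  def initialize
    @messages = {}
    @lock = Mutex.new
  end

  def put_message(key, value)
    @lock.synchronize { @messages[key] = value }
  end

  def get_message(key)
    @lock.synchronize { @messages[key] }
  end
end

# Waits up to `timeout` seconds for `key`; returns the message or nil.
def wait_for_message(source, key, timeout:, interval: 0.05)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  loop do
    message = source.get_message(key)
    return message unless message.nil?
    return nil if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
    sleep interval
  end
end

# Retries the wait a limited number of times, logging each timeout
# when a logger (e.g. a Logger instance) is given.
def wait_with_retry(source, key, attempts: 3, timeout: 1, logger: nil)
  attempts.times do |attempt|
    message = wait_for_message(source, key, timeout: timeout)
    return message unless message.nil?
    logger&.warn("attempt #{attempt + 1} for #{key} timed out")
  end
  nil
end

# Waits for several keys in turn; returns a Hash of key => message (nil if missing).
def wait_for_keys(source, keys, **opts)
  keys.to_h { |key| [key, wait_with_retry(source, key, **opts)] }
end

# Usage: a producer thread plays the role of the Kafka consumer.
holder = MessageHolder.new
producer = Thread.new { sleep 0.2; holder.put_message("abc", "paid") }
puts wait_with_retry(holder, "abc", attempts: 2, timeout: 1)
producer.join
```

Note that `wait_for_keys` waits for the keys one after another, so the worst-case total wait is roughly keys × attempts × timeout. If that is too slow for you, waiting on each key in its own thread is an option, but I have not sketched that here.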