Search code examples
rubyruby-kafka

Wait until value appears in the hash


Recently, I was given a task to build a REST API request that is responsible for sending messages to Kafka's inbound channel and then waiting for output from an outbound one. All went well until I encountered the issue related to waiting for this particular message.

It's worth pointing out that, after successful arrival, messages are written to the global message holders, which is just a ruby hash under the hood. Below is the function that monitors hash, until the latter is filled with some value.

def monitor_payment_hash(key)
 while @uuid.payment_create.get_message(key).nil?
   next
 end
 @uuid.payment_create.get_message(key)
end

Is it even appropriate to implement it that way? What should I attempt at this point?
NOTICE: Kafka consumer runs in a separate thread.

Update

I've just headed over to ruby docs and stumbled upon some interesting sections on channels. As far as I'm aware, channels are the best choice for communication between rubytines (just a fancy name for goroutines, but in ruby ecosystem :) )

Solution

  • I think you need timeout and a way to force stop polling process, moreover, you maybe need an abstract to improve in future.

    class Poller
      def self.poll(key:, from_source:, options: {})
        start_time = Time.now
        catch(:stop_polling) do
          loop do
            message = from_source.get_message(key)
            if message.nil?
              wait_time = Time.now - start_time
              throw :stop_polling if wait_time > options[:timeout]
            else
              yield(message) if block_given?
              throw :stop_polling
            end
          end
        end
      end
    end
    
    def monitor_payment_hash(key)
      Poller.poll key: key, from_source: @uuid.payment_create, options: {timeout: 60} do |message|
        # write to the global message holders
        # or handle message by block
        yield(message) if block_given?
      end
    end
    

    You maybe need to add more logic such as retry if timeout, polling a list of keys, log... I recommend you learn how to build a long polling from this source : https://github.com/aws/aws-sdk-ruby/blob/version-3/gems/aws-sdk-sqs/lib/aws-sdk-sqs/queue_poller.rb