Tags: ruby, multithreading, io, pipe, named-pipes

Repeatedly read a Ruby IO until X bytes have been read, Y seconds have elapsed, or EOF is reached, whichever comes first


I want to forward logs from an IO pipe to an API. Ideally there would be no more than about 10 seconds of latency, so that people watching the log don't get impatient.

A naive approach would be to use IO#each_byte and send each byte to the API as soon as it arrives, but the overhead of making one request per byte adds latency of its own.

IO#each(limit) comes close to what I want, but if the limit is 50 kB and only 20 kB has been read after 10 seconds, I want to send those 20 kB right away instead of waiting for more. How can I apply a time limit and a size limit at the same time?
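Two standard Ruby primitives cover the two limits separately: IO#readpartial(maxlen) caps the size of a read but returns as soon as any data is available, and IO.select with a timeout caps how long you wait. The snippet below is a small illustration using an in-process IO.pipe; the log text is made up for the demo.

```ruby
reader, writer = IO.pipe

# Only 27 bytes are available, far less than the 50 kB limit.
writer.write("first few lines of the log\n")

# readpartial returns whatever is available right now (up to the limit)
# instead of blocking until the full 50 kB has arrived.
chunk = reader.readpartial(50 * 1024)
puts chunk.bytesize # prints 27

# With a timeout, IO.select gives up after 0.1 s and returns nil
# when no more data arrives, which is how a time limit can be enforced.
ready = IO.select([reader], nil, nil, 0.1)
p ready # prints nil

# Once the write end is closed, readpartial raises EOFError.
writer.close
got_eof = begin
  reader.readpartial(50 * 1024)
  false
rescue EOFError
  true
end
p got_eof # prints true
```

Combining the two means passing "time left until the next flush" as the IO.select timeout and "bytes left in the current chunk" as the readpartial limit.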


Solution

  • Here's what I ended up with. Simpler solutions still appreciated!

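    require 'stringio'

    # Yields chunks of at most byte_interval bytes read from io. Buffered data is
    # flushed once byte_interval bytes are buffered, time_interval seconds have
    # passed since the last flush, or EOF is reached, whichever comes first.
    def read_chunks(io, byte_interval: 200 * 1024, time_interval: 5)
      now = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
      buffer = last = nil
      eof = false
      reset = lambda do
        buffer = String.new # mutable binary buffer, safe with frozen string literals
        last = now.call
      end
      reset.call
      mutex = Mutex.new
      cv = ConditionVariable.new
      [
        # Reader thread: append whatever data is available to the buffer.
        lambda do
          IO.select [io]
          mutex.synchronize do
            begin
              buffer << io.readpartial(byte_interval)
            rescue EOFError
              eof = true # remember EOF; io.eof? would block on an open pipe
              raise StopIteration
            ensure
              cv.signal
            end
          end
        end,
        # Flusher thread: wait for the size, time, or EOF trigger, then yield.
        lambda do
          mutex.synchronize do
            loop do
              remaining = last + time_interval - now.call
              break if eof || remaining <= 0 || buffer.bytesize >= byte_interval
              cv.wait mutex, remaining
            end
            unless buffer.empty?
              buffer_io = StringIO.new buffer
              yield buffer_io.read byte_interval until buffer_io.eof?
            end
            reset.call # restart the timer even when nothing was sent
            raise StopIteration if eof
          end
        end,
      ].map do |function|
        Thread.new { loop { function.call } }
      end.each(&:join)
    end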
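Since simpler solutions are welcome, here is a single-threaded sketch of the same flushing rules. It is a different technique from the two-thread version above: instead of a condition variable, it passes the time left until the next flush as the IO.select timeout, and it caps each readpartial so that a chunk never exceeds byte_interval. The method name each_chunk and the demo data are hypothetical, and the sketch assumes io is a readable pipe or socket.

```ruby
# Hypothetical single-threaded alternative: IO.select's timeout enforces
# the time limit, and readpartial's maxlen enforces the size limit.
def each_chunk(io, byte_interval: 200 * 1024, time_interval: 5)
  now = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
  buffer = String.new # mutable binary buffer
  deadline = now.call + time_interval
  loop do
    remaining = deadline - now.call
    # Wait for input, but never past the deadline.
    if remaining > 0 && IO.select([io], nil, nil, remaining)
      begin
        # Read no more than fits in the current chunk.
        buffer << io.readpartial(byte_interval - buffer.bytesize)
      rescue EOFError
        break
      end
    end
    # Flush only when the chunk is full or the deadline has passed.
    next unless buffer.bytesize >= byte_interval || now.call >= deadline
    yield buffer unless buffer.empty?
    buffer = String.new
    deadline = now.call + time_interval # restart the clock after each flush
  end
  yield buffer unless buffer.empty? # whatever was left at EOF
end

# Demo: 10 bytes arrive at once, then 3 more about half a second later.
reader, writer = IO.pipe
producer = Thread.new do
  writer.write("a" * 10)
  sleep 0.5
  writer.write("b" * 3)
  writer.close
end
chunks = []
each_chunk(reader, byte_interval: 4, time_interval: 0.2) { |chunk| chunks << chunk }
producer.join
p chunks
```

With these timings the first 10 bytes should come out as two full 4-byte chunks followed by a 2-byte chunk flushed by the timer, and the trailing 3 bytes as a final chunk at EOF. In real use the block would post each chunk to the API, e.g. `each_chunk($stdin, byte_interval: 50 * 1024, time_interval: 10) { |chunk| ... }`. Because the block runs on the reading thread, a slow API call delays further reads; in the meantime the data waits in the pipe's buffer, and the writer blocks if that buffer fills up.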