Search code examples
rubyrackeventmachinethin

How does thin asynch app example buffer response to the body?


Hi I have been going through the documentation on Thin and I am reasonably new to eventmachine but I am aware of how Deferrables work. My goal is to understand how Thin works when the body is deferred and streamed part by part.

The following is the example that I'm working with and trying to get my head around.


class DeferrableBody
  include EventMachine::Deferrable

  def call(body)
    body.each do |chunk|
      @body_callback.call(chunk)
    end

   # @body_callback.call()
  end

  def each &blk
    @body_callback = blk
  end

end

class AsyncApp

  # This is a template async response. N.B. Can't use string for body on 1.9
  AsyncResponse = [-1, {}, []].freeze
  puts "Aysnc testing #{AsyncResponse.inspect}"

  def call(env)

    body = DeferrableBody.new    

    # Get the headers out there asap, let the client know we're alive...
    EventMachine::next_tick do
       puts "Next tick running....."
       env['async.callback'].call [200, {'Content-Type' => 'text/plain'}, body] 
     end

    # Semi-emulate a long db request, instead of a timer, in reality we'd be 
    # waiting for the response data. Whilst this happens, other connections 
    # can be serviced.
    # This could be any callback based thing though, a deferrable waiting on 
    # IO data, a db request, an http request, an smtp send, whatever.
    EventMachine::add_timer(2) do
      puts "Timer started.."
      body.call ["Woah, async!\n"]

      EventMachine::add_timer(5) {
        # This could actually happen any time, you could spawn off to new 
        # threads, pause as a good looking lady walks by, whatever.
        # Just shows off how we can defer chunks of data in the body, you can
        # even call this many times.
        body.call ["Cheers then!"]
        puts "Succeed Called."
        body.succeed
      }
    end

    # throw :async # Still works for supporting non-async frameworks...
    puts "Async REsponse sent."
    AsyncResponse # May end up in Rack :-)
  end

end

# The additions to env for async.connection and async.callback absolutely 
# destroy the speed of the request if Lint is doing it's checks on env.
# It is also important to note that an async response will not pass through 
# any further middleware, as the async response notification has been passed 
# right up to the webserver, and the callback goes directly there too.
# Middleware could possibly catch :async, and also provide a different 
# async.connection and async.callback.

# use Rack::Lint
run AsyncApp.new

The part which I don't clearly understand is what happens within the DeferrableBody class in the call and the each methods.

I get that the each receives chunks of data once the timer fires as blocks stored in @body_callback and when succeed is called on the body it sends the body but when is yield or call called on those blocks how does it become a single message when sent.

I feel I don't understand closures enough to understand whats happening. Would appreciate any help on this.

Thank you.


Solution

  • Ok I think I figured out how the each blocks works.

    Thin on post_init seems to be generating a @request and @response object when the connection comes in. The response object needs to respond to an each method. This is the method we override.

    The env['async.callback'] is a closure that is assigned to a method called post_process in the connection.rb class method where the data is actually sent to connection which looks like this

    
          @response.each do |chunk|        
            trace { chunk }
            puts "-- [THIN] sending data #{chunk} ---"
            send_data chunk
          end
    
    

    How the response object's each is defined

    
     def each
          yield head
    
          if @body.is_a?(String)
            yield @body
          else
    
            @body.each { |chunk| yield chunk }
          end
        end
    
    

    So our env['async.callback'] is basically a method called post_process defined in the connection.rb class accessed via method(:post_process) allowing our method to be handled like a closure, which contains access to the @response object. When the reactor starts it first sends the header data in the next_tick where it yields the head, but the body is empty at this point so nothing gets yielded.

    After this our each method overrides the old implementation owned by the @response object so when the add_timers fire the post_process which gets triggered sends the data that we supply using the body.call(["Wooah..."]) to the browser (or wherever)

    Completely in awe of macournoyer and the team committing to thin. Please correct my understanding if you feel this is not how it works.