Tags: ruby, async-await, reactor, tcpsocket

Ruby 'async/io' and Reactor: help understanding the example


I need some help understanding the basic 'async/io' example given here:

require 'async/io'

def echo_server(endpoint)
    Async::Reactor.run do |task|
        # This is a synchronous block within the current task:
        endpoint.accept do |client|
            # This is an asynchronous block within the current reactor:
            data = client.read(512)

            # This produces out-of-order responses.
            task.sleep(rand * 0.01)

            client.write(data.reverse)
        end
    end
end

def echo_client(endpoint, data)
    Async::Reactor.run do |task|
        endpoint.connect do |peer|
            result = peer.write(data)

            message = peer.read(512)

            puts "Sent #{data}, got response: #{message}"
        end
    end
end

Async::Reactor.run do
    endpoint = Async::IO::Endpoint.tcp('0.0.0.0', 9000)

    server = echo_server(endpoint)

    5.times.collect do |i|
        echo_client(endpoint, "Hello World #{i}")
    end.each(&:wait)

    server.stop
end

A reactor pattern (please correct me if I'm wrong) is basically a kind of scheduler for synchronous tasks: when a task blocks, it is suspended and another one is run, and tasks are resumed in turn once the operation they were waiting on is unblocked [source]
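
As a rough illustration of that description, here is a minimal sketch of a cooperative scheduler built on Ruby's core Fiber class. ToyReactor, its async and run methods, and toy_reactor_demo are hypothetical names invented for this sketch, not part of the async gem. It also simplifies heavily: a suspended task is simply re-queued round-robin, whereas a real reactor resumes a task only when the I/O it is waiting on is ready.

```ruby
# Toy cooperative scheduler; ToyReactor is a hypothetical name, not the async gem's API.
class ToyReactor
  def initialize
    @ready = [] # fibers that may be resumed
  end

  # Register a task. It does not start running until #run is called.
  def async(&block)
    @ready << Fiber.new do
      block.call
      :done # sentinel returned by the final resume, marks the task as finished
    end
  end

  # Resume queued fibers in turn. A fiber that suspended itself is re-queued,
  # a fiber that finished (returned :done) is dropped.
  def run
    until @ready.empty?
      fiber = @ready.shift
      @ready << fiber unless fiber.resume == :done
    end
  end
end

# Runs two tasks that each suspend once and returns the order of events.
def toy_reactor_demo
  log = []
  reactor = ToyReactor.new
  2.times do |i|
    reactor.async do
      log << "task #{i}: start"
      Fiber.yield # stands in for an operation that would block, such as a read
      log << "task #{i}: resumed"
    end
  end
  reactor.run
  log
end

puts toy_reactor_demo
```

Both tasks start before either one is resumed, which is the interleaving the description above refers to.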

In the given GitHub example, the echo_server method, which returns an Async::Task, is defined first, and its result is assigned to the variable server.

Once the variable is assigned, the underlying task starts listening on the socket and gets blocked by the client.read(512) call. It is suspended, and the flow reaches the loop, where 5 client Async::Tasks write messages to the socket one by one.

And now something happens that I don't understand. The server task gets unlocked and replies to the first message. After that it should quit, because there is no kind of loop. However, it serves all five requests and only quits after that. Obviously I am getting something wrong, but I cannot figure out what. Any comments are highly appreciated.


Solution

  • echo_client is executed 5 times because it is invoked from a loop. Each call invokes endpoint.connect, sends a single message, and reads a single response.

    echo_server is executed once and calls endpoint.accept, which yields to the block for each connection. For each connection, the server reads one message and writes it back reversed.

    You wrote: "The server task gets unlocked and replies to the first message. After that it should quit, because there is no kind of loop."

    endpoint.accept is implemented as a loop:

            def accept(backlog = Socket::SOMAXCONN, &block)
                bind do |server|
                    server.listen(backlog)
    
                    server.accept_each(&block)
                end
            end
    

    Here is the implementation of server.accept_each:

            def accept_each(task: Task.current)
                task.annotate "accepting connections #{self.local_address.inspect}"
    
                while true
                    self.accept(task: task) do |io, address|
                        yield io, address, task: task
                    end
                end
            end
    

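    As you can see, it binds to the socket, listens for incoming connections, and then invokes accept in a loop. The loop is therefore inside the library rather than in the example, so the block passed to endpoint.accept runs once per incoming connection, not once in total.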
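
The same shape can be reproduced with only Ruby's standard socket library. The following is a minimal sketch, not the async-io implementation: toy_accept_each and accept_loop_demo are hypothetical helpers, the sockets are blocking, a thread stands in for the server task, and the loop is bounded by a limit argument (an assumption made so the example terminates), whereas the library loops forever until the task is stopped.

```ruby
require 'socket'

# Hypothetical helper: accept `limit` connections and yield each one to the block.
# The loop lives here, not in the caller's block.
def toy_accept_each(server, limit)
  limit.times do
    client = server.accept
    begin
      yield client
    ensure
      client.close # closing signals end-of-stream to the peer
    end
  end
end

# Starts a reversing echo server, connects `count` clients, returns their replies.
def accept_loop_demo(count)
  server = TCPServer.new('127.0.0.1', 0) # port 0 lets the OS pick a free port
  port = server.addr[1]

  server_thread = Thread.new do
    # This block contains no loop, yet it runs once per connection.
    toy_accept_each(server, count) do |client|
      data = client.readpartial(512)
      client.write(data.reverse)
    end
  end

  replies = count.times.map do |i|
    TCPSocket.open('127.0.0.1', port) do |peer|
      peer.write("Hello #{i}")
      peer.read # reads until the server closes the connection
    end
  end

  server_thread.join
  server.close
  replies
end

puts accept_loop_demo(3)
```

The block handed to toy_accept_each handles exactly one message, just like the block in echo_server, but every client still gets a reply because the helper keeps calling accept.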