I need some help in understanding basic 'async/io' example as given here:
require 'async/io'
def echo_server(endpoint)
Async::Reactor.run do |task|
# This is a synchronous block within the current task:
endpoint.accept do |client|
# This is an asynchronous block within the current reactor:
data = client.read(512)
# This produces out-of-order responses.
task.sleep(rand * 0.01)
client.write(data.reverse)
end
end
end
def echo_client(endpoint, data)
Async::Reactor.run do |task|
endpoint.connect do |peer|
result = peer.write(data)
message = peer.read(512)
puts "Sent #{data}, got response: #{message}"
end
end
end
Async::Reactor.run do
endpoint = Async::IO::Endpoint.tcp('0.0.0.0', 9000)
server = echo_server(endpoint)
5.times.collect do |i|
echo_client(endpoint, "Hello World #{i}")
end.each(&:wait)
server.stop
end
A reactor pattern (correct please if wrong) is basically kind of scheduler of synchronous tasks, such that upon blocking, a task is suspended and another one is launched, and so on, and in turn tasks are resumed once their operation is unblocked [source]
In the given github example first the echo_server method returning Async::Task
is defined, and assigned to server variable server
Now that the variable is created, the underlying task starts listening on the socket and gets blocked by client.read(512)
call. It is suspended and the flow reaches the loop part where 5 client Async::Task
s one by one write messages to the socket.
And now happens something I don't understand. The server task gets unlocked and replies to the first message. After that it should quit, because there is no kind of loop. However it serves all five requests and quits after that. Obviously there is something I am getting wrong, but I cannot figure it out. Any comments are highly appreciated.
echo_client
is executed 5 times as it is invoked from a loop. That function calls endpoint.connect
and sends a single message and reads a single response.
echo_server
is executed 1 time and calls endpoint.accept
which yields the block for each connection. The server reads one message and writes it back.
The server task gets unlocked and replies to the first message. After that it should quit, because there is no kind of loop.
endpoint.accept
is implemented as a loop:
def accept(backlog = Socket::SOMAXCONN, &block)
bind do |server|
server.listen(backlog)
server.accept_each(&block)
end
end
Here is the implementation of server.accept_each
:
def accept_each(task: Task.current)
task.annotate "accepting connections #{self.local_address.inspect}"
while true
self.accept(task: task) do |io, address|
yield io, address, task: task
end
end
end
As you can see, it binds to the socket, listens for incoming connections, and then invokes accept in a loop.