ruby heroku websocket eventmachine faye

WebSocket and EventMachine timeout and error recovery


Using puma, faye-websocket-ruby and eventmachine, I am trying to implement a WebSocket server that is extended to support channels using redis.rb. Each client supplies its channel through the route, which during development has the form "/C#{random number}". All of this logic needs to reside in the server, because the clients will be microprocessor-based Python systems that do not support higher-level libraries.
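
To make the route convention concrete, here is a minimal sketch of deriving the channel name from the connection URL. The helper name channel_from_url is hypothetical and does not appear in my code; it only illustrates the idea of dropping the leading slash from the path.

```ruby
require 'uri'

# Hypothetical helper (an assumption for illustration): derive the channel
# name from the request URL by dropping the leading "/" from its path.
def channel_from_url(url)
  URI.parse(url).path.sub(%r{\A/}, '')
end

# Build a URL the same way the test client does, then read the channel back.
channel = "C#{rand(999_999_999_999)}"
puts channel_from_url("ws://localhost:3000/#{channel}") == channel
```

The same expression could be used in both the "on open" and "on close" handlers so that the channel is parsed in one place.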

I used ruby-websockets-chat-demo as a starting point. One major change was to configure it to support multiple channels during the WebSocket "on open" event.

The code works when run normally. However, often when one client drops, the server hangs until it is restarted. I have not yet been able to resolve this. Initially, Heroku would throw an H12 timeout, so I added rack-timeout. I've tried rescuing timeouts within the server, but those rescues never fire. I've implemented an "on error" event handler within the server, but it never fires either. Most often, the server just goes away until restarted. The client should fend for itself, but I need the server to recover and continue.
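
One possible reason the rescues never fire, offered as an assumption rather than a confirmed diagnosis: a begin/rescue only covers code that runs while the block is executing, whereas event handlers are merely registered there and run later. The sketch below uses a hypothetical FakeSocket class as a stand-in for an event emitter; it is not the faye-websocket API.

```ruby
require 'timeout'

# Hypothetical stand-in for an event emitter such as a WebSocket object:
# it stores the callback now and runs it only when the event is fired.
class FakeSocket
  def initialize
    @handlers = {}
  end

  def on(event, &block)
    @handlers[event] = block
  end

  def fire(event)
    @handlers.fetch(event).call
  end
end

# Returns [rescued_at_registration, rescued_at_fire_time].
def demo_rescue_scope
  socket = FakeSocket.new
  rescued_at_registration = false
  rescued_at_fire_time = false

  begin
    # Registering the handler raises nothing, so this rescue stays idle.
    socket.on(:message) { raise Timeout::Error, 'simulated timeout' }
  rescue Timeout::Error
    rescued_at_registration = true
  end

  begin
    # The handler body runs here, outside the first begin/rescue.
    socket.fire(:message)
  rescue Timeout::Error
    rescued_at_fire_time = true
  end

  [rescued_at_registration, rescued_at_fire_time]
end

p demo_rescue_scope
```

If this is what is happening, any error handling would have to live inside each handler block rather than around the code that registers it.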

config.ru:

require './app'
require './middlewares/myserver_backend'
require 'rack-timeout'
use Rack::Timeout, service_timeout: 20, wait_timeout: 30, wait_overtime: 60, service_past_wait: false
use Myserver::MyserverBackend
run Myserver::App

Rack middleware "backend":

%w(faye/websocket thread redis json erb).each { |m| require m }
module Myserver
  class MyserverBackend
    KEEPALIVE_TIME = ENV['KEEPALIVE_TIME']

    def initialize(app)
      @app = app
      @clients = []
      @uri = URI.parse(ENV["REDISCLOUD_URL"])
      @redis = Redis.new(host: @uri.host, port: @uri.port, password: @uri.password)
    end

    def call(env)
      begin
        if Faye::WebSocket.websocket?(env)
          ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME})
          ws.on :open do |event|
            channel = URI.parse(event.target.url).path[1..URI.parse(event.target.url).path.length]
            Thread.new do
              redis_sub = Redis.new(host: @uri.host, port: @uri.port, password: @uri.password)
              redis_sub.subscribe(channel) do |on|
                on.message do |message_channel, message|
                  puts "MyserverBackend>> Redis     message received on channel:#{message_channel}; Message is:#{message};"
                  @clients.each { |clients_ws, clients_channel| clients_ws.send(message) if clients_channel == message_channel }
                end
              end
            end
            @clients << [ws, channel]
            @clients.each do |clients_ws, clients_channel|
              puts "MyserverBackend>> Client:#{clients_ws.object_id}; Channel:#{clients_channel};"
            end
          end

          ws.on :message do |event|
            @clients.each do |clients_ws, clients_channel|
              if clients_ws == ws
                puts "MyserverBackend>> Websocket message received on channel:#{clients_channel}; Message is:#{event.data};"
                @redis.publish(clients_channel, sanitize(event.data))
              end
            end
          end

          ws.on :close do |event|
            # Close all channels for this client first
            # ws gives a channel which we use to identify it here, but we're closing all of those that are open
            @clients.each { |clients_ws, clients_channel| @redis.unsubscribe(clients_channel) if clients_ws == ws }
            @clients.delete_if { |clients_ws, clients_channel| clients_ws == ws }
            channel = URI.parse(event.target.url).path[1..URI.parse(event.target.url).path.length]
            puts "MyserverBackend>> Websocket closure for:#{channel}; Event code:#{event.code} Event reason:#{event.reason};"
            ws = nil
          end

          ws.on :error do |event|
            puts "Error raised:#{nil}; ws:#{ws.object_id};"
            ws.close unless ws.nil?
          end

          # Return async Rack response
          ws.rack_response

        else
          @app.call(env)
        end

      rescue Rack::Timeout::RequestTimeoutError, Rack::Timeout::RequestExpiryError => exception
        puts "Exception raised:#{exception}; ws:#{ws.object_id};"
        ws.close(code=4999, reason=9999) unless ws.nil?
        # ensure is executed immediately so it doesn't help...
      end
    end

    private
    def sanitize(message)
      json = JSON.parse(message)
      json.each { |key, value| json[key] = ERB::Util.html_escape(value) }
      JSON.generate(json)
    end
  end
end

The Sinatra "frontend":

# https://github.com/heroku-examples/ruby-websockets-chat-demo
require 'rubygems'
require 'bundler'
require 'sinatra/base'
ENV['RACK_ENV'] ||= 'development'
Bundler.require
$: << File.expand_path('../', __FILE__)
$: << File.expand_path('../lib', __FILE__)

Dir["./lib/*.rb", "./lib/**/*.rb"].each { |file| require file }
env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']

module Myserver
  class App < Sinatra::Base
    get "/" do
      erb :"index.html"
    end

    get "/assets/js/application.js" do
      content_type :js
      @scheme = env == "production" ? "wss://" : "ws://"
      erb :"application.js"
    end
  end
end

The test client:

# https://github.com/faye/faye-websocket-ruby/issues/52
# https://github.com/faye/faye-websocket-ruby
%w(bundler/setup faye/websocket eventmachine json).each { |m| require m }
Dir["./lib/*.rb", "./lib/**/*.rb"].each { |file| require file }
class ClientWs

  def self.em_run
    env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
    EM.run do

      uri = 'myserver.herokuapp.com'
      #uri = 'localhost' if env == 'development'
      channel = "C#{rand(999999999999).to_s}"
      url = uri == 'localhost' ? "ws://#{uri}:3000/#{channel}" : "ws://#{uri}/#{channel}"
      @ws = Faye::WebSocket::Client.new(url)
      start = Time.now
      count ||= 0

      timer = EventMachine.add_periodic_timer(5+rand(5)) {
        count += 1
        send({'PING': channel, 'COUNT': count.to_s})
      }

      @ws.on :open do |event|
        puts "{'OPEN':#{channel}}"
        ClientWs.send({'OPEN': channel})
      end

      @ws.on :message do |event|
        @ip_address ||= Addrinfo.ip(URI.parse(event.target.url).host).ip_address
        begin
          parsed = JSON.parse event.data
        rescue => e
          puts ">>>> [Error! Failed to parse JSON]"
          puts ">>>> [#{e.message}]"
          puts ">>>> #{event.data}"
        end
        puts ">> #{@ip_address}:#{channel}:#{event.data};"
      end

      @ws.on :close do |event|
        timer.cancel 
        stop = Time.now - start
        puts "#{stop} seconds;"
        p [:close, event.code, event.reason]
        ws = nil
        ClientWs.em_run
      end
    end
  end

  def self.send message
    payload = message.is_a?(Hash) ? message : {payload: message}
    @ws.send(payload.to_json)
  end

end
ClientWs.em_run

The Gemfile.lock:

GEM
  remote: https://rubygems.org/
  specs:
    activesupport (4.2.5.1)
      i18n (~> 0.7)
      json (~> 1.7, >= 1.7.7)
      minitest (~> 5.1)
      thread_safe (~> 0.3, >= 0.3.4)
      tzinfo (~> 1.1)
    eventmachine (1.2.0.1-x86-mingw32)
    faye-websocket (0.10.4)
      eventmachine (>= 0.12.0)
      websocket-driver (>= 0.5.1)
    i18n (0.7.0)
    json (1.8.3)
    json_pure (1.8.3)
    minitest (5.9.0)
    multi_json (1.12.1)
    oj (2.16.1)
    permessage_deflate (0.1.3)
    progressbar (0.21.0)
    puma (3.4.0)
    rack (1.6.4)
    rack-protection (1.5.3)
      rack
    rack-timeout (0.4.2)
    rake (11.2.2)
    redis (3.3.0)
    rollbar (2.11.5)
      multi_json
    sinatra (1.4.7)
      rack (~> 1.5)
      rack-protection (~> 1.4)
      tilt (>= 1.3, < 3)
    thread_safe (0.3.5)
    tilt (2.0.5)
    tzinfo (1.2.2)
      thread_safe (~> 0.1)
    websocket-driver (0.6.4)
      websocket-extensions (>= 0.1.0)
    websocket-extensions (0.1.2)

PLATFORMS
  x86-mingw32

DEPENDENCIES
  activesupport (= 4.2.5.1)
  bundler
  faye-websocket
  json_pure
  oj (~> 2.16.0)
  permessage_deflate
  progressbar
  puma
  rack
  rack-timeout
  rake
  redis (>= 3.2.0)
  rollbar
  sinatra

RUBY VERSION
   ruby 2.2.4p230

BUNDLED WITH
   1.12.5

What the client sees when attempting to connect to the stalled server:

ruby client.rb
20.098119 seconds;
[:close, 1002, "Error during WebSocket handshake: Unexpected response code: 500"]
20.07921 seconds;
[:close, 1002, "Error during WebSocket handshake: Unexpected response code: 500"]
20.075731 seconds;
[:close, 1002, "Error during WebSocket handshake: Unexpected response code: 500"]   

config/puma.rb:

env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
if env.nil? || env == 'development' || env == 'test'
  concurrency = 0  # Set to zero to ensure single mode, not clustered mode
  max_threads = 1
end
# WEB_CONCURRENCY and RAILS_MAX_THREADS == 1 in Heroku for now.
concurrency ||= (ENV['WEB_CONCURRENCY'] || 2)
max_threads ||= (ENV['RAILS_MAX_THREADS'] || 5)
worker_timeout 15
workers Integer(concurrency)
threads_count = Integer(max_threads)
threads threads_count, threads_count

#preload_app!

rackup      DefaultRackup
port        ENV['PORT']     || 3000
environment ENV['RACK_ENV'] || 'development'

Solution

  • What I needed to do was complete the server's "on close" event handler. It needed to clean everything up and then restart itself, which it was not doing.

    I don't like this as the final answer, however. Why should the server shut down and restart just because a client dropped? Isn't there a cleaner way to sweep away the detritus of a failed client?

    Follow-up: this fix does answer the question as asked, in that completing the "on close" handler resolved the stated problem. Later enhancements moved each client's WebSocket event handling into threads, as was already done for the Redis events, so that "on close" closes only the client and not the server.

    The new event handler is:

      ws.on :close do |event|
        if @debug
          puts "MyserverBackend>> Close entered.  Last error:#{$!.class}:#{$!.to_s};Module:#{$0};Line:#{$.};"
          $@.each { |backtrace| puts backtrace }
          exit
        end
        @clients.each do |clients_ws, clients_channel|
          begin
            @redis.unsubscribe(clients_channel)
          rescue RuntimeError => exception
            unless exception.to_s == "Can't unsubscribe if not subscribed."
              raise
            end
            false
          end
        end
        @clients.delete_if { |clients_ws, clients_channel| clients_ws == ws }
        channel = URI.parse(event.target.url).path[1..URI.parse(event.target.url).path.length]
        puts "MyserverBackend>> Websocket closure for:#{channel}; Event code:#{event.code} Event reason:#{event.reason};"
        ws = nil
        app = Myserver::App
        myserver = MyserverBackend.new(app)
        myserver
      end
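
As a rough sketch of the "cleaner way" asked about above, the per-client bookkeeping could be isolated in a small thread-safe registry, so that a closing client removes only its own entry. The ClientRegistry class and its method names are hypothetical and are not part of the code shown here; how it would be wired to Redis subscriptions is left open.

```ruby
# Hypothetical helper (an assumption for illustration): a thread-safe
# registry of socket => channel pairs, so that closing one client touches
# only that client's entry.
class ClientRegistry
  def initialize
    @lock = Mutex.new
    @channels = {} # socket => channel
  end

  def add(socket, channel)
    @lock.synchronize { @channels[socket] = channel }
  end

  # Removes the socket and returns its channel (nil if it was not registered).
  def remove(socket)
    @lock.synchronize { @channels.delete(socket) }
  end

  # All sockets currently registered on the given channel.
  def sockets_for(channel)
    @lock.synchronize { @channels.select { |_, c| c == channel }.keys }
  end

  # True when no remaining client uses the channel, i.e. the point at which
  # a per-channel subscription could be dropped.
  def channel_unused?(channel)
    sockets_for(channel).empty?
  end
end

registry = ClientRegistry.new
first = Object.new
second = Object.new
registry.add(first, 'C1')
registry.add(second, 'C2')

closed_channel = registry.remove(first)
puts "closed=#{closed_channel} unused=#{registry.channel_unused?(closed_channel)}"
```

With something like this in place, an "on close" handler would call remove(ws) and only act on the returned channel, instead of looping over every client and rebuilding the backend.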