Tags: ruby, websocket, redis, publish-subscribe, eventmachine

WebSocket and Redis result in a hanging connection from pub/sub and/or BRPOP


I am issuing a Redis SUBSCRIBE from within a WebSocket (WS) handler. For each WS request, I fork a thread and instantiate the Redis client in it. Then, in the WS open handler, I fork a second thread for Redis and issue the subscription there.

This all works fine until I receive an unexpected WS close. At that point, the thread running the Redis subscription is gone. If I issue an unsubscribe, the call hangs. If I don't unsubscribe, I leave behind a phantom subscription that causes me trouble the next time around.

Is there some way to delete a subscription once the thread that issued it has terminated? I have noticed that the Redis instance has a mon variable that refers to that terminated thread. Sample Ruby code:

require 'faye/websocket'
require 'redis'
require 'uri'

class Backend
  include MInit

  def initialize(app)
    setup
    @app = app
  end

  def run!(env)
    if Faye::WebSocket.websocket?(env)
      ws = Faye::WebSocket.new(env, [], ping: KEEPALIVE_TIME)
      ws_thread = Thread.fork do
        credis = Redis.new(host: @redis_uri.host, port: @redis_uri.port, password: @redis_uri.password)
        # Declared here so the :open and :close handlers share the same variable
        channel = nil

        ws.on :open do |event|
          # Channel name is the request path without its leading "/"
          channel = URI.parse(event.target.url).path[1..-1]
          # SUBSCRIBE blocks its caller, so it runs on its own thread
          redis_thread = Thread.fork do
            credis.subscribe(channel) do |on|
              on.message do |message_channel, message|
                sent = ws.send(message)
              end
              on.unsubscribe do |message_channel|
                puts "Unsubscribe on channel:#{channel};"
              end
            end
          end
        end

        ws.on :message do |event|
          handoff(ws: ws, event: event)
        end

        ws.on :close do |event|
          # Hang occurs here
          unsubscribed = credis.unsubscribe(channel)
        end

        ws.on :error do |event|
          ws.close
        end

        # Return async Rack response
        ws.rack_response
      end
      # Thread#value joins the thread and returns its result (ws.rack_response)
      ws_thread.value
    else
      @app.call(env)
    end
  end

  private
  def handoff(ws: nil, event: nil, source: nil, message: nil)
    # processing
  end
end
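A minimal, Redis-free sketch of why the line marked "Hang occurs here" can block, assuming (as the mon variable hints) that the client guards each command with a reentrant Ruby Monitor. ToyClient and its subscribe/unsubscribe methods are hypothetical stand-ins, not redis-rb's API: the subscriber thread owns the monitor for as long as its loop runs, so any other thread that needs the same monitor waits indefinitely.

```ruby
require 'monitor'

# Hypothetical stand-in for a client that guards every command with a
# reentrant Monitor (an assumption suggested by the mon variable).
class ToyClient
  include MonitorMixin

  def initialize
    super() # lets MonitorMixin set up its monitor
    @stop = Queue.new
  end

  # Like SUBSCRIBE: holds the monitor until the loop is told to stop.
  def subscribe
    mon_synchronize { @stop.pop }
  end

  # Like UNSUBSCRIBE: must take the same monitor before it can signal.
  def unsubscribe
    mon_synchronize { @stop << :stop }
  end
end

client = ToyClient.new
subscriber = Thread.new { client.subscribe }
sleep 0.01 until subscriber.status == 'sleep' # now blocked inside subscribe

# Unsubscribing from a different thread waits for the monitor forever.
closer = Thread.new { client.unsubscribe }
hung = closer.join(0.5).nil? # join returns nil when the timeout expires
puts "unsubscribe from another thread hung: #{hung}"

[closer, subscriber].each(&:kill) # clean up the stuck demo threads
```

In this model, calling unsubscribe from inside the subscriber thread itself would not block, because a Monitor lets the thread that already owns it enter again.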

Solution

  • The fix is rather simple once I really understood the problem. The Redis thread actually still exists, but Redis hangs anyway because it is waiting for that thread to get control. To allow that, the WS close handler must cede control by wrapping the unsubscribe in EM.next_tick:

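    ws.on :close do |event|
      # Defer the unsubscribe to the next reactor tick so the close
      # handler returns first (calling it directly here used to hang)
      EM.next_tick do
        unsubscribed = credis.unsubscribe(channel)
      end
    end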
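To illustrate what ceding control with EM.next_tick means for ordering, here is a toy, single-threaded reactor. ToyReactor is hypothetical and much simpler than EventMachine; it only models the one property the fix relies on: a block passed to next_tick is queued and runs after the currently executing callback has returned.

```ruby
# Hypothetical single-threaded reactor: NOT EventMachine, just a model of
# the ordering that a next_tick-style deferral provides.
class ToyReactor
  attr_reader :log

  def initialize
    @queue = []
    @log = []
  end

  # Queue a block to run on a later tick.
  def next_tick(&block)
    @queue << block
  end

  # Run queued callbacks one at a time, each to completion.
  def run
    @queue.shift.call until @queue.empty?
  end
end

reactor = ToyReactor.new
reactor.next_tick do
  # Stands in for the ws.on :close handler
  reactor.log << :close_handler_start
  reactor.next_tick { reactor.log << :deferred_unsubscribe }
  reactor.log << :close_handler_return
end
reactor.run
p reactor.log
# prints [:close_handler_start, :close_handler_return, :deferred_unsubscribe]
```

In the fix above, the close handler likewise returns immediately, and the unsubscribe runs only on a later tick of the reactor.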