
Redis golang client periodically discarding bad PubSub connection (EOF)


What I did:

I'm using the Go Redis client library github.com/go-redis/redis. My client listens on a PubSub channel called 'control'. Whenever a message arrives, I handle it and wait for the next one. The listener runs indefinitely; messages may arrive frequently, or not at all for days.

What I expect:

I expect the Redis channel to stay open indefinitely and to deliver messages as they are sent.

What I experience:

Usually it runs well for days, but every once in a while controlSub.Receive() returns an EOF error. After this error, the client no longer receives messages on that channel. Internally, the Redis client logs the following message:

redis: 2019/08/29 14:18:57 pubsub.go:151: redis: discarding bad PubSub connection: EOF

Disclaimer: I am not certain that this error is what causes me to stop receiving messages; it just seems related.

Additional questions:

I'd like to understand why this happens and whether it is normal. Is reconnecting to the channel via client.Subscribe() whenever I encounter this behaviour a good remedy, or should I address the root cause, whatever it may be?

The code:

Here is the entire code that handles my client (connect to redis, subscribe to channel, endlessly receive messages):

func InitAndListenAsync(log *log.Logger, sseHandler func(string, string) error) error {
    rootLogger = log.With(zap.String("component", "redis-client"))

    host := env.RedisHost
    port := env.RedisPort
    pass := env.RedisPass
    addr := fmt.Sprintf("%s:%s", host, port)
    tlsCfg := &tls.Config{}
    client = redis.NewClient(&redis.Options{
        Addr:      addr,
        Password:  pass,
        TLSConfig: tlsCfg,
    })

    if _, err := client.Ping().Result(); err != nil {
        return err
    }

    go func() {
        controlSub := client.Subscribe("control")
        defer controlSub.Close()
        for {
            in, err := controlSub.Receive()  // *** SOMETIMES RETURNS EOF ERROR ***
            if err != nil {
                rootLogger.Error("failed to get feedback", zap.Error(err))
                break
            }
            switch msg := in.(type) {
            case *redis.Message:
                cm := comm.ControlMessageEvent{}
                payload := []byte(msg.Payload)
                if err := json.Unmarshal(payload, &cm); err != nil {
                    rootLogger.Error("failed to parse control message", zap.Error(err))
                } else if err := handleIncomingEvent(&cm); err != nil {
                    rootLogger.Error("failed to handle control message", zap.Error(err))
                }

            default:
                rootLogger.Warn("Received unknown input over REDIS PubSub control channel", zap.Any("received", in))
            }
        }
    }()
    return nil
}

Solution

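  • I solved the disconnects by ranging over the Go channel returned from pubsub.Channel() instead of calling Receive() in a loop. Receive() is still called once, before the loop, to check that the subscription was set up.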

    Here's the new code:

    
    func listenToControlChannel(client *redis.Client) {
        pubsub := client.Subscribe("control")
        defer pubsub.Close()
    
        if _, err := pubsub.Receive(); err != nil {
            rootLogger.Error("failed to receive from control PubSub", zap.Error(err))
            return
        }
    
        controlCh := pubsub.Channel()
        fmt.Println("start listening on control PubSub")
    
        // Listen on the control channel until it is closed.
        for msg := range controlCh {
            cm := ControlMessageEvent{}
            payload := []byte(msg.Payload)
            if err := json.Unmarshal(payload, &cm); err != nil {
                fmt.Printf("failed to parse control message: %s\n", err.Error())
            } else if err := handleIncomingEvent(&cm); err != nil {
                fmt.Printf("failed to handle control message: %s\n", err.Error())
            }
        }
    }
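
On the question of whether resubscribing after an error is a reasonable remedy: one possible shape for that is a loop that resubscribes with a growing delay instead of exiting on the first error, which is what the `break` in the original code does. The sketch below is a minimal, stdlib-only illustration and is not go-redis code. The `receiver` interface, `listenWithResubscribe`, `fakeSub` and `runDemo` are hypothetical names introduced here; in a real program the `subscribe` callback would presumably wrap `client.Subscribe("control")`, and `Receive` would adapt the library's return values.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"time"
)

// receiver is a hypothetical stand-in for a subscription object:
// Receive blocks until a message arrives or the connection fails.
type receiver interface {
	Receive() (string, error)
	Close() error
}

// listenWithResubscribe reads messages until Receive fails, then subscribes
// again after a delay that doubles on each consecutive failure (capped at
// maxDelay). It returns once maxFailures failures occur in a row.
func listenWithResubscribe(subscribe func() (receiver, error), handle func(string),
	baseDelay, maxDelay time.Duration, maxFailures int) error {
	delay := baseDelay
	failures := 0
	for {
		sub, err := subscribe()
		if err == nil {
			for {
				var msg string
				if msg, err = sub.Receive(); err != nil {
					break // e.g. io.EOF when the connection was dropped
				}
				handle(msg)
				// A delivered message proves the link works: reset the backoff.
				failures, delay = 0, baseDelay
			}
			sub.Close()
		}
		failures++
		if failures >= maxFailures {
			return fmt.Errorf("giving up after %d consecutive failures: %w", failures, err)
		}
		time.Sleep(delay)
		delay *= 2
		if delay > maxDelay {
			delay = maxDelay
		}
	}
}

// fakeSub replays scripted messages, then reports io.EOF like a dropped connection.
type fakeSub struct{ msgs []string }

func (f *fakeSub) Receive() (string, error) {
	if len(f.msgs) == 0 {
		return "", io.EOF
	}
	m := f.msgs[0]
	f.msgs = f.msgs[1:]
	return m, nil
}

func (f *fakeSub) Close() error { return nil }

// runDemo simulates two sessions that each end in EOF, followed by a server
// that refuses every new subscription.
func runDemo() ([]string, error) {
	sessions := [][]string{{"a", "b"}, {"c"}}
	next := 0
	subscribe := func() (receiver, error) {
		if next >= len(sessions) {
			return nil, errors.New("server unavailable")
		}
		s := &fakeSub{msgs: sessions[next]}
		next++
		return s, nil
	}
	var got []string
	err := listenWithResubscribe(subscribe, func(m string) { got = append(got, m) },
		time.Millisecond, 4*time.Millisecond, 3)
	return got, err
}

func main() {
	got, err := runDemo()
	// Prints: [a b c] giving up after 3 consecutive failures: server unavailable
	fmt.Println(got, err)
}
```

Resetting the failure counter after every delivered message means an occasional EOF after days of uptime costs only one short delay, while a server that stays down eventually surfaces as an error to the caller instead of failing silently.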