I'm using the golang
Redis library from github.com/go-redis/redis
. My client listens on a PubSub channel called 'control'. Whenever a message arrives, I handle it and continue receiving the next message.
I listen endlessly and the messages can come often, or sometimes not for days.
I expect the redis channel to stay open endlessly and receive messages as they are sent.
Usually it runs well for days, but every once in a while client.Receive()
returns EOF
error. After this error, the client no longer receives messages on that channel.
Internally, the redis client prints to stdout the following message:
redis: 2019/08/29 14:18:57 pubsub.go:151: redis: discarding bad PubSub connection: EOF
Disclaimer: I am not certain that this error is what causes me to stop receiving messages, it just seems related.
I'd like to understand why this happens, if this is normal and if reconnecting to the channel via client.Subscribe()
whenever I encounter the behaviour is a good remedy, or should I address the root issue, whatever it may be.
Here is the entire code that handles my client (connect to redis, subscribe to channel, endlessly receive messages):
func InitAndListenAsync(log *log.Logger, sseHandler func(string, string) error) error {
rootLogger = log.With(zap.String("component", "redis-client"))
host := env.RedisHost
port := env.RedisPort
pass := env.RedisPass
addr := fmt.Sprintf("%s:%s", host, port)
tlsCfg := &tls.Config{}
client = redis.NewClient(&redis.Options{
Addr: addr,
Password: pass,
TLSConfig: tlsCfg,
})
if _, err := client.Ping().Result(); err != nil {
return err
}
go func() {
controlSub := client.Subscribe("control")
defer controlSub.Close()
for {
in, err := controlSub.Receive() // *** SOMETIMES RETURNS EOF ERROR ***
if err != nil {
rootLogger.Error("failed to get feedback", zap.Error(err))
break
}
switch in.(type) {
case *redis.Message:
cm := comm.ControlMessageEvent{}
payload := []byte(in.(*redis.Message).Payload)
if err := json.Unmarshal(payload, &cm); err != nil {
rootLogger.Error("failed to parse control message", zap.Error(err))
} else if err := handleIncomingEvent(&cm); err != nil {
rootLogger.Error("failed to handle control message", zap.Error(err))
}
default:
rootLogger.Warn("Received unknown input over REDIS PubSub control channel", zap.Any("received", in))
}
}
}()
return nil
}
I solved the disconnects by ranging over the channel returned from pubsub.Channel()
instead of Receive()
.
Here's the new code:
func listenToControlChannel(client *redis.Client) {
pubsub := client.Subscribe("control")
defer pubsub.Close()
if _, err := pubsub.Receive(); err != nil {
rootLogger.Error("failed to receive from control PubSub", zap.Error(err))
return
}
controlCh := pubsub.Channel()
fmt.Println("start listening on control PubSub")
// Endlessly listen to control channel,
for msg := range controlCh {
cm := ControlMessageEvent{}
payload := []byte(msg.Payload)
if err := json.Unmarshal(payload, &cm); err != nil {
fmt.Printf("failed to parse control message: %s\n", err.Error())
} else if err := handleIncomingEvent(&cm); err != nil {
fmt.Printf("failed to handle control message: %s\n", err.Error())
}
}
}