redis, apache-flink, jedis, amazon-elasticache, amazon-kinesis-analytics

Cannot connect Flink to ElastiCache Redis cluster: Jedis (via FlinkJedisClusterConfig) unable to parse cport in CLUSTER NODES response


How can I use an ElastiCache Redis replication group as a data sink in Flink for Kinesis Analytics?


I have created an ElastiCache Redis replication group and would like to compute results in Flink and write them to this group.

My Java code:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
import java.net.InetSocketAddress;
import java.util.Set;
...

// ElastiCache configuration endpoint; Jedis discovers the remaining nodes via CLUSTER NODES
var endpoint = "foo.bar.clustercfg.usw2.cache.amazonaws.com";
var port = 6379;
var node = new InetSocketAddress(endpoint, port);
var jedisConfig = new FlinkJedisClusterConfig.Builder().setNodes(Set.of(node))
                                                       .build();
var redisMapper = new MyRedisMapper();  // user-defined RedisMapper implementation (not shown)
var redisSink = new RedisSink<>(jedisConfig, redisMapper);

This gives me the following error:

java.lang.NumberFormatException: For input string: "6379@1122"
    at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.base/java.lang.Integer.parseInt(Integer.java:652)
    at java.base/java.lang.Integer.valueOf(Integer.java:983)
    at redis.clients.util.ClusterNodeInformationParser.getHostAndPortFromNodeLine(ClusterNodeInformationParser.java:39)
    at redis.clients.util.ClusterNodeInformationParser.parse(ClusterNodeInformationParser.java:14)
    at redis.clients.jedis.JedisClusterInfoCache.discoverClusterNodesAndSlots(JedisClusterInfoCache.java:50)
    at redis.clients.jedis.JedisClusterConnectionHandler.initializeSlotsCache(JedisClusterConnectionHandler.java:39)
    at redis.clients.jedis.JedisClusterConnectionHandler.<init>(JedisClusterConnectionHandler.java:28)
    at redis.clients.jedis.JedisSlotBasedConnectionHandler.<init>(JedisSlotBasedConnectionHandler.java:21)
    at redis.clients.jedis.JedisSlotBasedConnectionHandler.<init>(JedisSlotBasedConnectionHandler.java:16)
    at redis.clients.jedis.BinaryJedisCluster.<init>(BinaryJedisCluster.java:39)
    at redis.clients.jedis.JedisCluster.<init>(JedisCluster.java:45)

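The exception is thrown while Jedis parses the response to CLUSTER NODES. According to the Redis documentation (https://redis.io/commands/cluster-nodes/), each node's address is reported as ip:port@cport, where cport is the cluster bus port, but Jedis tries to parse "6379@1122" as a plain integer port.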
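To make the failure concrete, here is a minimal sketch of splitting the address field of a CLUSTER NODES line. It is a hypothetical helper written for illustration, not Jedis's actual parser, and the class name ClusterNodeAddress and the sample line are made up. The stack trace shows Jedis handing the whole string "6379@1122" to Integer.valueOf; stripping the optional @cport suffix (and the ",hostname" suffix that newer Redis versions may append) before parsing the port avoids that.

```java
import java.util.Objects;

/** Hypothetical helper (not Jedis code): parses the address field of one CLUSTER NODES line. */
final class ClusterNodeAddress {
    final String host;
    final int port;
    final Integer busPort; // null when the field has no "@cport" suffix (pre-4.0 servers)

    private ClusterNodeAddress(String host, int port, Integer busPort) {
        this.host = host;
        this.port = port;
        this.busPort = busPort;
    }

    /** Accepts "ip:port", "ip:port@cport" and "ip:port@cport,hostname". */
    static ClusterNodeAddress parse(String field) {
        Objects.requireNonNull(field, "field");
        // Drop an optional ",hostname" suffix; it is not needed to connect.
        int comma = field.indexOf(',');
        String addr = comma >= 0 ? field.substring(0, comma) : field;

        // Split off the cluster bus port BEFORE parsing the client port.
        Integer busPort = null;
        int at = addr.indexOf('@');
        if (at >= 0) {
            busPort = Integer.valueOf(addr.substring(at + 1));
            addr = addr.substring(0, at);
        }

        // Use the LAST colon so IPv6 hosts (which contain colons) are not split in the wrong place.
        int colon = addr.lastIndexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException("No port in address field: " + field);
        }
        String host = addr.substring(0, colon);
        int port = Integer.parseInt(addr.substring(colon + 1));
        return new ClusterNodeAddress(host, port, busPort);
    }

    public static void main(String[] args) {
        // Sample CLUSTER NODES line with made-up values; the address is the second token.
        String line = "07c37dfeb235213a872192d90877d0cd55635b91 127.0.0.1:6379@1122 "
                + "myself,master - 0 0 1 connected 0-5460";
        String field = line.split(" ")[1];

        // Reproduces the failing call from the stack trace: "6379@1122" is not an int.
        try {
            Integer.valueOf("6379@1122");
        } catch (NumberFormatException e) {
            System.out.println("naive parse fails: " + e.getMessage());
        }

        ClusterNodeAddress a = parse(field);
        System.out.println(a.host + " " + a.port + " " + a.busPort); // prints: 127.0.0.1 6379 1122
    }
}
```

The same helper still accepts the pre-4.0 "ip:port" form, in which case busPort is simply null.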

Am I doing something wrong here, or is this a bug in Jedis?


Solution

  • After some digging, I found that this is a known bug affecting Jedis 2.8 and earlier when connecting to Redis 4.0 or later, which added the @cport suffix to the CLUSTER NODES output: https://github.com/redis/jedis/issues/1958

    My Redis cluster runs 6.2.6, and I am on Apache Flink 1.13, which is old but was the newest version supported by AWS at the time of writing.

    To fix it, I upgraded Jedis to the latest 2.x release, which contains the fix while remaining compatible with the Flink 1.13 libraries. Upgrading Jedis to 3.x or 4.x instead broke Flink.

    <!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>2.10.2</version>
    </dependency>
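Because the fix depends on which Jedis version actually ends up in the application jar (the Flink Redis connector typically brings in its own Jedis transitively), a startup check can help confirm the upgrade took effect. The sketch below is a hypothetical helper, not part of Flink or Jedis. It assumes the jar contains the META-INF/maven/redis.clients/jedis/pom.properties file that Maven's jar packaging normally writes (shading may or may not preserve it), and it treats 2.8.x and earlier as affected, following the issue linked above.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Optional;
import java.util.Properties;

/** Hypothetical startup check: which Jedis version is actually on the classpath? */
final class JedisVersionCheck {
    // Assumption: Maven's archiver wrote this file into the Jedis jar and shading kept it.
    static final String POM_PROPERTIES = "META-INF/maven/redis.clients/jedis/pom.properties";

    /** Returns the Jedis version recorded in pom.properties, or empty if the file is missing. */
    static Optional<String> jedisVersion(ClassLoader loader) {
        try (InputStream in = loader.getResourceAsStream(POM_PROPERTIES)) {
            if (in == null) {
                return Optional.empty();
            }
            Properties props = new Properties();
            props.load(in);
            return Optional.ofNullable(props.getProperty("version"));
        } catch (IOException e) {
            return Optional.empty();
        }
    }

    /** True for 2.8.x and earlier, the range reported as unable to parse "ip:port@cport". */
    static boolean hasCportParsingBug(String version) {
        // "2.10.2" -> [2, 10, 2]; "3.0.0-m1" -> [3, 0, 0, m1]; only major/minor are used.
        String[] parts = version.split("[.-]");
        int major = Integer.parseInt(parts[0]);
        int minor = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;
        return major < 2 || (major == 2 && minor <= 8);
    }

    public static void main(String[] args) {
        Optional<String> version = jedisVersion(JedisVersionCheck.class.getClassLoader());
        if (!version.isPresent()) {
            System.out.println("Jedis pom.properties not found; cannot determine the version");
        } else if (hasCportParsingBug(version.get())) {
            System.out.println("Jedis " + version.get() + " predates the @cport fix; use a later 2.x");
        } else {
            System.out.println("Jedis " + version.get() + " should parse ip:port@cport");
        }
    }
}
```

At build time, `mvn dependency:tree -Dincludes=redis.clients:jedis` shows which Jedis version Maven resolves, which is a quick way to confirm that the explicit dependency above wins over the connector's transitive one.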