redis, apache-flink, flink-streaming, backpressure, flink-sql

How to write data from flink pipeline to redis efficiently


I am building a pipeline with the Apache Flink SQL API. The pipeline runs a simple projection query. However, I need to write the tuples (more precisely, some elements of each tuple) to Redis twice: once before the query and once after it. It turns out that the code I use to write to Redis severely degrades performance: Flink shows backpressure even at a very low data rate. What is wrong with my code, and how can I improve it? Any recommendations are welcome.

When I stopped writing to Redis before and after the query, performance was excellent. Here is my pipeline code:

public class QueryExample {
    public static Long throughputCounterAfter=new Long("0");
    public static void main(String[] args) {
        int k_partitions = 10;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(5 * 32);
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "zookeeper-node-01:2181");
        props.setProperty("bootstrap.servers", "kafka-node-01:9092,kafka-node-02:9092,kafka-node-03:9092");
        // not to be shared with another job consuming the same topic
        props.setProperty("group.id", "flink-group");
        props.setProperty("enable.auto.commit","false");
        FlinkKafkaConsumer011<String> purchasesConsumer=new FlinkKafkaConsumer011<String>("purchases",
                new SimpleStringSchema(),
                props);

        DataStream<String> purchasesStream = env
                .addSource(purchasesConsumer)
                .setParallelism(Math.min(5 * 32, k_partitions));
        DataStream<Tuple4<Integer, Integer, Integer, Long>> purchaseWithTimestampsAndWatermarks =
                purchasesStream
                        .flatMap(new PurchasesParser())
                        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple4<Integer, Integer, Integer, Long>>(Time.seconds(10)) {

                            @Override
                            public long extractTimestamp(Tuple4<Integer, Integer, Integer, Long> element) {
                                return element.getField(3);
                            }
                        });

        Table purchasesTable = tEnv.fromDataStream(purchaseWithTimestampsAndWatermarks, "userID, gemPackID,price, rowtime.rowtime");
        tEnv.registerTable("purchasesTable", purchasesTable);

        purchaseWithTimestampsAndWatermarks.flatMap(new WriteToRedis());
        Table result = tEnv.sqlQuery("SELECT  userID, gemPackID, rowtime from purchasesTable");
        DataStream<Tuple2<Boolean, Row>> queryResultAsDataStream = tEnv.toRetractStream(result, Row.class);
        queryResultAsDataStream.flatMap(new WriteToRedis());
        try {
            env.execute("flink SQL");

        } catch (Exception e) {
            e.printStackTrace();
        }
    }




/**
 * write to redis
 */
public static class WriteToRedis extends RichFlatMapFunction<Tuple4<Integer, Integer, Integer, Long>, String> {
    RedisReadAndWrite redisReadAndWrite;

    @Override
    public void open(Configuration parameters) {
        LOG.info("Opening connection with Jedis to {}", "redis");
        this.redisReadAndWrite = new RedisReadAndWrite("redis",6379);

    }

    @Override
    public void flatMap(Tuple4<Integer, Integer, Integer, Long> input, Collector<String> out) throws Exception {
        this.redisReadAndWrite.write(input.f0+":"+input.f3+"","time_seen", TimeUnit.NANOSECONDS.toMillis(System.nanoTime())+"");
    }
}
}


public class RedisReadAndWrite {
    private Jedis flush_jedis;

    public RedisReadAndWrite(String redisServerName , int port) {
        flush_jedis=new Jedis(redisServerName,port);
    }


    public void write(String key,String field, String value) {
        flush_jedis.hset(key,field,value);

    }
}

Additional part: I tried a second implementation, a ProcessFunction that batches the writes to Redis using Jedis. However, I am getting the following error: org.apache.flink.runtime.client.JobExecutionException: redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketException: Socket is not connected. I even tried making the number of batched messages smaller, and I still get errors after a while.

Here is the implementation of the process function:

/** write to redis using process function */

public static class WriteToRedisAfterQueryProcessFn extends ProcessFunction<Tuple2<Boolean, Row>, String> {
    Long timetoFlush;
    @Override
    public void open(Configuration parameters) {
        flush_jedis=new Jedis("redis",6379,1800);
        p = flush_jedis.pipelined();
        this.timetoFlush=System.currentTimeMillis()-initialTime;
    }

    @Override
    public void processElement(Tuple2<Boolean, Row> input, Context context, Collector<String> collector) throws Exception {
        p.hset(input.f1.getField(0)+":"+new Instant(input.f1.getField(2)).getMillis()+"","time_updated",TimeUnit.NANOSECONDS.toMillis(System.nanoTime())+"");
        throughputAccomulationcount++;
        System.out.println(throughputAccomulationcount);
        if(throughputAccomulationcount==50000){
            throughputAccomulationcount=0L;
            p.sync();
        }
    }
}

Solution

  • The poor performance you are experiencing is almost certainly because you are making a synchronous request to Redis for each write, so every record waits for a full network round trip before the next one is processed. @kkrugler has already mentioned async I/O, which is a common remedy for this situation. That would require switching to one of the Redis clients that supports asynchronous operation.

    Another solution that is commonly used when working with external services is to batch together groups of writes. With Jedis, you could use pipelining. For example, you could replace the WriteToRedis RichFlatMapFunction with a ProcessFunction that does pipelined writes to Redis in batches of some size, and that relies on a timeout to flush its buffer, as needed. You could use Flink's ListState for the buffer.
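To make the batching idea concrete, here is a minimal sketch of the buffering logic only, written in plain Java so it can be run without Flink or Redis. The names BatchingWriter, Write, flusher, maxBatchSize and maxDelayMillis are hypothetical and chosen for illustration; the flusher callback stands in for the Jedis pipeline calls already used in the question (pipelined(), hset(), sync()). In a ProcessFunction you would presumably call write() from processElement(), flushIfDue() from a timer callback, and flush() from close(). Note that Flink timers are only available on keyed streams, and that keeping the buffer in ListState for fault tolerance is left out of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Buffers hash writes and hands them to a flusher in batches, by size or by age. */
class BatchingWriter {

    /** One pending HSET: key, field, value. */
    static final class Write {
        final String key;
        final String field;
        final String value;

        Write(String key, String field, String value) {
            this.key = key;
            this.field = field;
            this.value = value;
        }
    }

    private final int maxBatchSize;
    private final long maxDelayMillis;
    private final Consumer<List<Write>> flusher;
    private final List<Write> buffer = new ArrayList<>();
    private long oldestBufferedAt = -1L;

    // The flusher is the only part that talks to Redis. With Jedis it could be
    // (assumption, not compiled here):
    //   batch -> {
    //       Pipeline p = jedis.pipelined();
    //       for (Write w : batch) p.hset(w.key, w.field, w.value);
    //       p.sync();   // one round trip for the whole batch
    //   }
    BatchingWriter(int maxBatchSize, long maxDelayMillis, Consumer<List<Write>> flusher) {
        this.maxBatchSize = maxBatchSize;
        this.maxDelayMillis = maxDelayMillis;
        this.flusher = flusher;
    }

    /** Buffers one write and flushes immediately once the batch is full. */
    void write(String key, String field, String value, long nowMillis) {
        if (buffer.isEmpty()) {
            oldestBufferedAt = nowMillis; // remember the age of the oldest pending write
        }
        buffer.add(new Write(key, field, value));
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    /** Call periodically so that a slow stream still gets its writes out. */
    void flushIfDue(long nowMillis) {
        if (!buffer.isEmpty() && nowMillis - oldestBufferedAt >= maxDelayMillis) {
            flush();
        }
    }

    /** Sends everything buffered so far; also call this on shutdown. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flusher.accept(new ArrayList<>(buffer)); // pass a copy so the flusher owns its batch
        buffer.clear();
    }
}
```

The batch size and the timeout trade latency against throughput: a larger batch means fewer round trips, while the timeout bounds how long a record can sit in the buffer when traffic is light. Both values need tuning for the actual workload.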