java, apache-flink, flink-streaming

How to flatMap to database in Apache Flink?


I am using Apache Flink to move JSON records from Kafka into InfluxDB, splitting each JSON record into multiple InfluxDB points along the way.

I found the flatMap transformation, which seems to fit this purpose. The core code looks like this:

DataStream<InfluxDBPoint> dataStream = stream.flatMap(new FlatMapFunction<JsonConsumerRecord, InfluxDBPoint>() {
    @Override
    public void flatMap(JsonConsumerRecord record, Collector<InfluxDBPoint> out) throws Exception {
        Iterator<Entry<String, JsonNode>> iterator = //...

        while (iterator.hasNext()) {
            // extract point from input
            InfluxDBPoint point = //...

            out.collect(point);
        }
    }
});

For some reason, I only get one of those collected points streamed into the database.

Yet the mapping itself seems to work just fine. When I print out all mapped entries, dataStream.print() yields:

org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@144fd091
org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@57256d1
org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@28c38504
org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@2d3a66b3

Am I misunderstanding flatMap or might there be some bug in the Influx connector?


Solution

  • The problem was not in flatMap at all. In InfluxDB, a series (defined by its measurement and tag set) can hold only one point per timestamp. Even though my fields differed, all of my points shared the same measurement, tag set, and time value, so the final point overwrote all the previous ones.
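
The overwrite described above can be illustrated with a small sketch. This is a simplified model in plain Java, not InfluxDB's storage engine and not the connector's API: the Point class, the "metrics" measurement, the "host" tag, and the "source" tag are all hypothetical names chosen for illustration. It treats measurement, tag set, and timestamp as one slot, and shows that adding a distinguishing tag gives every point its own slot.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of the overwrite described above; NOT InfluxDB's real storage engine.
class SeriesCollisionSketch {

    // Hypothetical stand-in for a point: measurement, tag set, one field, timestamp.
    static final class Point {
        final String measurement;
        final Map<String, String> tags;
        final String field;
        final double value;
        final long timestamp;

        Point(String measurement, Map<String, String> tags, String field, double value, long timestamp) {
            this.measurement = measurement;
            this.tags = new TreeMap<>(tags); // sorted copy, so equal tag sets print identically
            this.field = field;
            this.value = value;
            this.timestamp = timestamp;
        }

        // Series key (measurement + tag set) plus timestamp identifies one slot in this model.
        String slot() {
            return measurement + "|" + tags + "|" + timestamp;
        }
    }

    // Writes points in order; a later point replaces an earlier one in the same slot.
    static Map<String, Point> write(List<Point> points) {
        Map<String, Point> store = new LinkedHashMap<>();
        for (Point p : points) {
            store.put(p.slot(), p);
        }
        return store;
    }

    // Builds one point per JSON entry, all sharing the same timestamp.
    // When tagByEntry is true, the entry name is added as a (hypothetical) "source" tag.
    static List<Point> pointsFor(Map<String, Double> entries, long timestamp, boolean tagByEntry) {
        List<Point> points = new ArrayList<>();
        for (Map.Entry<String, Double> e : entries.entrySet()) {
            Map<String, String> tags = new TreeMap<>();
            tags.put("host", "a");
            if (tagByEntry) {
                tags.put("source", e.getKey());
            }
            points.add(new Point("metrics", tags, e.getKey(), e.getValue(), timestamp));
        }
        return points;
    }

    public static void main(String[] args) {
        Map<String, Double> entries = new LinkedHashMap<>();
        entries.put("cpu", 0.5);
        entries.put("mem", 0.7);
        entries.put("disk", 0.2);

        // Same measurement, tag set, and timestamp: every point lands in one slot.
        System.out.println(write(pointsFor(entries, 1000L, false)).size()); // prints 1
        // A distinguishing tag puts each point in its own series.
        System.out.println(write(pointsFor(entries, 1000L, true)).size());  // prints 3
    }
}
```

A distinguishing tag is only one possible way out. Depending on the data, it may be simpler to give each point its own timestamp, or to emit a single point that carries all the fields instead of one point per field.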