java apache-kafka apache-kafka-streams

Kafka Streams - Extracting Timestamp for List of Objects per record


What I want to achieve is a count of the messages in each record, windowed by the timestamp carried in each message. Each record contains a List<Metric>. I would like to extract the timestamp of each metric and aggregate the metrics by metric name.

Metric

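public class Metric {

    String metric;   // metric name, e.g. "metric1.count"
    Long timestamp;  // event time of this metric
    Double value;

    public String getMetric() { return metric; }
    public Long getTimestamp() { return timestamp; } // used by the timestamp extractor
    public Double getValue() { return value; }
}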

Custom Timestamp Extractor

I have implemented a timestamp extractor that casts the record value to a List<Metric>. It currently returns the timestamp of the first metric, and that single timestamp determines the window for the whole list.

public class EventTimestampExtractor implements TimestampExtractor {

    @Override
    @SuppressWarnings("unchecked")
    public long extract(ConsumerRecord<Object, Object> record, long previousTimeStamp) {
        try {
            // A ListSerde is in place, so the record value is already a List<Metric>.
            final List<Metric> value = (List<Metric>) record.value();
            final Metric metric = value.get(0); // Only the first metric's timestamp is used.
            return metric.getTimestamp();
        }
        catch (Exception e) {
            // On any failure, fall back to the timestamp embedded in the Kafka record.
            return record.timestamp();
        }
    }
}

Topology

Once I fetch the list, I use flatTransform() to flatten it into individual metrics and then aggregate over the flattened stream.

final StreamsBuilder builder = new StreamsBuilder();
KStream<String, List<Metric>> stream = builder.stream(inputTopic, Consumed.with(Serdes.String(), new MetricListSerde()));

TimeWindows windows = TimeWindows.of(Duration.ofSeconds(10)).grace(Duration.ofSeconds(2));

stream.filter((key, value) -> value != null)
      .flatTransform(() -> new MetricsTransformer()) // Flattens the list into single metrics
      .groupByKey()
      .windowedBy(windows)
      .count()
      .toStream()
      .to("output-topic");

Metric List Example - Note that there is a single metric name with 3 entries (2 between 0-10 and 1 after 10 seconds)

[{  "metric": "metric1.count",
    "timestamp": 1,
    "value": 30
},{
    "metric": "metric1.count",
    "timestamp": 2,
    "value": 30
}, {
    "metric": "metric1.count",
    "timestamp": 15,
    "value": 30
}]

My window is 10 seconds and I would like to get the count for the metric per window. My current result looks like -

Window{startMs=0, endMs=10} and Value metric: metric1.count value: 3  aggregator: count interval: "10s"}

Expected Result -

Window{startMs=0, endMs=10} and Value metric: metric1.count value: 2  aggregator: count interval: "10s"}
Window{startMs=10, endMs=20} and Value metric: metric1.count value: 1  aggregator: count interval: "10s"}
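
The expected windows follow from tumbling-window arithmetic: a timestamp falls into the window that starts at the largest multiple of the window size not greater than it. A minimal sketch in plain Java (no Kafka dependency; the class and method names are illustrative only), assuming the example timestamps and the window size are in the same unit:

```java
public class WindowStart {

    // Start of the tumbling window [start, start + size) that contains the timestamp.
    // Assumes non-negative timestamps and a positive window size.
    public static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    public static void main(String[] args) {
        long size = 10;
        for (long ts : new long[] {1, 2, 15}) {
            long start = windowStart(ts, size);
            System.out.println("timestamp " + ts + " -> window [" + start + ", " + (start + size) + ")");
        }
    }
}
```

Timestamps 1 and 2 land in the window starting at 0 and timestamp 15 lands in the window starting at 10, which is the 2 / 1 split shown above.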

Apologies for the long question, but is there any way to extract multiple timestamps from a single record that contains a Collection of messages?

Kafka Streams version - 2.4.1


Solution

  • The TimestampExtractor does not help for your use case, because it can only return a single timestamp per record. With flatMap(), all output records inherit the timestamp of the input record.

    If you need to modify timestamps on the fly, you need to use transform() to implement the "flat map". For each input record, you can call context.forward() multiple times to do the actual flat mapping (you can just return null; at the end to not emit any additional records). In each forward() call, you can set a new timestamp via To.all().withTimestamp(...):

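    // Inside a Transformer<String, List<Metric>, KeyValue<String, Metric>>;
    // `context` is the ProcessorContext saved in init().
    // Assumes Metric exposes getMetric() and getTimestamp().
    public KeyValue<String, Metric> transform(String key, List<Metric> value) {
        for (Metric m : value) {
            // Forward each metric keyed by its name, stamped with its own timestamp.
            context.forward(m.getMetric(), m, To.all().withTimestamp(m.getTimestamp()));
        }
        return null; // everything was already emitted via forward()
    }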
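
    To see what difference the per-element timestamp makes for your example, here is a rough plain-Java simulation. It does not use Kafka Streams at all; the Metric stand-in, countPerWindow(), and the "name@windowStart" key format are made up for illustration. It counts the same batch twice: once with every element stamped with the first element's timestamp (what your extractor effectively gives you), and once with each element's own timestamp (what forward() with withTimestamp() would give you).

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PerElementTimestamps {

    // Hypothetical stand-in for the question's Metric class (name and timestamp only).
    public static final class Metric {
        final String name;
        final long timestamp;

        public Metric(String name, long timestamp) {
            this.name = name;
            this.timestamp = timestamp;
        }
    }

    // Counts metrics per "name@windowStart" for tumbling windows of the given size.
    // perElement == false: every element is stamped with the first element's timestamp.
    // perElement == true:  every element is stamped with its own timestamp.
    public static Map<String, Long> countPerWindow(List<Metric> batch, long size, boolean perElement) {
        Map<String, Long> counts = new TreeMap<>();
        if (batch.isEmpty()) {
            return counts;
        }
        long inherited = batch.get(0).timestamp;
        for (Metric m : batch) {
            long ts = perElement ? m.timestamp : inherited;
            long start = ts - (ts % size); // start of the tumbling window containing ts
            counts.merge(m.name + "@" + start, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Metric> batch = List.of(
                new Metric("metric1.count", 1),
                new Metric("metric1.count", 2),
                new Metric("metric1.count", 15));

        System.out.println(countPerWindow(batch, 10, false)); // {metric1.count@0=3}
        System.out.println(countPerWindow(batch, 10, true));  // {metric1.count@0=2, metric1.count@10=1}
    }
}
```

    The first line matches your current result (3 in the first window) and the second matches your expected result (2 and 1). In the real topology the windowing is done by windowedBy(); the sketch only mimics the bucketing to show the effect of the timestamps.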