apache-kafka apache-kafka-streams

How to obtain the previous message from a topic when processing it with Kafka Streams


I've been implementing Kafka producers/consumers and streams using the data that the covid19api holds.

I'm trying to extract the daily cases from, for instance, the endpoint https://api.covid19api.com/all. However, this endpoint, like the rest of the API, returns all the data since the beginning of the disease (confirmed, death and recovered cases) as cumulative totals rather than daily figures, which is what I'm ultimately trying to obtain.

Using transformValues and a StoreBuilder (as was recommended here) didn't work for me either, since the scenarios are different. I've implemented something different using transformValues, but every time the previous value retrieved is the first one in the topic and not the actual previous one:

@Override
public String transform(Long key, String value) {
    String prevValue = state.get(key);
    log.info("{} => {}", key, value) ;
    if (prevValue != null) {
        Covid19StatDto prevDto = new Gson().fromJson(prevValue, Covid19StatDto.class);
        Covid19StatDto dto = new Gson().fromJson(value, Covid19StatDto.class);

        log.info("Current value {} previous {} ", dto.toString(), prevDto.toString());

        dto.setConfirmed(dto.getConfirmed() - prevDto.getConfirmed());

        String newDto = new Gson().toJson(dto);
        log.info("New value {}", newDto);
        return newDto;
    } else {
        state.put(key, value);
    }
    return value;
}

How do I obtain the previous message from a topic when I'm processing it with a stream? Any help or suggestion will be highly appreciated.

Regards.


Solution

  • Isn't the problem simply that you only ever store the first value you receive for each key in the state store? If every subsequent message needs the message before it, you must write the current message to the state store on every call, not just the first one, for example:

    @Override
    public String transform(Long key, String value) {
        String prevValue = state.get(key);
        log.info("{} => {}", key, value);

        // Always update the state store, on every message, so that the next
        // message for this key sees the current (still cumulative) value:
        state.put(key, value);

        if (prevValue == null) {
            // First message for this key: there is nothing to subtract yet.
            return value;
        }

        Covid19StatDto prevDto = new Gson().fromJson(prevValue, Covid19StatDto.class);
        Covid19StatDto dto = new Gson().fromJson(value, Covid19StatDto.class);

        log.info("Current value {} previous {} ", dto.toString(), prevDto.toString());

        // Daily figure = current cumulative total - previous cumulative total.
        dto.setConfirmed(dto.getConfirmed() - prevDto.getConfirmed());

        String newDto = new Gson().toJson(dto);
        log.info("New value {}", newDto);
        return newDto;
    }
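
    To check the "always store the current value" logic without a running topology, here is a minimal sketch in plain Java. It is not Kafka Streams code: the class name DailyCasesSketch is hypothetical, a HashMap stands in for the state store, and the messages are assumed to be bare cumulative counts rather than the JSON DTOs from the question.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DailyCasesSketch {
    // Hypothetical stand-in for the state store: key -> last cumulative count seen.
    private final Map<Long, Long> state = new HashMap<>();

    /** Returns the daily figure for this key and remembers the current cumulative count. */
    public long transform(Long key, long cumulative) {
        Long previous = state.get(key);
        // Store the raw cumulative value on every call, so the next call sees it as "previous".
        state.put(key, cumulative);
        // First message for a key has nothing to subtract, so it is passed through unchanged.
        return previous == null ? cumulative : cumulative - previous;
    }

    public static void main(String[] args) {
        DailyCasesSketch sketch = new DailyCasesSketch();
        long[] cumulativeCounts = {5, 8, 15, 15};
        List<Long> daily = new ArrayList<>();
        for (long c : cumulativeCounts) {
            daily.add(sketch.transform(1L, c));
        }
        System.out.println(daily); // prints [5, 3, 7, 0]
    }
}
```

    Note that the value written to the store is the original cumulative one, not the computed difference; storing the difference would make the next subtraction wrong.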