java, apache-kafka, apache-kafka-streams, event-driven-design

How to version custom Serde objects in Kafka Streams using the Processor API during a rolling upgrade


How can I handle multiple versions of UserDetailDto while processing it from Topic-A to Topic-B with Kafka Streams using the Processor API?

Existing instances/replicas of the aggregation service should not be impacted, and a Kubernetes rolling upgrade should not be hampered either (i.e., old replicas of the aggregation service must be able to handle the modified/new version of UserDetailDto).

For example, change the userID data type from Integer to String and remove the userPhone field from the UserDetailDto below:

import com.fasterxml.jackson.annotation.JsonProperty;
import javax.validation.constraints.NotNull;

class UserDetailDto {

    @JsonProperty("userID")
    @NotNull(message = "UserId can not be null")
    private int userID;

    @JsonProperty("userPhone")
    @NotNull(message = "User Phone number can not be null")
    private int userPhone;
}
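
For illustration, the modified DTO after such a change might look roughly like this (a sketch only; the class keeps its name, userID becomes a String, and userPhone is dropped):

import com.fasterxml.jackson.annotation.JsonProperty;
import javax.validation.constraints.NotNull;

// Sketch of the new version of the DTO: userID changed from int to String, userPhone removed
class UserDetailDto {

    @JsonProperty("userID")
    @NotNull(message = "UserId can not be null")
    private String userID;
}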

After updating UserDetailDto, old replicas/instances of the aggregation service should be able to handle both the new and the old UserDetailDto, and new replicas/instances should likewise be able to handle both versions.

My processor, which uses a custom Serde for UserDetailDto, is given below:

import java.util.Objects;

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class AggregationProcessor implements Processor<String, UserDetailDto, String, UserDetailDto> {

    private ProcessorContext<String, UserDetailDto> processorContext;

    public AggregationProcessor() {
        super();
    }

    @Override
    public void init(ProcessorContext<String, UserDetailDto> processorContext) {
        System.out.println("Inside Aggregation Processor init method.");
        Objects.requireNonNull(processorContext, "Processor context should not be null or empty.");

        this.processorContext = processorContext;
    }

    @Override
    public void process(Record<String, UserDetailDto> message) {
        System.out.println("Inside AggregationProcessor init method - to initialize all the resources for the data processing.");
        Objects.requireNonNull(processorContext, "Processor context should not be null or empty.");

        // Forwarding the message as is, without any modification
        processorContext.forward(message);
    }

    @Override
    public void close() {
        System.out.println("Inside AggregationProcessor close method.");
    }
}

The topology is given below:

Topology topology = new Topology();

// Adding the sourceNode of the application
topology = topology.addSource(Topology.AutoOffsetReset.EARLIEST,
        sourceName,
        new UsePartitionTimeOnInvalidTimestamp(),
        KEY_SERDE.deserializer(),
        USER_DETAIL_DTO.deserializer(),
        sourceTopic);

// Adding the processorNode of the application
topology = topology.addProcessor(
        processorName,
        AggregationProcessor::new,
        parentNames);

// Adding sinkNode of the application
topology = topology.addSink(sinkName,
        destinationTopic,
        KEY_SERDE.serializer(),
        USER_DETAIL_DTO.serializer(),
        parentNames);
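
The custom USER_DETAIL_DTO Serde referenced above is not shown; for context, a minimal sketch of how such a Jackson-based Serde could be wired (the class name and mapper configuration are illustrative, not the actual implementation):

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

public class UserDetailDtoSerde {

    // Illustrative mapper setup: ignoring unknown properties lets an old consumer
    // skip fields that a newer producer has added to the payload.
    private static final ObjectMapper MAPPER = new ObjectMapper()
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    public static Serde<UserDetailDto> serde() {
        Serializer<UserDetailDto> serializer = (topic, dto) -> {
            try {
                return dto == null ? null : MAPPER.writeValueAsBytes(dto);
            } catch (Exception e) {
                throw new RuntimeException("Failed to serialize UserDetailDto", e);
            }
        };
        Deserializer<UserDetailDto> deserializer = (topic, bytes) -> {
            try {
                return bytes == null ? null : MAPPER.readValue(bytes, UserDetailDto.class);
            } catch (Exception e) {
                throw new RuntimeException("Failed to deserialize UserDetailDto", e);
            }
        };
        return Serdes.serdeFrom(serializer, deserializer);
    }
}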

Please provide all possible suggestions. Thanks!


Solution

  • In Kafka Streams applications that use the Processor API, data feed / Serde validation can be performed in the consumer application inside the init() or process() methods of the org.apache.kafka.streams.processor.api.Processor implementation; this is one of the standard approaches during a rolling-upgrade scenario.

    Data feed validation can be achieved in the Processor API as outlined below, and this support should be provided for two consecutive versions so that the rollback scenario is covered as well (see the sketch after this list).

    1. Old Producer to New Consumer

    In the new consumer, mark the old field as deprecated by removing its validation from the Processor API while still supporting it, and add the new field with validation. In this case the data feed cannot be processed by the new consumer and is consumed only by the old consumer, as it still persists in the source topic; this way, in-flight records are processed only by old consumers during the rolling-upgrade scenario.

    2. New Producer to Old Consumer

    In the old consumer, the newly added field in the stream data feed is ignored, and a dummy value for the deprecated field can be validated in the Processor API.

    3. New Producer to New Consumer and Old Producer to Old Consumer

    No impact.
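
    A minimal sketch of such validation inside process(), assuming the new-consumer side and a hypothetical getUserId() getter for the new String field on UserDetailDto:

@Override
public void process(Record<String, UserDetailDto> message) {
    UserDetailDto dto = message.value();

    // New consumer: validate only the new field; the deprecated field (userPhone)
    // is tolerated but no longer validated here. getUserId() is an assumed getter.
    if (dto == null || dto.getUserId() == null) {
        System.out.println("Skipping record without a valid userID, key: " + message.key());
        return; // such records are handled by old consumers still running during the upgrade
    }

    processorContext.forward(message);
}

    Old replicas keep the previous validation, so during the rolling upgrade each replica forwards only the records it can fully validate.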