apache-kafka apache-kafka-streams

Rollback mechanism in the Kafka Streams Processor API?


I am using the Kafka Streams Processor API (not the DSL).

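import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.state.KeyValueStore;

public class StreamProcessor implements Processor<String, String>
{

    public ProcessorContext context;
    // maps each key to a "|"-separated list of sink names
    private KeyValueStore<String, String> stateStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context)
    {
        this.context = context;
        context.commit();
        // statestore initialized with key,value
        // "topic-store" is a placeholder name; use the name the store was registered under
        stateStore = (KeyValueStore<String, String>) context.getStateStore("topic-store");
    }

    @Override
    public void process(String key, String val)
    {
        try
        {
            // split() takes a regex, so the pipe has to be escaped
            String[] topicList = stateStore.get(key).split("\\|");
            for (String topic : topicList)
            {
                // To.child() expects the name of the child (sink) node
                context.forward(key, val, To.child(topic));
            } // forward same message to list of topics (1..n topics); roll back if the write to some topics failed?
        }
        catch (RuntimeException e)
        {
            // this is where the rollback logic asked about below would go
            throw e;
        }
    }

    @Override
    public void close() {}
}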

Scenario: we read data from a source topic, and the stream processor writes it to multiple sink topics (topicList above).

Question: How can I implement a rollback mechanism with the Kafka Streams Processor API when one or more of the topics in topicList fail to receive the message?

My understanding is that the Processor API can roll back each record it failed to send. Can a rollback also be done for an entire batch of failed messages? Since the process method of the Processor interface is called per record rather than per batch, I would surmise that it can only be done per record. Is this assumption correct? If not, please suggest how to achieve per-record and per-batch rollbacks for failed topics using the Processor API.


Solution

  • You would need to implement it yourself. For example, you could use two stores: a main store and a "buffer" store. First, update only the buffer store. Second, call context.forward() and make sure all writes are in the output topics. Afterward, merge the buffer store into the main store.

    If you need to roll back, you drop the content of the buffer store.
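
    The two-store idea above could be sketched roughly as follows. This is plain Java, not Kafka Streams code: the two HashMaps stand in for the main and buffer state stores, and the forwarder callback is a hypothetical stand-in for context.forward(). The sketch also assumes that a failed write shows up as an exception from the forwarder. That is an assumption made for illustration only, since Kafka Streams generally sends to sink topics asynchronously, and how you detect a failed write is part of what you would have to implement yourself.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Illustrative only: class, field, and method names are hypothetical.
class BufferedUpdate {
    // Stand-ins for the "main" and "buffer" state stores.
    private final Map<String, String> mainStore = new HashMap<>();
    private final Map<String, String> bufferStore = new HashMap<>();
    // Stand-in for context.forward(); assumed to throw if a write fails.
    private final BiConsumer<String, String> forwarder;

    BufferedUpdate(BiConsumer<String, String> forwarder) {
        this.forwarder = forwarder;
    }

    // Returns true if the update was merged, false if it was rolled back.
    boolean process(String key, String value) {
        // 1. Stage the update in the buffer store only.
        bufferStore.put(key, value);
        try {
            // 2. Forward the record to the output side.
            forwarder.accept(key, value);
        } catch (RuntimeException e) {
            // Rollback: drop the staged content, leave the main store untouched.
            bufferStore.clear();
            return false;
        }
        // 3. Forwarding succeeded: merge the buffer into the main store.
        mainStore.putAll(bufferStore);
        bufferStore.clear();
        return true;
    }

    String get(String key) {
        return mainStore.get(key);
    }
}
```

    With a forwarder that always succeeds, process("a", "1") merges the value so that get("a") returns "1". With a forwarder that throws, the buffer is dropped and the main store never sees the value.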