I have a topology with a source reading from a topic, a processor, and a sink writing to another topic:
StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("store"),
Serdes.String(),
Serdes.String());
Topology topology = new Topology();
topology.addSource("incoming", Serdes.String().deserializer(), Serdes.String().deserializer(), "topic");
topology.addProcessor("incoming_first", () -> new MyProcessor(), "incoming");
topology.addStateStore(storeBuilder, "incoming_first");
topology.addSink("sink", "sink", "incoming_first");
public class MyProcessor implements Processor<String, String> {
private ProcessorContext context;
private KeyValueStore<String, String> stateStore;
@Override
public void init(ProcessorContext context) {
this.context = context;
this.stateStore = (KeyValueStore<String, String>) context.getStateStore("store");
}
@Override
public void process(String key, String value) {
stateStore.put(key, value);
....
throw new RuntimeException();
....
context.forward(key, value); // forward to sink
}
@Override
public void close() {
}
}
My question is how to handle the case where an exception occurs in the processor after the write to the state store. Does Kafka have an error-handling mechanism that rolls back the state store so the message can be reprocessed or forwarded to an error topic?
Currently, without any handling, my application dies entirely and I need to restart it. If I add a try-catch instead, the message is treated as successfully processed: my state store stays updated and the update is sent to the changelog topic.
Do I need a rollback mechanism for the state store?
https://issues.apache.org/jira/browse/KAFKA-7192 suggests that with EOS the state store should not keep such writes if an exception occurs, but this covers only the case where my entire application dies.
Thanks in advance!
For any exception that is thrown from a Processor, the corresponding thread will always die. The only way to prevent this is to catch all exceptions and handle them accordingly (whatever the right way to handle them is for your application).
If a thread dies and you restart your application to recover the thread, whether the store is rolled back depends on your configuration. By default, the store is not rolled back. Only if you enable exactly-once semantics by setting the configuration parameter processing.guarantee="exactly_once" is the store rolled back on restart.
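As a rough illustration of the configuration mentioned above, here is a minimal sketch that only builds the properties object you would pass to the KafkaStreams constructor. The application id and broker address are placeholder assumptions, and plain string keys are used so the snippet runs without the Kafka client library on the classpath.

```java
import java.util.Properties;

class EosConfigSketch {
    // Builds the properties that would be handed to `new KafkaStreams(topology, props)`.
    static Properties buildConfig() {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");    // hypothetical application id
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address
        // Enables exactly-once semantics; the default is "at_least_once".
        props.put("processing.guarantee", "exactly_once");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildConfig().getProperty("processing.guarantee"));
    }
}
```

In a real application you would probably use the constants from StreamsConfig instead of string literals for these keys.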
If you catch an exception in your Processor code and your business logic requires rolling back the store, you need to implement this yourself: first read the old values from the store, then update the store, and in case of an exception put the old values back into the store to overwrite/undo all your writes.
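The manual undo described above could look roughly like the following sketch. To keep it runnable without Kafka on the classpath, it uses a hypothetical Store interface that mirrors only the get/put/delete methods of KeyValueStore<String, String>, backed by a HashMap; the names RollbackSketch, MapStore, and processWithRollback are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class RollbackSketch {
    // Hypothetical stand-in for the get/put/delete subset of KeyValueStore<String, String>.
    interface Store {
        String get(String key);
        void put(String key, String value);
        void delete(String key);
    }

    // In-memory implementation used only to make the sketch runnable.
    static class MapStore implements Store {
        private final Map<String, String> data = new HashMap<>();
        public String get(String key) { return data.get(key); }
        public void put(String key, String value) { data.put(key, value); }
        public void delete(String key) { data.remove(key); }
    }

    // Writes the new value, runs the remaining logic, and restores the old
    // value if that logic throws. Returns true on success, false after an undo.
    static boolean processWithRollback(Store store, String key, String value, Runnable businessLogic) {
        String oldValue = store.get(key); // null if the key was absent
        try {
            store.put(key, value);
            businessLogic.run();          // may throw
            return true;
        } catch (RuntimeException e) {
            if (oldValue == null) {
                store.delete(key);        // key did not exist before: remove it
            } else {
                store.put(key, oldValue); // overwrite with the previous value
            }
            return false;                 // caller may e.g. forward to an error topic
        }
    }

    public static void main(String[] args) {
        MapStore store = new MapStore();
        store.put("k", "old");
        processWithRollback(store, "k", "new", () -> { throw new RuntimeException("boom"); });
        System.out.println(store.get("k"));
    }
}
```

Note that the undo is itself an ordinary write to the store, so with a real state store it would be recorded like any other update rather than erasing the earlier one.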