I am using Spring with Kafka Streams, and I am trying to simulate an error scenario. As you can see from the code below, if I pass 0 as the value to one of the input topics, I get a divide-by-zero exception.
I have wrapped the error-causing section in a try-catch block, assuming that the exception would be caught by the catch block.
@Component
@Slf4j
public class Topology {

    private static final Serde<String> STRING_SERDE = Serdes.String();

    @Autowired
    void KStreamsTopology(StreamsBuilder streamsBuilder) {
        KStream<String, String> messageStream = streamsBuilder
                .stream(List.of("quickstart-events", "sample-input"),
                        Consumed.with(STRING_SERDE, STRING_SERDE).withName("my-inputs"));
        try {
            messageStream
                    .peek((k, v) -> System.out.println("Input Key: " + k + ", value: " + v))
                    .mapValues(a -> getAnInt(Integer.parseInt(a)))
                    .foreach((k, v) -> System.out.println("output Key: " + k + ", Output value: " + v));
        } catch (Exception exception) {
            log.error("Exceptions occurred in my Topology:: " + exception.getMessage());
        }
    }

    private int getAnInt(Integer value) {
        return 10 / value;
    }
}
But the error is not caught. Instead, the exception below is thrown and the application stops.
2022-08-10 23:26:27.218 INFO 40343 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [kafka-streams-poc-de132014-fb30-4c10-b5d5-7b190c3e38db-StreamThread-1] Shutdown complete
Exception in thread "kafka-streams-poc-de132014-fb30-4c10-b5d5-7b190c3e38db-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=my-inputs, topic=sample-input, partition=0, offset=10, stacktrace=java.lang.ArithmeticException: / by zero
at com.techopact.kafkastreamspoc.topology.Topology.getAnInt(Topology.java:37)
at com.techopact.kafkastreamspoc.topology.Topology.lambda$KStreamsTopology$1(Topology.java:28)
at org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$2(AbstractStream.java:111)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:253)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:232)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:191)
at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:253)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:232)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:191)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:809)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1296)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:784)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:604)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:576)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:758)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1296)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:784)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:604)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:576)
Caused by: java.lang.ArithmeticException: / by zero
at com.techopact.kafkastreamspoc.topology.Topology.getAnInt(Topology.java:37)
at com.techopact.kafkastreamspoc.topology.Topology.lambda$KStreamsTopology$1(Topology.java:28)
at org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$2(AbstractStream.java:111)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:253)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:232)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:191)
at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:253)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:232)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:191)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:809)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)
... 4 more
2022-08-10 23:31:26.921 INFO 40343 --- [90c3e38db-admin] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=kafka-streams-poc-de132014-fb30-4c10-b5d5-7b190c3e38db-admin] Node -1 disconnected.
2022-08-10 23:36:27.007 INFO 40343 --- [90c3e38db-admin] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=kafka-streams-poc-de132014-fb30-4c10-b5d5-7b190c3e38db-admin] Node 0 disconnected.
I am aware that this is not the right way to handle exceptions in a Kafka Streams application. I see a nice detailed article here. But I am still curious as to why the exception is not caught by the catch block.
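(For reference, the direction I expect to take instead is a StreamsUncaughtExceptionHandler. Below is a minimal, untested sketch of how that might look, assuming spring-kafka's StreamsBuilderFactoryBean with its setStreamsUncaughtExceptionHandler method, available in recent versions, is wired the way it is in my setup.)

import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;

@Configuration
public class StreamsErrorHandlingConfig {

    // Sketch only: register a handler so a processing exception replaces the failed
    // stream thread instead of shutting the whole client down.
    @Autowired
    void configureErrorHandler(StreamsBuilderFactoryBean factoryBean) {
        factoryBean.setStreamsUncaughtExceptionHandler(exception -> {
            // log/inspect the exception here as needed
            return StreamThreadExceptionResponse.REPLACE_THREAD;
        });
    }
}

The handler has to be registered before the factory bean starts the KafkaStreams instance, which the @Autowired wiring above should satisfy.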
I added some print statements to the code.
@Component
@Slf4j
public class Topology {

    private static final Serde<String> STRING_SERDE = Serdes.String();

    @Autowired
    void KStreamsTopology(StreamsBuilder streamsBuilder) {
        KStream<String, String> messageStream = streamsBuilder
                .stream(List.of("sample-input"),
                        Consumed.with(STRING_SERDE, STRING_SERDE).withName("my-inputs"));
        try {
            System.out.println("Start.... " + Thread.currentThread().getName());
            messageStream
                    .peek((k, v) -> System.out.println("Input Key: " + k + ", value: " + v))
                    .mapValues(a -> getAnInt(Integer.parseInt(a)))
                    .foreach((k, v) -> System.out.println("output Key: " + k + ", Output value: " + v));
        } catch (Exception exception) {
            log.error("Exceptions occurred in my Topology:: " + exception.getMessage());
        }
    }

    private int getAnInt(Integer value) {
        System.out.println("Inside getInt:: " + Thread.currentThread().getName());
        return 10 / value;
    }
}
I saw these lines in the output.
Start.... main
Inside getInt:: kafka-streams-poc-45acf83e-411f-4a21-833e-c7d9ccf1fe90-StreamThread-1
As Artem Bilan pointed out, the Kafka Streams processor nodes run on a thread other than the main thread.
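So the try/catch only covers building the topology, which completes on the main thread before a single record is processed; the lambdas run later on the stream thread, where the catch block no longer exists. A stripped-down, Kafka-free sketch of the same effect (illustrative only):

public class ThreadCatchDemo {

    public static void main(String[] args) {
        int zero = 0;
        try {
            // The lambda is only *defined* inside the try block; nothing runs yet.
            Runnable work = () -> System.out.println(10 / zero);
            // The division happens later, on the worker thread, after the
            // try block on the main thread has already completed.
            new Thread(work, "worker-thread").start();
        } catch (Exception e) {
            // Never reached: this catch only covers creating and starting the
            // thread, not the code that eventually runs inside it.
            System.out.println("Caught: " + e.getMessage());
        }
        // The worker thread dies with an uncaught ArithmeticException,
        // much like the StreamThread in the log above.
    }
}

The same reasoning suggests why a try/catch placed inside the mapValues lambda (or a registered handler, as sketched earlier) would see the exception: that code actually executes on the stream thread.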