Kafka Streams: Exception not caught by try-catch block


I am using Spring with Kafka Streams and trying to simulate an error scenario. As the code shows, if I send the value 0 to one of the input topics, I get a divide-by-zero exception (ArithmeticException).

I have wrapped the error-causing section in a try-catch block, assuming that the catch block would catch the error.

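import java.util.List;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class Topology {

    private static final Serde<String> STRING_SERDE = Serdes.String();

    @Autowired
    void KStreamsTopology(StreamsBuilder streamsBuilder) {
        KStream<String, String> messageStream = streamsBuilder
                .stream(List.of("quickstart-events", "sample-input"), Consumed.with(STRING_SERDE, STRING_SERDE).withName("my-inputs"));
        try {
            messageStream
                    .peek((k, v) -> System.out.println("Input Key: " + k + ", value: " + v))
                    // Throws ArithmeticException when the input value is "0".
                    .mapValues(a -> getAnInt(Integer.parseInt(a)))
                    .foreach((k, v) -> System.out.println("output Key: " + k + ", Output value: " + v));
        } catch (Exception exception) {
            log.error("Exceptions occurred in my Topology:: " + exception.getMessage());
        }

    }

    private int getAnInt(Integer value) {
        return 10 / value;
    }
}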

But the error is not caught. Instead, the exception below is thrown and the application stops.

2022-08-10 23:26:27.218  INFO 40343 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [kafka-streams-poc-de132014-fb30-4c10-b5d5-7b190c3e38db-StreamThread-1] Shutdown complete
Exception in thread "kafka-streams-poc-de132014-fb30-4c10-b5d5-7b190c3e38db-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=my-inputs, topic=sample-input, partition=0, offset=10, stacktrace=java.lang.ArithmeticException: / by zero
    at com.techopact.kafkastreamspoc.topology.Topology.getAnInt(Topology.java:37)
    at com.techopact.kafkastreamspoc.topology.Topology.lambda$KStreamsTopology$1(Topology.java:28)
    at org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$2(AbstractStream.java:111)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:253)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:232)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:191)
    at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:253)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:232)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:191)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:809)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1296)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:784)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:604)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:576)

    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:758)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1296)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:784)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:604)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:576)
Caused by: java.lang.ArithmeticException: / by zero
    at com.techopact.kafkastreamspoc.topology.Topology.getAnInt(Topology.java:37)
    at com.techopact.kafkastreamspoc.topology.Topology.lambda$KStreamsTopology$1(Topology.java:28)
    at org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$2(AbstractStream.java:111)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:253)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:232)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:191)
    at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:253)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:232)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:191)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:809)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)
    ... 4 more
2022-08-10 23:31:26.921  INFO 40343 --- [90c3e38db-admin] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=kafka-streams-poc-de132014-fb30-4c10-b5d5-7b190c3e38db-admin] Node -1 disconnected.
2022-08-10 23:36:27.007  INFO 40343 --- [90c3e38db-admin] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=kafka-streams-poc-de132014-fb30-4c10-b5d5-7b190c3e38db-admin] Node 0 disconnected.

I am aware that this is not the right way to handle exceptions in a Kafka Streams application. I found a nice, detailed article here. But I am still curious as to why the exception is not caught by the catch block.


Solution

  • I added some print statements to the code.

    @Component
    @Slf4j
    public class Topology {

        private static final Serde<String> STRING_SERDE = Serdes.String();

        @Autowired
        void KStreamsTopology(StreamsBuilder streamsBuilder) {
            KStream<String, String> messageStream = streamsBuilder
                    .stream(List.of("sample-input"),
                            Consumed.with(STRING_SERDE, STRING_SERDE).withName("my-inputs"));
            try {
                // Printed once, by the thread that builds the topology.
                System.out.println("Start.... " + Thread.currentThread().getName());
                messageStream
                        .peek((k, v) -> System.out.println("Input Key: " + k + ", value: " + v))
                        .mapValues(a -> getAnInt(Integer.parseInt(a)))
                        .foreach((k, v) -> System.out.println("output Key: " + k + ", Output value: " + v));
            } catch (Exception exception) {
                log.error("Exceptions occurred in my Topology:: " + exception.getMessage());
            }

        }

        private int getAnInt(Integer value) {
            // Printed for every record, by the thread that processes it.
            System.out.println("Inside getInt:: " + Thread.currentThread().getName());
            return 10 / value;
        }
    }
    

    I saw these lines in the output.

    Start.... main 
    Inside getInt:: kafka-streams-poc-45acf83e-411f-4a21-833e-c7d9ccf1fe90-StreamThread-1
    

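    As Artem Bilan pointed out, Kafka Streams runs the topology's processor nodes on its own stream threads, not on the main thread. The try-catch only wraps the code that defines the topology, which runs once on main at startup. The lambdas passed to peek, mapValues, and foreach are executed later, once per record, on StreamThread-1, so an exception thrown inside them never passes through that catch block.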
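
The same behaviour can be reproduced without Kafka at all. Below is a minimal sketch using only the JDK; the class `DeferredLambdaDemo`, its method names, and the thread name `worker-1` are made up for illustration and stand in for the topology and the stream thread. The first method wraps only the definition of the lambda in a try-catch, as in the question. The second moves the try-catch inside the lambda body, so it runs on the thread that actually executes the lambda.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

// Hypothetical stand-alone demo; it does not use Kafka Streams.
class DeferredLambdaDemo {

    // Same arithmetic as getAnInt in the question: fails for 0.
    static int getAnInt(int value) {
        return 10 / value;
    }

    // try-catch around the code that only defines the lambda.
    static String outerTryCatch() {
        AtomicReference<String> outcome = new AtomicReference<>("nothing thrown");
        Thread worker;
        try {
            // Like mapValues(...): the lambda is stored here, not executed.
            Function<String, Integer> mapper = a -> getAnInt(Integer.parseInt(a));
            worker = new Thread(() -> mapper.apply("0"), "worker-1");
        } catch (ArithmeticException e) {
            // Never reached: nothing in the try block divides by zero.
            return "caught on " + Thread.currentThread().getName();
        }
        // Records where the exception finally surfaces.
        worker.setUncaughtExceptionHandler((t, e) ->
                outcome.set("uncaught " + e.getClass().getSimpleName() + " on " + t.getName()));
        runAndWait(worker);
        return outcome.get();
    }

    // try-catch inside the lambda body, so it runs on the worker thread.
    static String innerTryCatch() {
        AtomicReference<String> outcome = new AtomicReference<>("nothing thrown");
        Function<String, Integer> mapper = a -> {
            try {
                return getAnInt(Integer.parseInt(a));
            } catch (ArithmeticException e) {
                outcome.set("caught on " + Thread.currentThread().getName());
                return -1; // arbitrary fallback value for this demo
            }
        };
        runAndWait(new Thread(() -> mapper.apply("0"), "worker-1"));
        return outcome.get();
    }

    // Starts the thread and blocks until it has finished.
    private static void runAndWait(Thread worker) {
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(outerTryCatch()); // uncaught ArithmeticException on worker-1
        System.out.println(innerTryCatch()); // caught on worker-1
    }
}
```

In the first case the exception escapes to the worker thread's uncaught-exception handler, which is roughly what happens to the stream thread in the question. Kafka Streams has its own hook for that situation, KafkaStreams#setUncaughtExceptionHandler, in addition to catching inside the lambda as in the second case.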