I'm creating a Spring Cloud Stream Kafka Streams application. I have one input topic and one output topic, and I'm trying to apply a key-value transformation to the input topic using the KStream.map function.
The problem is that when the transformed value is null, the operation throws an IllegalArgumentException.
My questions are:
1: What is the reason for the exception, given that the documentation says "Input records with a null key or a null value are ignored"?
2: What are the best practices for handling exceptions in stateless/stateful operations? Is a try/catch around the whole processing plan enough, or should I have a try/catch inside every transformation function (e.g. filter, map, join, reduce)?
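For example, by a try/catch inside every transformation I mean something roughly like this (the Long.parseLong call is only a placeholder for the real transformation logic):

wordsInput.map((key, value) -> {
    try {
        // placeholder transformation; the real logic would go here
        return KeyValue.pair(key, Long.parseLong(value));
    } catch (NumberFormatException e) {
        // log the bad record and emit a sentinel instead of letting the stream thread die
        return KeyValue.pair(key, -1L);
    }
});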
Any thoughts are appreciated.
spring:
  application:
    name: kafka-streams-test
  cloud.stream:
    kafka.streams:
      binder:
        brokers: localhost:9093
        configuration:
          commit.interval.ms: 1000
          security.protocol: SASL_PLAINTEXT
          sasl.mechanism: GSSAPI
          sasl.kerberos.service.name: kafka
        serdeError: logAndContinue
      bindings:
        streams-words-input:
          consumer:
            application-id: Input-Words
        streams-words-output:
          consumer:
            application-id: Output-Words
    bindings:
      streams-words-input:
        destination: streams-words-input
      streams-words-output:
        destination: streams-words-output
@StreamListener
@SendTo("streams-words-output")
public KStream<String, Long> createWords(
        @Input("streams-words-input") final KStream<String, String> wordsInput) {
    return wordsInput
            .map((key, value) -> KeyValue.pair(key, null));
}
java.lang.IllegalArgumentException: Payload must not be null
at org.springframework.util.Assert.notNull(Assert.java:198)
at org.springframework.messaging.support.MessageBuilder.<init>(MessageBuilder.java:57)
at org.springframework.messaging.support.MessageBuilder.withPayload(MessageBuilder.java:179)
at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsMessageConversionDelegate.lambda$serializeOnOutbound$0(KafkaStreamsMessageConversionDelegate.java:86)
at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsMessageConversionDelegate$$Lambda$743/1725151361.apply(Unknown Source)
at org.apache.kafka.streams.kstream.internals.AbstractStream$2.apply(AbstractStream.java:87)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:40)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamPassThrough$KStreamPassThroughProcessor.process(KStreamPassThrough.java:33)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:122)
at org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:48)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:302)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:409)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:964)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:832)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
The exception you see comes from a long-standing rule in Spring Messaging: there is no such thing as a Message with a null payload. In other words, there is no message to send if there is nothing to communicate.
That said, there is clearly an issue with how the binder handles this condition for KStream, so I'd suggest raising an issue against the Kafka binder. Meanwhile, you can easily add a filter operation to your pipeline to filter out the nulls.
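For illustration, here is a minimal sketch of that approach applied to the listener from the question (same binding names as above): the map still produces null values, and the filter drops them before the binder tries to build an outbound Message.

@StreamListener
@SendTo("streams-words-output")
public KStream<String, Long> createWords(
        @Input("streams-words-input") final KStream<String, String> wordsInput) {
    return wordsInput
            // the (Long) cast keeps the value type as Long when map and filter are chained
            .map((key, value) -> KeyValue.pair(key, (Long) null))
            // drop null values so nothing with a null payload reaches the output binding
            .filter((key, value) -> value != null);
}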