I am using Spring Cloud Stream.
I was wondering what the difference is between KafkaHeaders.RECEIVED_MESSAGE_KEY and KafkaHeaders.MESSAGE_KEY.
I have two projects. The first produces messages using KafkaHeaders.MESSAGE_KEY as a header:
public void sendResponse(ThirdPartyResponse thirdPartyResponse) {
    log.info("Sending response of type 'completed' [{}].", thirdPartyResponse);
    integrations.send(
        withPayload(ApplicationSubmissionSuccessPayload.success(thirdPartyResponse))
            .setHeader(KafkaHeaders.MESSAGE_KEY, thirdPartyResponse.getData().getApplicationId())
            .build());
}
and the second one consumes using KafkaHeaders.RECEIVED_MESSAGE_KEY:
@StreamListener(target = "ofaOut")
public void receive(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String applicationId, @Payload String payload) throws JsonProcessingException {
    ...
}
However, I get this error:
2020-03-23 16:13:27.924 ERROR 1 --- [container-0-C-1] o.s.integration.handler.LoggingHandler :
org.springframework.messaging.MessageHandlingException: Missing header 'kafka_receivedMessageKey' for method parameter type [class java.lang.String], failedMessage=GenericMessage [payload=byte[739],
headers={kafka_offset=285, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@67c19b7c, deliveryAttempt=3, kafka_timestampType=CREATE_TIME,
kafka_receivedMessageKey=null, kafka_receivedPartitionId=0,
contentType=application/json, kafka_receivedTopic=com.product.foo.ofa.out, kafka_receivedTimestamp=1584715870225, kafka_groupId=aop-foo-kyc}]
It's missing the header:
Missing header 'kafka_receivedMessageKey'
How can I fix it?
The RECEIVED... header is set on inbound messages; the other one (MESSAGE_KEY) is for the application to specify the key value for outbound messages.
They are different to avoid accidental propagation when an application receives a message, does some work, and re-publishes the message to, say, another topic.
When using Spring Integration, headers are automatically copied as a message traverses through the flow.
The outbound message mapper does not map the RECEIVED... headers, so they don't appear in the ProducerRecord.
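The propagation behaviour described above can be sketched in plain Java, without Spring. This is only a simplified model, not the framework's actual mapper: the class name, the recordKey helper, and the sample key values are hypothetical, and the two header names are the string values of the KafkaHeaders constants.

```java
import java.util.HashMap;
import java.util.Map;

public class HeaderPropagationSketch {

    // String values of KafkaHeaders.MESSAGE_KEY and KafkaHeaders.RECEIVED_MESSAGE_KEY.
    static final String MESSAGE_KEY = "kafka_messageKey";
    static final String RECEIVED_MESSAGE_KEY = "kafka_receivedMessageKey";

    // Hypothetical stand-in for the outbound side: only the outbound header
    // is consulted when choosing the key of the record to publish.
    static Object recordKey(Map<String, Object> headers) {
        return headers.get(MESSAGE_KEY);
    }

    public static void main(String[] args) {
        // Headers as they might look on a consumed message.
        Map<String, Object> inbound = new HashMap<>();
        inbound.put(RECEIVED_MESSAGE_KEY, "app-123");

        // Headers are copied as the message moves through the flow.
        Map<String, Object> outbound = new HashMap<>(inbound);

        // The received key does not leak into the outgoing record.
        System.out.println("key without explicit header: " + recordKey(outbound));

        // The application opts in by setting the outbound header itself.
        outbound.put(MESSAGE_KEY, inbound.get(RECEIVED_MESSAGE_KEY));
        System.out.println("key with explicit header: " + recordKey(outbound));
    }
}
```

In other words, an application that wants to re-publish with the same key has to copy the received key into the outbound header on purpose.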
... kafka_receivedMessageKey=null ...
This means the key was null on the inbound record.
To receive null keys, use:
@Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false)
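To illustrate why the required flag matters, here is a rough plain-Java model of how a header parameter behaves when the header value is null. It is a sketch under assumptions, not Spring's resolver: the class, the resolveHeader helper, and the use of IllegalStateException in place of MessageHandlingException are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class OptionalHeaderSketch {

    // Hypothetical model of header resolution: a null value counts as
    // "missing" and is rejected only when the parameter is required.
    static String resolveHeader(Map<String, Object> headers, String name, boolean required) {
        Object value = headers.get(name);
        if (value == null && required) {
            throw new IllegalStateException("Missing header '" + name + "'");
        }
        return value == null ? null : value.toString();
    }

    public static void main(String[] args) {
        // A record that was produced without a key.
        Map<String, Object> headers = new HashMap<>();
        headers.put("kafka_receivedMessageKey", null);

        // required = false: the listener simply receives null.
        System.out.println(resolveHeader(headers, "kafka_receivedMessageKey", false));

        // required = true (the default): resolution fails before the listener runs.
        try {
            resolveHeader(headers, "kafka_receivedMessageKey", true);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With required = false, the listener body has to handle a null applicationId itself.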