I am using Spring Cloud with Apache Kafka. The messages sent to a Kafka topic usually contain a key as well. How can I set that key?
Let's say I produce messages like this:
@StreamListener(CashflowSink.T1_CASHFLOW_IN)
@SendTo(CashflowSink.T2_CASHFLOW_OUT)
public synchronized Cashflow receive1(String message) {
    System.out.println("******************");
    System.out.println("At Sink1");
    System.out.println("******************");
    System.out.println("Received message " + message);
    String arr[] = message.split(";");
    if (arr[0].equalsIgnoreCase("Cashflow")) {
        Cashflow cf = new Cashflow();
        cf.setContractId(Integer.parseInt(arr[1]));
        cf.setDate(arr[2]);
        cf.setAmount(Float.parseFloat(arr[3]));
        return cf;
    }
    return null;
}
Later I want to join cashflows with other topics. For joins, I need a key. Let's say the key is the contractId of the cashflow. How can I set that key in the message?
EDIT 1
Trying to incorporate the code from Gary, I came up with:
//@Bean //--> leads to: Parameter 0 of method runner in tki.bigdata.steams.CashflowService required a single bean, but 10 were found:
@Qualifier("t2_cashflow_in") // is this a correct way to get a handle to the channel?
public ApplicationRunner runner(MessageChannel output) {
    ((AbstractMessageChannel) output).addInterceptor(0, new ChannelInterceptor() {
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            return MessageBuilder.fromMessage(message)
                    .setHeader(KafkaHeaders.MESSAGE_KEY, ((Cashflow) message.getPayload()).getContractId())
                    .build();
        }
    });
    return args -> {
    };
}

@StreamListener(CashflowSink.T2_CASHFLOW_IN)
public synchronized void receive2(Cashflow cashflow, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) byte[] key) {
    System.out.println("******************");
    System.out.println("At Sink2");
    System.out.println("******************");
    System.out.println("Received cashflow " + cashflow);
    System.out.println(cashflow + ", key:" + new String(key));
}
The @Bean annotation gave me an error, probably because there are multiple channels. I followed a hint in the error message and used the @Qualifier annotation, but I am not sure whether this does what I want to achieve.
When I execute that code, I get errors like the following, so I assume the interception did not work?
2019-06-20 20:56:02,557 ERROR [Ljava.lang.String;@4195105b.container-0-C-1 o.s.k.l.LoggingErrorHandler:37 - Error while processing: ConsumerRecord(topic = tier2.cashflow, partition = 0, offset = 11, CreateTime = 1561056959426, serialized key size = -1, serialized value size = 63, headers = RecordHeaders(headers = [RecordHeader(key = deliveryAttempt, value = [49]), RecordHeader(key = contentType, value = [34, 97, 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 47, 106, 115, 111, 110, 34]), RecordHeader(key = spring_json_header_types, value = [123, 34, 100, 101, 108, 105, 118, 101, 114, 121, 65, 116, 116, 101, 109, 112, 116, 34, 58, 34, 106, 97, 118, 97, 46, 117, 116, 105, 108, 46, 99, 111, 110, 99, 117, 114, 114, 101, 110, 116, 46, 97, 116, 111, 109, 105, 99, 46, 65, 116, 111, 109, 105, 99, 73, 110, 116, 101, 103, 101, 114, 34, 44, 34, 99, 111, 110, 116, 101, 110, 116, 84, 121, 112, 101, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 125])], isReadOnly = false), key = null, value = [B@5884f4e2)
org.springframework.messaging.MessageHandlingException: Missing header 'kafka_receivedMessageKey' for method parameter type [class [B]
at org.springframework.messaging.handler.annotation.support.HeaderMethodArgumentResolver.handleMissingValue(HeaderMethodArgumentResolver.java:105)
at org.springframework.messaging.handler.annotation.support.AbstractNamedValueMethodArgumentResolver.resolveArgument(AbstractNamedValueMethodArgumentResolver.java:106)
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:148)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116)
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:176)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:369)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$400(KafkaMessageDrivenChannelAdapter.java:74)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:431)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:402)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1263)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1256)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1217)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1198)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1118)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:933)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:749)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:698)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:745)
In case it contains more useful information, here is the whole class:
@EnableBinding(CashflowService.CashflowSink.class)
public class CashflowService {

    @StreamListener(CashflowSink.T1_CASHFLOW_IN)
    @SendTo(CashflowSink.T2_CASHFLOW_OUT)
    public synchronized Cashflow receive1(String message) {
        System.out.println("******************");
        System.out.println("At Sink1");
        System.out.println("******************");
        System.out.println("Received message " + message);
        String arr[] = message.split(";");
        if (arr[0].equalsIgnoreCase("Cashflow")) {
            Cashflow cf = new Cashflow();
            cf.setContractId(Integer.parseInt(arr[1]));
            cf.setDate(arr[2]);
            cf.setAmount(Float.parseFloat(arr[3]));
            return cf;
        }
        return null;
    }

    // @Bean //--> leads to: Parameter 0 of method runner in
    // tki.bigdata.steams.CashflowService required a single bean, but 10 were
    // found:
    @Qualifier("t2_cashflow_in") // is this a correct way to get a handle to the
                                 // channel?
    public ApplicationRunner runner(MessageChannel output) {
        ((AbstractMessageChannel) output).addInterceptor(0, new ChannelInterceptor() {
            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                return MessageBuilder.fromMessage(message)
                        .setHeader(KafkaHeaders.MESSAGE_KEY, ((Cashflow) message.getPayload()).getContractId()).build();
            }
        });
        return args -> {
        };
    }

    @StreamListener(CashflowSink.T2_CASHFLOW_IN)
    public synchronized void receive2(Cashflow cashflow, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) byte[] key) {
        System.out.println("******************");
        System.out.println("At Sink2");
        System.out.println("******************");
        System.out.println("Received cashflow " + cashflow);
        System.out.println(cashflow + ", key:" + new String(key));
    }

    public interface CashflowSink {
        String T1_CASHFLOW_IN = "t1_cashflow_in";
        String T2_CASHFLOW_IN = "t2_cashflow_in";
        String T1_CASHFLOW_OUT = "t1_cashflow_out";
        String T2_CASHFLOW_OUT = "t2_cashflow_out";

        @Input(T1_CASHFLOW_IN)
        SubscribableChannel t1_cashflow_in();

        @Input(T2_CASHFLOW_IN)
        SubscribableChannel t2_cashflow_in();

        @Output(T1_CASHFLOW_OUT)
        SubscribableChannel t1_cashflow_out();

        @Output(T2_CASHFLOW_OUT)
        SubscribableChannel t2_cashflow_out();
    }
}
There is a messageKeyExpression Kafka extension producer property; the problem is that the expression is evaluated after the payload is converted (unless you are using nativeEncoding, in which case you can use that with payload.contractId).
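To make the native-encoding variant concrete, here is a minimal sketch that prints the settings it would need as application.properties lines. The binding name t2_cashflow_out comes from the question; the full property keys (including useNativeEncoding as the full name of the native encoding switch) and the two serializer classes reflect my understanding of the binder configuration and are assumptions, not something stated above. The class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class KeyExpressionConfig {

    // Collects the settings for one output binding, in a stable order.
    static Map<String, String> keyExpressionProperties(String binding) {
        String core = "spring.cloud.stream.bindings." + binding + ".producer.";
        String kafka = "spring.cloud.stream.kafka.bindings." + binding + ".producer.";
        Map<String, String> props = new LinkedHashMap<>();
        // Assumption: this switches off framework payload conversion, so the
        // expression is evaluated against the original payload object.
        props.put(core + "useNativeEncoding", "true");
        // The expression from the answer; it needs a getContractId() on the payload.
        props.put(kafka + "messageKeyExpression", "payload.contractId");
        // Assumption: with native encoding the Kafka serializers do the conversion.
        // IntegerSerializer matches an Integer contractId; JsonSerializer is one option for the value.
        props.put(kafka + "configuration.key.serializer",
                "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put(kafka + "configuration.value.serializer",
                "org.springframework.kafka.support.serializer.JsonSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Print the settings in key=value form.
        keyExpressionProperties("t2_cashflow_out")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The printed lines can be pasted into application.properties. If the key is sent as an Integer this way, the consumer side would need a matching key deserializer instead of reading the key as byte[].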
If you are not using native encoding, you can add an interceptor (before the conversion interceptor) and promote the key to the KafkaHeaders.MESSAGE_KEY header.
Here is an example:
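import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootApplication
@EnableBinding(Processor.class)
public class So56689257Application {

    public static void main(String[] args) {
        SpringApplication.run(So56689257Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(MessageChannel output) {
        // Index 0 places this interceptor before the payload conversion interceptor,
        // so the payload is still a Foo when the key is extracted.
        ((AbstractMessageChannel) output).addInterceptor(0, new ChannelInterceptor() {
            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                return MessageBuilder.fromMessage(message)
                        .setHeader(KafkaHeaders.MESSAGE_KEY, ((Foo) message.getPayload()).getContractId().getBytes())
                        .build();
            }
        });
        return args -> {
            output.send(MessageBuilder.withPayload(new Foo("someContractId")).build());
        };
    }

    @StreamListener(Processor.INPUT)
    public void listen(Foo foo, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) byte[] key) {
        System.out.println(foo + ", key:" + new String(key));
    }

    public static class Foo {

        private String contractId;

        public Foo() {
            super();
        }

        public Foo(String contractId) {
            this.contractId = contractId;
        }

        public String getContractId() {
            return this.contractId;
        }

        public void setContractId(String contractId) {
            this.contractId = contractId;
        }

        @Override
        public String toString() {
            return "Foo [contractId=" + this.contractId + "]";
        }
    }
}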
This prints:
Foo [contractId=someContractId], key:someContractId
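Note that Foo uses a String contractId, whereas the Cashflow in the question fills its contractId via Integer.parseInt, so getBytes() is not directly available there. Below is a minimal sketch of one way to turn a numeric id into a byte[] key and back, assuming the key should be the UTF-8 text form of the number; the class and helper names are hypothetical.

```java
import java.nio.charset.StandardCharsets;

public class ContractKeyBytes {

    // Producer side: encode the numeric contract id as UTF-8 text.
    static byte[] toKey(int contractId) {
        return Integer.toString(contractId).getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: decode a byte[] key back into the numeric contract id.
    static int fromKey(byte[] key) {
        return Integer.parseInt(new String(key, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        byte[] key = toKey(4711);
        System.out.println("key as text: " + new String(key, StandardCharsets.UTF_8)
                + ", contractId: " + fromKey(key));
    }
}
```

In an interceptor like the one above, the result of toKey(...) would take the place of getContractId().getBytes(). A text encoding keeps the key readable in console tools; a fixed-width binary encoding would work as well, as long as every topic that is joined later uses the same one.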
The RabbitMQ binder already has a built-in RabbitExpressionEvaluatingInterceptor (which evaluates expressions before payload conversion), but there is currently no equivalent in the Kafka binder, so you have to add your own interceptor.
Or, if you are using JSON, you can use #jsonPath in the expression to extract the key.