I am trying to create a Kafka producer via an implementation of a Spring Cloud Function. I am using Spring Boot 2.4.0 and Spring Cloud 2020.0.0:
    @Slf4j
    @Component
    public class ArticleEventPublisher implements Function<ArticleAggregatedDocument, Message<ArticleMessage>> {

        private final ArticleMessageMapper articleMessageMapper;

        public ArticleEventPublisher(ArticleMessageMapper articleMessageMapper) {
            this.articleMessageMapper = articleMessageMapper;
        }

        @Override
        public Message<ArticleMessage> apply(ArticleAggregatedDocument articleAggregatedDocument) {
            String destination = "articleAggregated";
            log.info("CREATING MESSAGE IN FUNCTION BEAN");
            return createMessage(articleAggregatedDocument, destination);
        }

        private Message<ArticleMessage> createMessage(ArticleAggregatedDocument articleAggregatedDocument, String destination) {
            ArticleMessage articleMessage = articleMessageMapper.apply(articleAggregatedDocument);
            return MessageBuilder
                    .withPayload(articleMessage)
                    .setHeader(KafkaHeaders.MESSAGE_KEY, articleMessage.getKey().getBytes())
                    .setHeader("spring.cloud.stream.sendto.destination", destination)
                    .build();
        }
    }
I understand that this is not dynamic binding and that the destination is hard-coded; however, for now I want to test this particular implementation. Although control in the application does reach the class and messages are being created, I don't see them in the 'destination'.
I added the following to my application.properties:
    spring.cloud.function.routing.enabled=true
    spring.cloud.function.scan.packages=**publisher package name**
    spring.cloud.stream.bindings.articleAggregated.destination=article.aggregated
    spring.cloud.stream.bindings.articleAggregated.contentType=application/json
    spring.cloud.stream.bindings.articleAggregated.producer.autoStartup=false
    spring.cloud.stream.bindings.articleAggregated.producer.headerMode=none
    spring.cloud.stream.kafka.bindings.articleAggregated.producer.configuration.client.id=articleAggregated
    spring.cloud.stream.kafka.bindings.articleAggregated.producer.sync=true
I would also like to mention that the following configuration properties are no longer resolved in IntelliJ IDEA:
    spring.cloud.function.routing.enabled
    spring.cloud.function.scan.packages
Are they deprecated? I couldn't find anything about this in the documentation.
I also read in the documentation that a binder should be specified for each producer/consumer when multiple binders are used, but we use only one binder for all consumers and producers in the application, so I did not add it. I did try this in a moment of desperation, though:
    spring.cloud.stream.bindings.articleAggregated.binder=kafka
What am I missing here? I have been stuck at this for a while and in the meantime switched to Sink+Supplier implementation as an alternative. But I want to understand why this implementation won't work.
Thank you Rashmi. Let me come up with the best answer I can, but feel free to follow up with more questions. So. . .
1. Your function is named userInfo. This results in two bindings: userInfo-in-0 for input and userInfo-out-0 for output.
2. Since you are setting the ...sendto.destination header, the userInfo-out-0 binding will only be used as the output destination in cases where the sendto.destination header is not set.
3. You have spring.cloud.function.routing.enabled, spring.cloud.function.scan.packages, spring.kafka.bootstrap-servers and three spring.cloud.stream.kafka.* properties that point to the default localhost Kafka instance, which is already assumed by default, so none of those properties are necessary.
4. You also don't need the spring.cloud.stream.bindings.articleAggregated.binder=kafka property unless you have multiple binders on your classpath, which you don't.
Now I am confused about the spring.cloud.stream.bindings.userAggregated.destination property. userAggregated is a dynamic destination. By simply sending to it, s-c-stream will automatically provision it in Kafka if it doesn't exist; otherwise the existing one will be used, and you can see your messages there.
In other words, you can remove all of the properties you have configured, send a message to the userInfo-out-0 destination in Kafka, and you should successfully retrieve it in the userAggregated destination.
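To make the naming and routing rule above concrete, here is a small plain-Java sketch. It is only a toy model of the behaviour described in this answer, not Spring Cloud Stream's actual implementation: the class DestinationSketch and its methods are hypothetical names made up for illustration, and the only things taken from the posts are the header name and the userInfo / userAggregated names.

```java
import java.util.HashMap;
import java.util.Map;

public class DestinationSketch {

    // Header name as used in the question's MessageBuilder call.
    static final String SEND_TO = "spring.cloud.stream.sendto.destination";

    // Default input binding name for a single-input function bean: <functionName>-in-0.
    static String inputBinding(String functionName) {
        return functionName + "-in-0";
    }

    // Default output binding name for a single-output function bean: <functionName>-out-0.
    static String outputBinding(String functionName) {
        return functionName + "-out-0";
    }

    // Toy model of the rule: if the sendto header is present it decides the
    // destination, otherwise the default output binding is used.
    static String resolveOutput(String functionName, Map<String, Object> headers) {
        Object sendTo = headers.get(SEND_TO);
        return sendTo != null ? sendTo.toString() : outputBinding(functionName);
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        System.out.println(inputBinding("userInfo"));            // userInfo-in-0
        System.out.println(resolveOutput("userInfo", headers));  // userInfo-out-0

        headers.put(SEND_TO, "userAggregated");
        System.out.println(resolveOutput("userInfo", headers));  // userAggregated
    }
}
```

Read this way, a binding configured under a different name (such as articleAggregated in the question's properties) is simply not one of the bindings derived from the function name, which may be why those properties appear to have no effect.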
Please let me know if I am missing something.