spring-boot apache-kafka spring-cloud-stream spring-cloud-stream-binder-kafka spring-cloud-function

Spring Cloud Function Routing


I am trying to create a Kafka producer by implementing a Spring Cloud Function. I am using Spring Boot version 2.4.0 and Spring Cloud version 2020.0.0.

@Slf4j
@Component
public class ArticleEventPublisher implements Function<ArticleAggregatedDocument, Message<ArticleMessage>> {

    private final ArticleMessageMapper articleMessageMapper;

    public ArticleEventPublisher(
        ArticleMessageMapper articleMessageMapper) {
        this.articleMessageMapper = articleMessageMapper;
    }

    @Override
    public Message<ArticleMessage> apply(ArticleAggregatedDocument articleAggregatedDocument) {
        String destination = "articleAggregated";
        log.info("CREATING MESSAGE IN FUNCTION BEAN");
        return createMessage(articleAggregatedDocument, destination);
    }

    private Message<ArticleMessage> createMessage(ArticleAggregatedDocument articleAggregatedDocument, String destination) {
        ArticleMessage articleMessage = articleMessageMapper.apply(articleAggregatedDocument);

        return MessageBuilder
            .withPayload(articleMessage)
            .setHeader(KafkaHeaders.MESSAGE_KEY, articleMessage.getKey().getBytes())
            .setHeader("spring.cloud.stream.sendto.destination", destination)
            .build();
    }
}

I understand that this is not dynamic binding and that the destination is hard-coded, but for now I want to test this particular implementation. Execution does reach this class and the messages are being created, but I don't see them in the destination.

I added the following to my application.properties:

spring.cloud.function.routing.enabled= true
spring.cloud.function.scan.packages= **publisher package name**
spring.cloud.stream.bindings.articleAggregated.destination=article.aggregated
spring.cloud.stream.bindings.articleAggregated.contentType=application/json
spring.cloud.stream.bindings.articleAggregated.producer.autoStartup=false
spring.cloud.stream.bindings.articleAggregated.producer.headerMode=none
spring.cloud.stream.kafka.bindings.articleAggregated.producer.configuration.client.id=articleAggregated
spring.cloud.stream.kafka.bindings.articleAggregated.producer.sync=true

I would also like to mention that the following configuration properties are no longer resolved in IntelliJ IDEA:

spring.cloud.function.routing.enabled
spring.cloud.function.scan.packages

Are they deprecated? I couldn't find anything about it in the documentation.

From the documentation I also read that a binder should be specified for each producer/consumer when multiple binders are in use, but we use only one binder for all consumers and producers in the application, so I did not add it. Out of desperation, I did try this:

spring.cloud.stream.bindings.articleAggregated.binder=kafka

What am I missing here? I have been stuck on this for a while and in the meantime switched to a Sink + Supplier implementation as an alternative. But I want to understand why this implementation does not work.


Solution

  • Thank you Rashmi. Let me come up with the best answer I can, but feel free to follow up with more questions. So...

    • In your example, I see a single function, userInfo. This would result in two bindings: userInfo-in-0 for input and userInfo-out-0 for output.
    • Since this function produces a Message with the ...sendto.destination header, the userInfo-out-0 binding will only be used as the output destination in cases where the sendto.destination header is not set.
    • In your configuration I see spring.cloud.function.routing.enabled, spring.cloud.function.scan.packages, spring.kafka.bootstrap-servers and three spring.cloud.stream.kafka.* properties which point to the default localhost Kafka instance. That instance is already the default, so none of those properties are necessary.
    • You also don't need the spring.cloud.stream.bindings.articleAggregated.binder=kafka property unless you have multiple binders on your classpath, which you don't.
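
The binding names and the header fallback described in the points above can be sketched in plain Java. This is only a simplified model of that behavior, not Spring Cloud Stream code: the class `BindingModel` and its methods are hypothetical names made up for illustration, and the header name is the one used in the question.

```java
import java.util.Map;

// Simplified model of the behavior described in the answer.
// NOT Spring Cloud Stream code; BindingModel and its methods are hypothetical.
final class BindingModel {

    // Header name as used in the question's createMessage(...) method.
    static final String SEND_TO_HEADER = "spring.cloud.stream.sendto.destination";

    // A single function named <functionName> gets an input binding <functionName>-in-0 ...
    static String inputBinding(String functionName) {
        return functionName + "-in-0";
    }

    // ... and an output binding <functionName>-out-0.
    static String outputBinding(String functionName) {
        return functionName + "-out-0";
    }

    // If the message carries the sendto header, that value is the target;
    // otherwise the output binding is used as the fallback.
    static String resolveTarget(String functionName, Map<String, Object> headers) {
        Object sendTo = headers.get(SEND_TO_HEADER);
        if (sendTo != null) {
            return sendTo.toString();
        }
        return outputBinding(functionName);
    }
}
```

Under this model, a message from userInfo that carries the header with the value userAggregated resolves to userAggregated, while a message without the header resolves to userInfo-out-0.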

    Now, I am confused about the spring.cloud.stream.bindings.userAggregated.destination property. userAggregated is a dynamic destination. When you simply send to it, s-c-stream will automatically provision it in Kafka if it doesn't exist; otherwise the existing destination will be used, and you can see your messages there.

    In other words, you can remove all of the properties you have configured, send a message to the userInfo-in-0 destination in Kafka, and you should successfully retrieve it from the userAggregated destination.

    Please let me know if I am missing something.