spring-integration spring-cloud-stream spring-amqp spring-integration-aws

Spring Boot, Spring Integration RabbitMQ & AWS Kinesis Integration


I have a requirement to consume messages from RabbitMQ, do some processing, and finally publish messages to a Kinesis data stream. We are already using Spring Boot, Spring Integration Core, and Spring Integration AMQP 5.5.1 integration flows to consume messages from RabbitMQ. We are not using Spring Cloud Stream in any of our projects.

Which Spring library do you suggest for publishing messages to the Kinesis data stream? After going through the Spring docs, I see a couple of options. Could you please advise which is the better one to pursue?

  1. spring-cloud-stream-binder-aws-kinesis
  2. spring-integration-aws

Solution

  • Since you are not using Spring Cloud Stream, do not add the AWS Kinesis Binder for Spring Cloud Stream dependency to your project. It will not work without the Spring Cloud Stream features in the application.

    Since your application is a Spring Integration one, use the Spring Integration for AWS dependency (spring-integration-aws). It comes with a KinesisMessageHandler implementation that produces records into a Kinesis stream.

    See its docs for more info: https://github.com/spring-projects/spring-integration-aws#outbound-channel-adapter-3. Declare the handler as a bean and use it in the .handle() endpoint of an IntegrationFlow definition. For the existing handlers and the Java DSL factories that are still missing, see: https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-protocol-adapters

    UPDATE

    How to use the KinesisMessageHandler in the Java DSL:

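    import com.amazonaws.services.kinesis.AmazonKinesisAsync;
    import org.springframework.context.annotation.Bean;
    import org.springframework.integration.aws.outbound.KinesisMessageHandler;
    import org.springframework.integration.dsl.IntegrationFlow;
    import org.springframework.messaging.MessageHandler;

    // In spring-integration-aws 2.x the handler's constructor takes the async client.
    @Bean
    public MessageHandler kinesisMessageHandler(AmazonKinesisAsync amazonKinesis) {
        return new KinesisMessageHandler(amazonKinesis);
    }

    // Messages sent to this flow's input channel are published to Kinesis.
    @Bean
    IntegrationFlow someFlow(MessageHandler kinesisMessageHandler) {
        return f -> f
                  .handle(kinesisMessageHandler);
    }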