Search code examples
spring-bootapache-kafkajmsspring-kafka

Spring Boot app to integrate Kafka with JMS


I'm trying to build a Spring Boot app that reads messages from Kafka and sends them via JMS, and vice versa (reads from JMS and writes to Kafka). However, I haven't found any useful tutorial to jump-start my project.


Solution

  • See Spring Integration and the Spring Integration Extension for Apache Kafka.

    Use inbound and outbound channel adapters

    jms -> kafka
    
    kafka -> jms
    

    Kafka Connect also has some capabilities in this space, but I am not familiar with it.

    EDIT

    This simple Spring Boot app shows how to transfer data from Kafka to RabbitMQ and back again:

    package com.example.demo;
    
    import java.util.Locale;
    
    import org.apache.kafka.clients.admin.NewTopic;
    
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.QueueBuilder;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.config.TopicBuilder;
    import org.springframework.kafka.core.KafkaTemplate;
    
    /**
     * Demo application that relays a message through three hops:
     * Kafka topic {@code so61069735-1} -> RabbitMQ queue {@code so61069735-2}
     * -> Kafka topic {@code so61069735-3}.
     *
     * <p>Each hop prints the payload it received to standard output and forwards a
     * transformed copy to the next destination.
     */
    @SpringBootApplication
    public class So61069735Application {
    
        /** Kafka topic that receives the seed message and feeds {@link #listen1(String)}. */
        private static final String KAFKA_SOURCE_TOPIC = "so61069735-1";
    
        /** RabbitMQ queue sitting between the two Kafka topics. */
        private static final String RABBIT_QUEUE = "so61069735-2";
    
        /** Kafka topic that receives the payload coming back from RabbitMQ. */
        private static final String KAFKA_SINK_TOPIC = "so61069735-3";
    
        /**
         * Boots the Spring application.
         *
         * @param args command-line arguments, passed through to Spring Boot
         */
        public static void main(String[] args) {
            SpringApplication.run(So61069735Application.class, args);
        }
    
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         * Declares a runner that, when invoked, sends the seed payload {@code "foo"}
         * to the source Kafka topic.
         *
         * @return the runner that publishes the seed message
         */
        @Bean
        public ApplicationRunner toKafka() {
            return args -> this.kafkaTemplate.send(KAFKA_SOURCE_TOPIC, "foo");
        }
    
        /**
         * Kafka -> RabbitMQ hop: prints the record value and forwards it, upper-cased,
         * to the RabbitMQ queue.
         *
         * @param in value of the record consumed from the source Kafka topic
         */
        @KafkaListener(id = KAFKA_SOURCE_TOPIC, topics = KAFKA_SOURCE_TOPIC)
        public void listen1(String in) {
            System.out.println("From Kafka: " + in);
            // Locale.ROOT keeps the casing independent of the JVM's default locale
            // (e.g. a Turkish default locale would otherwise map 'i' to a dotted capital I).
            this.rabbitTemplate.convertAndSend(RABBIT_QUEUE, in.toUpperCase(Locale.ROOT));
        }
    
        /**
         * RabbitMQ -> Kafka hop: prints the message and forwards it, concatenated with
         * itself, to the sink Kafka topic.
         *
         * @param in payload of the message consumed from the RabbitMQ queue
         */
        @RabbitListener(queues = RABBIT_QUEUE)
        public void listen2(String in) {
            System.out.println("From Rabbit: " + in);
            this.kafkaTemplate.send(KAFKA_SINK_TOPIC, in + in);
        }
    
        /**
         * Final hop: prints the record value consumed from the sink Kafka topic.
         *
         * @param in value of the record consumed from the sink Kafka topic
         */
        @KafkaListener(id = KAFKA_SINK_TOPIC, topics = KAFKA_SINK_TOPIC)
        public void listen(String in) {
            System.out.println("Final: " + in);
        }
    
        /**
         * Declares the source Kafka topic with one partition and one replica.
         *
         * @return the topic definition
         */
        @Bean
        public NewTopic topic1() {
            return TopicBuilder.name(KAFKA_SOURCE_TOPIC).partitions(1).replicas(1).build();
        }
    
        /**
         * Declares the durable RabbitMQ queue used between the two Kafka topics.
         *
         * @return the queue definition
         */
        @Bean
        public Queue queue() {
            return QueueBuilder.durable(RABBIT_QUEUE).build();
        }
    
        /**
         * Declares the sink Kafka topic with one partition and one replica.
         *
         * @return the topic definition
         */
        @Bean
        public NewTopic topic2() {
            return TopicBuilder.name(KAFKA_SINK_TOPIC).partitions(1).replicas(1).build();
        }
    
    }
    
    spring.kafka.consumer.auto-offset-reset=earliest
    

    Result

    From Kafka: foo
    From Rabbit: FOO
    Final: FOOFOO