Search code examples
rabbitmqspring-kafkaspring-rabbit

Using Topics and Acknowledgment with RabbitMq


I have been trying to use RabbitMq instead of using Kafka. I am a beginner for using RabbitMq. How can I change this KafkaListener event to RabbitMQListener?

I have been trying to find out, but I could not. I need to change this listener for RabbitMQ.

PRODUCER

     private final KafkaTemplate<String, Object> kafkaTemplate;
        
          public AccountEventProducer(KafkaTemplate<String, Object> kafkaTemplate) {
            this.kafkaTemplate = kafkaTemplate;
          }
        
          @Override
          public void produce(String topic, BaseEvent event) {
            this.kafkaTemplate.send(topic, event);
          }

CONSUMER

            @KafkaListener(topics = "AccountOpenedEvent", groupId = "${spring.kafka.consumer.group-id}")
            @Override
            public void consume(AccountOpenedEvent event, Acknowledgment ack) {
              eventHandler.on(event);
              ack.acknowledge();
            }

Can anyone help me?


Solution

  • To use rabbitmq on the producer side, follow the steps below:
        
    1. The first step is to add dependencies:
        
       <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-amqp</artifactId>
         <version>2.5.5</version>
       </dependency>         
       <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-clenter code hereoud-starter-stream-rabbit</artifactId>
        <version>3.1.3</version>
       </dependency>
    
    2. The second step is to configure the connection to rabbitmq in the yml file:
    
    spring:
      rabbitmq:
        host: localhost
        password: guest
        port: 5672
        username: guest
        exchange: user.exchange
        queue: user.queue
        routingkey: user.routingkey
    
    3. In the third step, create a class to configure the required beans :
    @Configuration
    public class ProducerConfig {   
    @Bean
      public Queue queue(){
        return new Queue("user.queue", false);
      }
      @Bean
      public TopicExchange topicExchange(){
        return new TopicExchange ( "user.exchange" );
      }
      @Bean
      public Binding binding(Queue queue, TopicExchange topicExchange){
        return BindingBuilder.bind( queue).to( topicExchange).with( "user.routingkey" );
      }
      @Bean
      public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
      }
      @Bean
      public AmqpTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
         final RabbitTemplate rabbitTemplate = new RabbitTemplate( connectionFactory);
         rabbitTemplate.setMessageConverter(jsonMessageConverter());
         return rabbitTemplate;
      }
    }
    
    5. The last step is to create a service on the producer side to submit a request to rabbitmq
    @Autowired
      AmqpTemplate amqpTemplate; 
      public void send(Object requestEvent){
        amqpTemplate.convertAndSend( "user.exchange","user.routingkey",requestEvent );
        System.out.println("Send messages successfully.");
      }
    }
    
    To use rabbitmq on the consumer side, follow the steps below:
    
    1. first do steps 1 and 2 of the producer side, after that  create a service  to read the message:
    @RabbitListener(queues = "user.queue")
      public void getMessage(Object requestEvent){
        System.out.println(requestEvent.toString());
      }
    2. The second step is create a class to configure the required beans
    @Configuration
    public class ConsumerConfig {
    @Bean
    public MessageConverter jsonMessageConverter() {
      return new Jackson2JsonMessageConverter();
    }
    }