Search code examples
springspring-bootrabbitmqamqpspring-amqp

RabbitMQ separate listeners by type


I have POJO which represents a message to Rabbit MQ. There is an integer which is responsible for the type of the message(whether it's update, remove, add and so on):

public class Message {
    private String field1;
    private String field2;

    private Integer type;
    ...
    <some other fields>
}

I have a consumer which accepts such messages in my spring boot app. So in order to handle each type separately, I have to add some switch/case construction in my code.

Are there any more clear solutions for such case?


Solution

  • You can use Spring Integration with a router...

    Rabbit Inbound channel adapter -> router -> 
    

    Where the router routes to a different service activator (method) based on the type.

    EDIT

    Here's an example:

    @SpringBootApplication
    public class So47272336Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So47272336Application.class, args);
        }
    
        @Bean
        public ApplicationRunner runner(RabbitTemplate rabbitTemplate) {
            return args -> {
                rabbitTemplate.convertAndSend("my.queue", new Domain(1, "foo"));
                rabbitTemplate.convertAndSend("my.queue", new Domain(2, "bar"));
                rabbitTemplate.convertAndSend("my.queue", new Domain(3, "baz"));
            };
        }
    
        @Bean
        public Queue queue() {
            return new Queue("my.queue");
        }
    
        @Bean
        public IntegrationFlow flow(ConnectionFactory connectionFactory) {
            return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "my.queue"))
                    .route("payload.type", r -> r
                            .subFlowMapping("1", f -> f.handle("bean", "add"))
                            .subFlowMapping("2", f -> f.handle("bean", "remove"))
                            .subFlowMapping("3", f -> f.handle("bean", "update")))
                    .get();
        }
    
        @Bean
        public MyBean bean() {
            return new MyBean();
        }
    
        public static class MyBean {
    
            public void add(Domain object) {
                System.out.println("Adding " + object);
            }
    
            public void remove(Domain object) {
                System.out.println("Removing " + object);
            }
    
            public void update(Domain object) {
                System.out.println("Updating " + object);
            }
    
        }
    
        public static class Domain implements Serializable {
    
            private final Integer type;
    
            private final String info;
    
            public Domain(Integer type, String info) {
                this.type = type;
                this.info = info;
            }
    
            public Integer getType() {
                return this.type;
            }
    
            public String getInfo() {
                return this.info;
            }
    
            @Override
            public String toString() {
                return "Domain [type=" + this.type + ", info=" + this.info + "]";
            }
    
        }
    
    }