Tags: java, spring, rabbitmq, spring-integration, spring-amqp

Group received messages in RabbitMQ, preferably using Spring AMQP?


I'm receiving messages from a service (S) that publishes each individual property change to an entity as a separate message. A contrived example would be an entity like this:

Person {
    id: 123,
    name: "Something",
    address: {...}
}

If name and address are updated in the same transaction, (S) publishes two messages, PersonNameCorrected and PersonMoved. The problem is on the receiving side, where I store a projection of this Person entity and each property change causes a write to the database. In this example there would be two writes, but if I could batch messages for a short period of time and group them by id, I would only need a single write.

How does one typically handle this in RabbitMQ? Does Spring AMQP provide an easier abstraction?

Note that I have looked briefly at prefetch, but I'm not sure it is the way to go. Also, prefetch, if I understand it correctly, applies on a per-connection basis. I'm trying to achieve this on a per-queue basis: if batching (and thus added latency) is the way to go, I don't want to add this latency to ALL queues consumed by my service, only to those that need the "group-by-id" feature.


Solution

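  • Prefetch won't help for a case like this: it only limits how many unacknowledged messages the broker delivers to a consumer at a time; it does not group them.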

    Consider using Spring Integration, which has adapters that sit on top of Spring AMQP; it also provides an aggregator that can group messages together before sending them on to the next stage in the pipeline.
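
    To make "group by id, release after a timeout" concrete, here is a rough plain-Java sketch of the idea. It is not how Spring Integration implements its aggregator; GroupingBuffer, add and releaseExpired are made-up names, the window is assumed to start at the first message of a group, and the caller passes the current time in explicitly so the behaviour is easy to follow.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical illustration only; this is not Spring Integration's aggregator.
public class GroupingBuffer<K, M> {

    // One open group: when its first message arrived and what it holds so far.
    private static final class Group<T> {
        final long firstArrivalMillis;
        final List<T> messages = new ArrayList<>();

        Group(long firstArrivalMillis) {
            this.firstArrivalMillis = firstArrivalMillis;
        }
    }

    private final long timeoutMillis;
    private final BiConsumer<K, List<M>> sink;
    private final Map<K, Group<M>> groups = new LinkedHashMap<>();

    public GroupingBuffer(long timeoutMillis, BiConsumer<K, List<M>> sink) {
        this.timeoutMillis = timeoutMillis;
        this.sink = sink;
    }

    // Append the message to the group for its key, opening the group if needed.
    public synchronized void add(K key, M message, long nowMillis) {
        this.groups.computeIfAbsent(key, k -> new Group<M>(nowMillis)).messages.add(message);
    }

    // Hand every group whose window has elapsed to the sink; returns how many were released.
    // In a real service a scheduler would call this periodically.
    public synchronized int releaseExpired(long nowMillis) {
        int released = 0;
        Iterator<Map.Entry<K, Group<M>>> it = this.groups.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<K, Group<M>> entry = it.next();
            K key = entry.getKey();
            Group<M> group = entry.getValue();
            if (nowMillis - group.firstArrivalMillis >= this.timeoutMillis) {
                it.remove();
                this.sink.accept(key, group.messages);
                released++;
            }
        }
        return released;
    }

    public static void main(String[] args) {
        GroupingBuffer<Integer, String> buffer = new GroupingBuffer<>(2000,
                (id, messages) -> System.out.println(id + " -> " + messages));
        buffer.add(123, "PersonNameChanged", 0);
        buffer.add(123, "PersonMoved", 100);
        buffer.add(456, "PersonMoved", 1500);
        buffer.releaseExpired(2000); // only the group for 123 is old enough
    }
}
```

    Running main prints `123 -> [PersonNameChanged, PersonMoved]`; the group for 456 stays open until its own window has elapsed.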

    EDIT

    Here's a quick Boot app to demonstrate...

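    import java.util.Collection;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.integration.dsl.IntegrationFlow;
    import org.springframework.integration.dsl.IntegrationFlows;
    import org.springframework.integration.dsl.amqp.Amqp;
    
    @SpringBootApplication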
    public class So42969130Application implements CommandLineRunner {
    
        public static void main(String[] args) {
            SpringApplication.run(So42969130Application.class, args)
                .close();
        }
    
        @Autowired
        private RabbitTemplate template;
    
        @Autowired
        private Handler handler;
    
        @Override
        public void run(String... args) throws Exception {
            this.template.convertAndSend("so9130", new PersonNameChanged(123));
            this.template.convertAndSend("so9130", new PersonMoved(123));
            this.handler.latch.await(10, TimeUnit.SECONDS);
        }
    
        @Bean
        public IntegrationFlow flow(ConnectionFactory connectionFactory) {
            return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "so9130")
                            .messageConverter(converter()))
                    .aggregate(a -> a
                            .correlationExpression("payload.id") // group messages by entity id
                            .releaseExpression("false") // open-ended release, timeout only
                            .sendPartialResultOnExpiry(true) // emit whatever has arrived when the group expires
                            .groupTimeout(2000)) // milliseconds
                    .handle(handler())
                    .get();
        }
    
        @Bean
        public Jackson2JsonMessageConverter converter() {
            return new Jackson2JsonMessageConverter();
        }
    
        @Bean
        public Handler handler() {
            return new Handler();
        }
    
        @Bean
        public Queue queue() {
            return new Queue("so9130", false, false, true); // non-durable, non-exclusive, auto-delete
        }
    
        public static class Handler {
    
            private final CountDownLatch latch = new CountDownLatch(1);
    
            @ServiceActivator
            public void handle(Collection<?> aggregatedData) {
                System.out.println(aggregatedData);
                this.latch.countDown();
            }
    
        }
    
        public static class PersonNameChanged {
    
            private int id;
    
            PersonNameChanged() {
            }
    
            PersonNameChanged(int id) {
                this.id = id;
            }
    
            public int getId() {
                return this.id;
            }
    
            public void setId(int id) {
                this.id = id;
            }
    
            @Override
            public String toString() {
                return "PersonNameChanged [id=" + this.id + "]";
            }
    
        }
    
        public static class PersonMoved {
    
            private int id;
    
            PersonMoved() {
            }
    
            PersonMoved(int id) {
                this.id = id;
            }
    
            public int getId() {
                return this.id;
            }
    
            public void setId(int id) {
                this.id = id;
            }
    
            @Override
            public String toString() {
                return "PersonMoved [id=" + this.id + "]";
            }
    
        }
    
    }
    

    Pom:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.example</groupId>
        <artifactId>so42969130</artifactId>
        <version>2.0.0-BUILD-SNAPSHOT</version>
        <packaging>jar</packaging>
    
        <name>so42969130</name>
        <description>Demo project for Spring Boot</description>
    
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>1.5.2.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-integration</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.integration</groupId>
                <artifactId>spring-integration-amqp</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.integration</groupId>
                <artifactId>spring-integration-java-dsl</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    
    </project>
    

    Result:

    2017-03-23 09:56:57.501  INFO 75217 --- [ask-scheduler-2] .s.i.a.AbstractCorrelatingMessageHandler : 
        Expiring MessageGroup with correlationKey[123]
    [PersonNameChanged [id=123], PersonMoved [id=123]]
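
    Once a group is released, the handler still has to turn it into one database write. A rough sketch of that step follows, assuming each message can be reduced to an (id, property, value) triple and that the group is iterated in arrival order. PropertyChanged and merge are hypothetical names used for illustration; they are not part of the app above.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: collapse one released group into a single set of changes.
public class ProjectionMerge {

    // Assumed event shape: which entity, which property, and its new value.
    public static final class PropertyChanged {
        public final int id;
        public final String property;
        public final Object value;

        public PropertyChanged(int id, String property, Object value) {
            this.id = id;
            this.property = property;
            this.value = value;
        }
    }

    // Fold the group into property -> latest value; a later change to the
    // same property overwrites an earlier one (last one wins).
    public static Map<String, Object> merge(Collection<PropertyChanged> group) {
        Map<String, Object> changes = new LinkedHashMap<>();
        for (PropertyChanged event : group) {
            changes.put(event.property, event.value);
        }
        return changes;
    }

    public static void main(String[] args) {
        Collection<PropertyChanged> group = Arrays.asList(
                new PropertyChanged(123, "name", "Something"),
                new PropertyChanged(123, "address", "1 Main St"),
                new PropertyChanged(123, "name", "Something Else"));
        // The resulting map can be applied to the projection in one write.
        System.out.println(merge(group));
    }
}
```

    Running main prints `{name=Something Else, address=1 Main St}`, so three messages for entity 123 become a single update touching two columns.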