Search code examples
spring-cloudspring-cloud-streamspring-cloud-dataflow

Aggregate Messages in Spring Cloud Stream


I am new to spring cloud and looking to change our mono structure to micro services , this said at first what i am trying to do now is the following

  1. Receive Requests to invoke a web service (External System) from different sources. At any certain time this can be 1 request or up to 100K request.
  2. The external system support bulk , so it is better if i can aggregate the messages and send them in bulk . For example keep aggregating until either number threshold is reached (100 message) or time threshold 2 second is reached.
  3. Also if i received an error i want to back off exponentially

My first idea is to create a Processor before my Sink that do the above aggregation .

Is this the correct way of thinking in cloud computing or their is another path to go through ?


Working Solution

@EnableBinding(Processor.class)
class Configuration {

    @Autowired
    Processor processor;


    @ServiceActivator(inputChannel = Processor.INPUT)
    @Bean
    public MessageHandler aggregator() {

        AggregatingMessageHandler aggregatingMessageHandler =
                new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
                        new SimpleMessageStore(10));

        //AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
        //aggregatorFactoryBean.setMessageStore();
        aggregatingMessageHandler.setOutputChannel(processor.output());
        //aggregatorFactoryBean.setDiscardChannel(processor.output());
        aggregatingMessageHandler.setSendPartialResultOnExpiry(true);
        aggregatingMessageHandler.setSendTimeout(1000L);
        aggregatingMessageHandler.setCorrelationStrategy(new  ExpressionEvaluatingCorrelationStrategy("'FOO'"));
        aggregatingMessageHandler.setReleaseStrategy(new MessageCountReleaseStrategy(3)); //ExpressionEvaluatingReleaseStrategy("size() == 5")
        aggregatingMessageHandler.setExpireGroupsUponCompletion(true);
        aggregatingMessageHandler.setGroupTimeoutExpression(new ValueExpression<>(3000L)); //size() ge 2 ? 5000 : -1
        aggregatingMessageHandler.setExpireGroupsUponTimeout(true);
        return aggregatingMessageHandler;
    }
}

Solution

  • You can write an aggregator processor application that combines multiple messages into a single message. For more info on the Spring Integration aggregator, please refer here