Search code examples
rabbitmqspring-cloudspring-cloud-streamspring-cloud-stream-binder

Producing message for RabbitMq using Supplier


I need to poll messages and publish them to RabbitMQ. There are 100 stores set up, and each store can poll more than 5 messages and publish them to RabbitMQ. Polling occurs every minute. Below is the code for polling and publishing to RabbitMQ.

public class FetchMessages{
@Scheduled(fixedRateString = "60000")
private void sendToClxRmq() {
        //Code to fetch the messages;
        //loop below line for all the 100 stores and messages polled by each of the store
        publishMessage.sendMessages(publishMessageDto);        }
}

public class PublishMessage {

    PublishMessageDto publishMessageDto;
    @Bean
    public Supplier<Message<PublishMessageDto>> routeMessage() {
        return () -> {
            if (ObjectUtils.isNotEmpty(publishMessageDto)) {
                return MessageBuilder.withPayload(publishMessageDto)
                        .setHeader("store", publishMessageDto.getStore())
                        .build();
            } else {
                return null;
            }
        };
    }

    public void sendMessages(final PublishMessageDto publishMessageDto) {
        this.publishMessageDto = publishMessageDto;
    }
}

In the provided code snippet, Supplier bean produces a message whenever its get() method is invoked. The framework provides a default polling mechanism that will trigger the invocation of the supplier and by default it will do so every second. In other words, the above configuration produces a single message every second and each message is sent to an output destination that is exposed by the binder.

Given that the sendToClxRmq() method might produce more than five messages per second for 1 store. Hence invoking the sendMessages() method through sendToClxRmq() would overwrite the value of publishMessageDto in PublishMessage class. Consequently, whenever the get() method of the Supplier bean is called (every second), it will always send the latest message.

How can I address this issue using the Supplier pattern. Should I use StreamBridge or there is any better solution.


Solution

  • The StreamBridge is the best way to implement your requirements with that @Scheduled pattern.

    There is also the way to have a Supplier<Flux<Message<PublishMessageDto>>> in combination with a Sinks.Many, but is it really a necessary overhead for your use-case?

    Well, another solution is to use an internal BlockingQueue instead of just plain (and buggy) PublishMessageDto publishMessageDto property. This way your sendToClxRmq would offer its data to that queue and Spring Sloud Stream would consume that queue in its own pace via Supplier polling. However this solution may lead to the data loss when you produce more than poller can consume and your application stops accidentally.