Search code examples
spring-integrationspring-kafkaspring-cloud-streamspring-cloud-dataflow

How to pause the Spring cloud data flow Source class from sending data to kafka?


i am working on spring cloud data flow application ,Following is the code snippet

    @Bean
    @InboundChannelAdapter(channel = TbeSource.PR1, poller = @Poller(fixedDelay = "2000"))
    public MessageSource<Product> getProductSource(ProductBuilder dataAccess) {

        return new MessageSource<Product>() {
            @SneakyThrows
            @Override
            public Message<Product> receive() {
                System.out.println("calling method");
                return MessageBuilder.withPayload(dataAccess.getNext()).build();
            }
        };
    }

In above code the getNext() method will get the data from the database and return that object,so if the data is completely readed then it will return null

we can't return null to this MessageSource.

so is there any options available to pause and resume this Source connection class whenever we need?

Did any one faced / overcome this scenario?


Solution

  • First of all you just can have a Supplier<Product> instead of that MessageSourceand your code would be just like this:

    return () -> dataAccess.getNext();
    

    The null result is valid over here and no message is going to be emitted in this case and no error since the framework handles null result properly.

    You still can have an idle functionality on that @InboundChannelAdapter when result of the method call is null. For that reason you need to take a look into the SimpleActiveIdleMessageSourceAdvice. See docs for more info: https://docs.spring.io/spring-integration/docs/5.3.4.RELEASE/reference/html/core.html#simpleactiveidlereceivemessageadvice