spring-cloud-stream, spring-cloud-dataflow

EnableBinding and Source.class replacement


The code below works with Spring Cloud Stream 3. After upgrading to Spring Cloud Stream 4, both @EnableBinding and Source have been removed. I have read many posts about their deprecation, and I understand that I should probably replace Source with a functional bean such as a Supplier. However, I don't see how to use a Supplier in this example: the RabbitMQ container keeps running and output() is called whenever a message is received, whereas a Supplier just defines a single method that returns a single value. Can anyone provide complete code in the new style, not just snippets or generic documentation?

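@EnableBinding(Source.class)
public class App {

    @Autowired
    private Source source;

    // Assumed to be injected; the original snippet did not show where this comes from.
    @Autowired
    private ConnectionFactory connectionFactory;

    @EventListener(ApplicationReadyEvent.class)
    public void start(ApplicationReadyEvent e) {

        SimpleMessageListenerContainer rabbitContainer = new SimpleMessageListenerContainer(connectionFactory);
        // Forward every received AMQP message body to the output channel (5 s send timeout).
        rabbitContainer.setMessageListener((message) -> {
            source.output().send(MessageBuilder.withPayload(message.getBody()).build(), 5000);
        });
        rabbitContainer.start();
    }
}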

My second attempt, using StreamBridge:

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

@Component
public class App {

    @Autowired
    private StreamBridge streamBridge;

    // Assumed to be injected; the original snippet did not show where this comes from.
    @Autowired
    private ConnectionFactory connectionFactory;

    @EventListener(ApplicationReadyEvent.class)
    public void start(ApplicationReadyEvent e) {

        SimpleMessageListenerContainer rabbitContainer = new SimpleMessageListenerContainer(connectionFactory);
        // Forward every received AMQP message body to the "output" binding.
        rabbitContainer.setMessageListener((message) -> {
            streamBridge.send("output", MessageBuilder.withPayload(message.getBody()).build());
        });
        rabbitContainer.start();
    }
}


Solution

  • See the documentation.

    Use a StreamBridge to send arbitrary output to a binding.

    That said, creating and starting the listener container by hand like this is not good practice:

        @EventListener(ApplicationReadyEvent.class)
        public void start(ApplicationReadyEvent e) {
            
            SimpleMessageListenerContainer rabbitContainer = new SimpleMessageListenerContainer(connectionFactory);
            rabbitContainer.setMessageListener((message) -> {
                source.output().send(MessageBuilder.withPayload(message.getBody()).build(), 5000);
            });
            rabbitContainer.start();
        }
    

    The container needs to be managed by Spring (i.e., defined as a Spring bean) to get all of its functionality (e.g., event publication).

    You can define the container as a @Bean; it will then be started during the context refresh, so there is no need to start it manually. If you still want to control startup yourself, set its autoStartup property to false and start it whenever you want.
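
Putting the two points together (StreamBridge for sending, the container as a @Bean), a minimal sketch could look like the following. It is an illustration under assumptions, not the answer author's exact code: the queue name `source.queue` is hypothetical, the binding name `output` is taken from the question's second attempt, and the ConnectionFactory is assumed to be the one Spring Boot auto-configures for RabbitMQ.

```java
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    // Declaring the container as a bean lets Spring manage its lifecycle:
    // it is started during the context refresh, so no manual start() call is needed.
    @Bean
    public SimpleMessageListenerContainer rabbitContainer(ConnectionFactory connectionFactory,
                                                          StreamBridge streamBridge) {
        SimpleMessageListenerContainer container =
                new SimpleMessageListenerContainer(connectionFactory);

        // Hypothetical queue name; replace with the queue you actually consume from.
        container.setQueueNames("source.queue");

        // Forward each received AMQP message body to the "output" binding.
        container.setMessageListener(message ->
                streamBridge.send("output", MessageBuilder.withPayload(message.getBody()).build()));

        // To control startup yourself, uncomment the next line and call start() later:
        // container.setAutoStartup(false);
        return container;
    }
}
```

With this arrangement the `output` binding would typically be mapped to a destination through the `spring.cloud.stream.bindings.output.destination` property; the destination value itself depends on your setup and is not shown in the question.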