PollableChannel with Spring Integration


I have an interface, Channels.java:

    public interface Channels {

        final String OUTPUT = "output";

        final String INPUT = "input";

        @Output(OUTPUT)
        MessageChannel output();

        @BridgeFrom(OUTPUT)
        PollableChannel input();
    }

I have another class where I perform all the messaging operations:

    @Autowired
    @Qualifier(Channels.OUTPUT)
    private MessageChannel Output;

I am able to send messages to the exchanges fine. How do I use my PollableChannel here? What am I doing wrong?

EDIT

How do I access the bean inside my @Component class?

I now have a @Configuration class with:

    @Bean
    @BridgeTo(Channels.OUTPUT)
    public PollableChannel polled() {
        return new QueueChannel();
    }

I want to be able to use this channel to receive messages.


Solution

  • The bridge has to be a @Bean, not an annotation on an interface method; see the answer to your general question here.

    EDIT

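    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.context.ConfigurableApplicationContext;
    import org.springframework.context.annotation.Bean;
    import org.springframework.integration.annotation.BridgeTo;
    import org.springframework.integration.annotation.Poller;
    import org.springframework.integration.channel.QueueChannel;
    import org.springframework.messaging.PollableChannel;
    import org.springframework.messaging.support.GenericMessage;

    @SpringBootApplication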
    @EnableBinding(Source.class)
    public class So44018382Application implements CommandLineRunner {
    
        final Logger logger = LoggerFactory.getLogger(getClass());
    
        public static void main(String[] args) throws Exception {
            ConfigurableApplicationContext context = SpringApplication.run(So44018382Application.class, args);
            Thread.sleep(60_000);
            context.close();
        }
    
        @RabbitListener(bindings =
                @QueueBinding(value = @Queue(value = "foo", autoDelete = "true"),
                                exchange = @Exchange(value = "output", type = "topic"), key = "#"))
        // bind a queue to the output exchange
        public void listen(String in) {
            this.logger.info("received " + in);
        }
    
        @BridgeTo(value = Source.OUTPUT,
                poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "2"))
        @Bean
        public PollableChannel polled() {
            return new QueueChannel(5);
        }
    
        @Override
        public void run(String... args) throws Exception {
            for (int i = 0; i < 30; i++) {
                polled().send(new GenericMessage<>("foo" + i));
                this.logger.info("sent foo" + i);
            }
        }
    
    }
    

    This works fine for me. The queue has a depth of 5, so the sender blocks when it is full; every 5 seconds the poller removes at most 2 messages and sends them to the output channel.

    This example also adds a rabbit listener to consume the messages sent to the binder.
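
The queue-depth and poll-size behaviour described above can be tried without a broker. The following is only a plain-JDK analogy, not how Spring Integration implements `QueueChannel` or its poller: an `ArrayBlockingQueue` stands in for `new QueueChannel(5)`, and the hypothetical helper `pollOnce` stands in for one poll cycle with `maxMessagesPerPoll = "2"`. The class and constant names are made up for illustration, and a timed `offer` replaces the blocking send so that the demo terminates instead of hanging.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Plain-JDK analogy (assumption: not Spring code) for a bounded channel drained in small batches. */
final class BoundedQueueSketch {

    static final int CAPACITY = 5;      // mirrors new QueueChannel(5)
    static final int MAX_PER_POLL = 2;  // mirrors maxMessagesPerPoll = "2"

    /** One poll cycle: remove at most maxPerPoll queued messages without waiting. */
    static List<String> pollOnce(BlockingQueue<String> queue, int maxPerPoll) {
        List<String> batch = new ArrayList<>();
        queue.drainTo(batch, maxPerPoll);
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(CAPACITY);

        // Try to send 30 messages; the timed offer gives up once the queue is full,
        // which is the point where a blocking send would start to wait.
        int accepted = 0;
        for (int i = 0; i < 30; i++) {
            if (!queue.offer("foo" + i, 10, TimeUnit.MILLISECONDS)) {
                break;
            }
            accepted++;
        }
        System.out.println("accepted before full: " + accepted);

        // A single poll frees room for exactly MAX_PER_POLL more messages.
        System.out.println("first poll: " + pollOnce(queue, MAX_PER_POLL));
        System.out.println("free slots after poll: " + queue.remainingCapacity());
    }
}
```

Run as written, the sixth offer times out because five messages are already queued, the first poll returns foo0 and foo1, and two slots become free. In the real application the sender resumes in the same way each time the poller takes its two messages.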