Search code examples
javaspring-cloud-streamspring-cloud-stream-binder-kafka

Polled Consumer With Functional Programming & Stream bridge


I'm using spring cloud stream with kafka broker for microservice inter-communication. As part of which, stream bridge will be used to send the message, which is fine.

But during consumption of said message, the message need not be immediately consumed, rather when a condition is satisfied, then only, it should be consumed.

From the documentation, I understand that I need to use Polled Consumers(do correct me if I'm mistaken) for this.

This is what I've tried from what I've understood of the documentation.

application.properties

spring.cloud.stream.pollable-source = consumeResponse
spring.cloud.stream.function.definition = consumeResponse

#stream bridge
spring.cloud.stream.bindings.outputchannel1.destination = REQUEST_TOPIC
spring.cloud.stream.bindings.outputchannel1.binder= kafka1

#polled Consumer
spring.cloud.stream.bindings.consumeResponse-in-0.binder= kafka1
spring.cloud.stream.bindings.consumeResponse-in-0.destination = REQUEST_TOPIC
spring.cloud.stream.bindings.consumeResponse-in-0.group = consumer_cloud_stream1

spring.cloud.stream.binders.kafka1.type=kafka

MainApplication.java

@Bean
public CommandLineRunner commandLineRunner(ApplicationContext ctx) {
    return args -> {
        //produce message
        for (int i = 0; i < 5; i++) {
            streamBridge.send("outputchannel1", "msg"+i);
            System.out.println("Request :: " + "msg"+i);
        }
    };
}


@Bean
public Consumer<String> consumeResponse() {
    return (response) -> {
        //consume message
        System.out.println("Response :: " + response);
    };
}


@Bean
public ApplicationRunner poller(PollableMessageSource destIn, MessageChannel destOut) {
    return args -> {
        while (someCondition) { //some condition that checks whether or not to consume the message
            try {
                //condition satisfied, so forward message to consumer
                if (!destIn.poll(m -> {
                    String newPayload = ((String) m.getPayload());
                    destOut.send(new GenericMessage<>(newPayload));
                })) {
                    Thread.sleep(1000);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    };
}

But this throws the following exception:-

Parameter 1 of method poller in com.MainApplication required a single bean, but 2 were found:
    - nullChannel: defined in null
    - errorChannel: defined in null

I'd appreciate it if someone could help me out here or point me towards a working example for the same.

Spring boot version: 2.6.4, Spring cloud version: 2021.0.1


Solution

  • Why don't you just inject the StreamBridge into your runner instead of the message channel?

    By default, stream bridge output channels are created on-demand (first send).