I'm using Spring Cloud Stream with a Kafka broker for communication between microservices. StreamBridge is used to send the messages, and that part works fine.
However, a message should not be consumed as soon as it arrives; it should be consumed only once a certain condition is satisfied.
From the documentation, I understand that I need to use polled consumers (do correct me if I'm mistaken) for this.
This is what I've tried, based on my understanding of the documentation.
application.properties
spring.cloud.stream.pollable-source=consumeResponse
spring.cloud.stream.function.definition=consumeResponse
#stream bridge
spring.cloud.stream.bindings.outputchannel1.destination=REQUEST_TOPIC
spring.cloud.stream.bindings.outputchannel1.binder=kafka1
#polled consumer
spring.cloud.stream.bindings.consumeResponse-in-0.binder=kafka1
spring.cloud.stream.bindings.consumeResponse-in-0.destination=REQUEST_TOPIC
spring.cloud.stream.bindings.consumeResponse-in-0.group=consumer_cloud_stream1
spring.cloud.stream.binders.kafka1.type=kafka
MainApplication.java
@Bean
public CommandLineRunner commandLineRunner(ApplicationContext ctx) {
    return args -> {
        //produce message
        for (int i = 0; i < 5; i++) {
            streamBridge.send("outputchannel1", "msg" + i);
            System.out.println("Request :: " + "msg" + i);
        }
    };
}

@Bean
public Consumer<String> consumeResponse() {
    return (response) -> {
        //consume message
        System.out.println("Response :: " + response);
    };
}

@Bean
public ApplicationRunner poller(PollableMessageSource destIn, MessageChannel destOut) {
    return args -> {
        while (someCondition) { //some condition that checks whether or not to consume the message
            try {
                //condition satisfied, so forward message to consumer
                if (!destIn.poll(m -> {
                    String newPayload = ((String) m.getPayload());
                    destOut.send(new GenericMessage<>(newPayload));
                })) {
                    Thread.sleep(1000);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    };
}
But this throws the following exception:
Parameter 1 of method poller in com.MainApplication required a single bean, but 2 were found:
- nullChannel: defined in null
- errorChannel: defined in null
I'd appreciate it if someone could help me out here or point me to a working example.
Spring Boot version: 2.6.4, Spring Cloud version: 2021.0.1
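Why don't you just inject the StreamBridge into your runner instead of the message channel?
By default, StreamBridge output channels are created on demand (on the first send), so no bean for outputchannel1 exists yet when the runner is wired. That would explain why only nullChannel and errorChannel show up as candidates.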
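The control flow the question is after (poll only while a condition holds, back off when nothing is waiting) can be sketched without any Spring types. This is only a plain-Java model, not Spring Cloud Stream code: the class GatedPoller, the method drainWhile, and the "stop after three messages" condition are all made up for illustration, and an in-memory queue stands in for the topic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

/** Plain-Java model of a gated polling loop; no Spring types involved. */
public class GatedPoller {

    /**
     * Forwards messages from source to sink for as long as the gate is open.
     * Returns the number of messages forwarded.
     */
    public static int drainWhile(BooleanSupplier gate, Queue<String> source,
                                 Consumer<String> sink, long idleMillis) {
        int forwarded = 0;
        while (gate.getAsBoolean()) {
            String payload = source.poll(); // null when nothing is waiting
            if (payload == null) {
                try {
                    Thread.sleep(idleMillis); // back off, like the sleep in the question
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt flag and stop
                    return forwarded;
                }
                continue;
            }
            sink.accept(payload);
            forwarded++;
        }
        return forwarded;
    }

    public static void main(String[] args) {
        Queue<String> topic =
                new ArrayDeque<>(List.of("msg0", "msg1", "msg2", "msg3", "msg4"));
        List<String> consumed = new ArrayList<>();
        // Hypothetical condition: stop consuming after three messages.
        int n = drainWhile(() -> consumed.size() < 3, topic, consumed::add, 10);
        System.out.println("Forwarded " + n + ": " + consumed
                + ", left on topic: " + topic);
        // prints: Forwarded 3: [msg0, msg1, msg2], left on topic: [msg3, msg4]
    }
}
```

In the question's poller, source.poll() would correspond to destIn.poll(handler) on the PollableMessageSource, and sink.accept(payload) to a streamBridge.send(bindingName, payload) call on an injected StreamBridge. Which binding name to send to depends on where the forwarded message should go; sending back to the same topic that is being polled would loop.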