I'm getting an error of Caused by: java.lang.IllegalStateException: No subscriptions have been created
when I use Spring Integration with Project Reactor and I try to figure out how can I subscribe. My original code was:
public IntegrationFlow writeToKafka() {
return IntegrationFlows.from(reactiveKafkaConsumerTemplate.receiveAutoAck()
.map(payload -> {
return new GenericMessage<ConsumerRecord<String, String>>(payload);
.<ConsumerRecord<String, String>, String>transform(ConsumerRecord::value)
.channel(c -> c.queue("resultChannel"))
After it threw the error I've tried to subscribe but I've couldn't understand what should I pass to the subscribe method, that seems to act differently than the regular reactive .subscribe()
public void writeToKafka() {
return IntegrationFlows.from(reactiveKafkaConsumerTemplate.receiveAutoAck()
.map(payload -> {
return new GenericMessage<ConsumerRecord<String, String>>(payload);
.<ConsumerRecord<String, String>, String>transform(ConsumerRecord::value)
.channel(c -> c.queue("resultChannel"))
.toReactivePublisher().subscribe(value -> {
log.info("Wrote: " + value);
Do that .toReactivePublisher().subscribe()
combination is not correct. The IntegrationFlow
must be first exposed and configured as a bean. And only then, after injecting this bean somewhere in your service you can subscribe()
to that Publisher
You are missing the fact that inversion of control has to be initialized first in its dependency injection container and only after that we can do some real work (subscribe) with those beans.
For example my test-case:
public class ReactiveStreamsTests {
private Publisher<Message<Integer>> pollablePublisher;
private AbstractEndpoint reactiveTransformer;
private MessageChannel inputChannel;
void testPollableReactiveFlow() throws Exception {
this.inputChannel.send(new GenericMessage<>("1,2,3,4,5"));
CountDownLatch latch = new CountDownLatch(6);
.filter(m -> m.getHeaders().containsKey(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
.doOnNext(p -> latch.countDown())
ExecutorService exec = Executors.newSingleThreadExecutor();
Future<List<Integer>> future =
exec.submit(() ->
.map(v -> v.split(","))
this.inputChannel.send(new GenericMessage<>("6,7,8,9,10"));
assertThat(latch.await(20, TimeUnit.SECONDS)).isTrue();
List<Integer> integers = future.get(20, TimeUnit.SECONDS);
public static class ContextConfiguration {
public Publisher<Message<Integer>> pollableReactiveFlow() {
return IntegrationFlows
.split(s -> s.delimiters(","))
.<String, Integer>transform(Integer::parseInt,
e -> e.reactive(flux -> flux.publishOn(Schedulers.parallel())).id("reactiveTransformer"))