Tags: java, spring, spring-boot, spring-integration, project-reactor

Cannot pass callback lambdas into .toReactivePublisher().subscribe() in Spring Integration with Project Reactor


I get the error Caused by: java.lang.IllegalStateException: No subscriptions have been created when I use Spring Integration with Project Reactor, and I am trying to figure out how to subscribe. My original code was:

    @Bean
    public IntegrationFlow writeToKafka() {
        return IntegrationFlows.from(reactiveKafkaConsumerTemplate.receiveAutoAck()
                .map(payload -> {
                    return new GenericMessage<ConsumerRecord<String, String>>(payload);
                }))
            .<ConsumerRecord<String, String>, String>transform(ConsumerRecord::value)
            .channel(c -> c.queue("resultChannel"))
            .get();
    }

After it threw the error I tried to subscribe, but I couldn't work out what to pass to the subscribe method, which seems to behave differently from the regular reactive .subscribe().

    @Bean
    public void writeToKafka() {
        return IntegrationFlows.from(reactiveKafkaConsumerTemplate.receiveAutoAck()
                .map(payload -> {
                    return new GenericMessage<ConsumerRecord<String, String>>(payload);
                }))
            .<ConsumerRecord<String, String>, String>transform(ConsumerRecord::value)
            .channel(c -> c.queue("resultChannel"))
            .toReactivePublisher().subscribe(value -> {
                log.info("Wrote: " + value);
            });
    }


Solution

  • Chaining .toReactivePublisher().subscribe() like that is not correct. The IntegrationFlow must first be exposed and configured as a bean. Only then, after injecting that bean into one of your services, can you subscribe() to the Publisher bean.

    You are missing the fact that the inversion of control container has to finish initializing its beans first; only after that can we do any real work (such as subscribing) with those beans.

    EDIT

    For example, my test case:

    @SpringJUnitConfig
    @DirtiesContext
    public class ReactiveStreamsTests {
    
        @Autowired
        @Qualifier("pollableReactiveFlow")
        private Publisher<Message<Integer>> pollablePublisher;
    
        @Autowired
        private AbstractEndpoint reactiveTransformer;
    
        @Autowired
        @Qualifier("inputChannel")
        private MessageChannel inputChannel;
    
        @Test
        void testPollableReactiveFlow() throws Exception {
            assertThat(this.reactiveTransformer).isInstanceOf(ReactiveStreamsConsumer.class);
            this.inputChannel.send(new GenericMessage<>("1,2,3,4,5"));
    
            CountDownLatch latch = new CountDownLatch(6);
    
            Flux.from(this.pollablePublisher)
                    .take(6)
                    .filter(m -> m.getHeaders().containsKey(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
                    .doOnNext(p -> latch.countDown())
                    .subscribe();
    
            ExecutorService exec = Executors.newSingleThreadExecutor();
            Future<List<Integer>> future =
                    exec.submit(() ->
                            Flux.just("11,12,13")
                                    .map(v -> v.split(","))
                                    .flatMapIterable(Arrays::asList)
                                    .map(Integer::parseInt)
                                    .<Message<Integer>>map(GenericMessage::new)
                                    .concatWith(this.pollablePublisher)
                                    .take(7)
                                    .map(Message::getPayload)
                                    .collectList()
                                    .block(Duration.ofSeconds(10))
                    );
    
            this.inputChannel.send(new GenericMessage<>("6,7,8,9,10"));
    
            assertThat(latch.await(20, TimeUnit.SECONDS)).isTrue();
            List<Integer> integers = future.get(20, TimeUnit.SECONDS);
    
            assertThat(integers).isNotNull();
            assertThat(integers.size()).isEqualTo(7);
            exec.shutdownNow();
        }
    
        @Configuration
        @EnableIntegration
        public static class ContextConfiguration {
    
            @Bean
            public Publisher<Message<Integer>> pollableReactiveFlow() {
                return IntegrationFlows
                        .from("inputChannel")
                        .split(s -> s.delimiters(","))
                        .<String, Integer>transform(Integer::parseInt,
                                e -> e.reactive(flux -> flux.publishOn(Schedulers.parallel())).id("reactiveTransformer"))
                        .channel(MessageChannels.queue())
                        .log()
                        .toReactivePublisher();
            }
    
        }
    
    }
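
Applied to the flow from the question, the advice above could look roughly like the sketch below. It is not a definitive implementation. It assumes the Spring Integration 5.x DSL (IntegrationFlows) used in the question, a Spring Boot application (for ApplicationReadyEvent), and a ReactiveKafkaConsumerTemplate bean that already exists in the context. The class names KafkaFlowConfiguration and KafkaValueLogger are hypothetical and exist only for illustration.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;

// Hypothetical configuration class: it only declares the flow, it never subscribes.
@Configuration
public class KafkaFlowConfiguration {

    // The flow is exposed as a Publisher bean so the container can wire it up first.
    @Bean
    public Publisher<Message<String>> writeToKafka(
            ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate) {
        return IntegrationFlows
                .from(reactiveKafkaConsumerTemplate.receiveAutoAck()
                        // Explicit type witness, as in the test case above
                        .<Message<ConsumerRecord<String, String>>>map(GenericMessage::new))
                .<ConsumerRecord<String, String>, String>transform(ConsumerRecord::value)
                .channel(c -> c.queue("resultChannel"))
                .toReactivePublisher();
    }
}

// Hypothetical consumer: subscribes only after the application context is ready.
@Component
class KafkaValueLogger {

    private static final Logger log = LoggerFactory.getLogger(KafkaValueLogger.class);

    private final Publisher<Message<String>> kafkaValues;

    KafkaValueLogger(@Qualifier("writeToKafka") Publisher<Message<String>> kafkaValues) {
        this.kafkaValues = kafkaValues;
    }

    // Assumption: Spring Boot publishes ApplicationReadyEvent once startup has completed.
    @EventListener(ApplicationReadyEvent.class)
    public void subscribeToFlow() {
        Flux.from(this.kafkaValues)
                .map(Message::getPayload)
                .subscribe(value -> log.info("Wrote: {}", value));
    }
}
```

The point of the split is that the @Bean method only declares the pipeline, while subscribe() is called from a separate component once all beans have been created. Any other hook that runs after context initialization would serve the same purpose as the event listener used here.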