Search code examples
javaapache-kafkaspring-webfluxspring-kafka

Spring webFlux No subscriptions have been created for Kafka Streams


i'm trying to publish message to a Kafka topic using spring web-flux, these are the configurations

@Bean
    public ReactiveKafkaProducerTemplate<String, String> reactiveKafkaProducerTemplate(
            KafkaProperties properties) {
        Map<String, Object> props = properties.buildProducerProperties();
        return new ReactiveKafkaProducerTemplate<String, String>(SenderOptions.create(props));



    }


@Bean
    public ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplateTemplate(
            KafkaProperties properties) {
        Map<String, Object> props = properties.buildConsumerProperties();
        return new ReactiveKafkaConsumerTemplate<>(ReceiverOptions.create(props));
    }

Then i try to stream all the values present in the topic

 @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> postToKafka() {
    return consumerTemplate.receiveAtMostOnce().map(rec -> rec.value()).doOnNext(System.out::println);


}

but instead of getting data present in the topics i'm getting these error

java.lang.IllegalStateException: No subscriptions have been created at reactor.kafka.receiver.ReceiverOptions.subscriber(ReceiverOptions.java:385) ~[reactor-kafka-1.3.3.jar:1.3.3] Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Error has been observed at the following site(s): *__checkpoint ⇢ Handler


Solution

  • You don't provide topics to consume from. See these options of the ReceiverOptions:

    /**
     * Sets subscription using manual assignment to the specified partitions.
     * This assignment is enabled when the receive Flux of a {@link KafkaReceiver} using this
     * options instance is subscribed to. Any existing subscriptions or assignments on this
     * option are deleted.
     * @return options instance with new partition assignment
     */
    @NonNull
    ReceiverOptions<K, V> assignment(Collection<TopicPartition> partitions);
    
    /**
     * Sets subscription using group management to the specified collection of topics.
     * This subscription is enabled when the receive Flux of a {@link KafkaReceiver} using this
     * options instance is subscribed to. Any existing subscriptions or assignments on this
     * option are deleted.
     * @return options instance with new subscription
     */
    @NonNull
    ReceiverOptions<K, V> subscription(Collection<String> topics);
    
    /**
     * Sets subscription using group management to the specified pattern.
     * This subscription is enabled when the receive Flux of a {@link KafkaReceiver} using this
     * options instance is subscribed to. Any existing subscriptions or assignments on this
     * option are deleted. Topics are dynamically assigned or removed when topics
     * matching the pattern are created or deleted.
     * @return options instance with new subscription
     */
    @NonNull
    ReceiverOptions<K, V> subscription(Pattern pattern);