I'm trying to publish messages to a Kafka topic using Spring WebFlux. These are the configurations:
    @Bean
    public ReactiveKafkaProducerTemplate<String, String> reactiveKafkaProducerTemplate(
            KafkaProperties properties) {
        Map<String, Object> props = properties.buildProducerProperties();
        return new ReactiveKafkaProducerTemplate<>(SenderOptions.create(props));
    }

    @Bean
    public ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplateTemplate(
            KafkaProperties properties) {
        Map<String, Object> props = properties.buildConsumerProperties();
        return new ReactiveKafkaConsumerTemplate<>(ReceiverOptions.create(props));
    }
Then I try to stream all the values present in the topic:
    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> postToKafka() {
        return consumerTemplate.receiveAtMostOnce()
                .map(rec -> rec.value())
                .doOnNext(System.out::println);
    }
But instead of getting the data present in the topic, I'm getting this error:
    java.lang.IllegalStateException: No subscriptions have been created
        at reactor.kafka.receiver.ReceiverOptions.subscriber(ReceiverOptions.java:385) ~[reactor-kafka-1.3.3.jar:1.3.3]
        Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
    Error has been observed at the following site(s):
        *__checkpoint ⇢ Handler
You haven't provided any topics to consume from, which is why the receiver reports that no subscriptions have been created.
See these options of ReceiverOptions:
    /**
     * Sets subscription using manual assignment to the specified partitions.
     * This assignment is enabled when the receive Flux of a {@link KafkaReceiver} using this
     * options instance is subscribed to. Any existing subscriptions or assignments on this
     * option are deleted.
     * @return options instance with new partition assignment
     */
    @NonNull
    ReceiverOptions<K, V> assignment(Collection<TopicPartition> partitions);

    /**
     * Sets subscription using group management to the specified collection of topics.
     * This subscription is enabled when the receive Flux of a {@link KafkaReceiver} using this
     * options instance is subscribed to. Any existing subscriptions or assignments on this
     * option are deleted.
     * @return options instance with new subscription
     */
    @NonNull
    ReceiverOptions<K, V> subscription(Collection<String> topics);

    /**
     * Sets subscription using group management to the specified pattern.
     * This subscription is enabled when the receive Flux of a {@link KafkaReceiver} using this
     * options instance is subscribed to. Any existing subscriptions or assignments on this
     * option are deleted. Topics are dynamically assigned or removed when topics
     * matching the pattern are created or deleted.
     * @return options instance with new subscription
     */
    @NonNull
    ReceiverOptions<K, V> subscription(Pattern pattern);
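
As a minimal sketch of how the consumer bean from the question might be adjusted, the receiver options can be given a subscription before they are passed to the template. The class name `KafkaConsumerConfig` and the topic name `my-topic` are placeholders I made up for illustration; substitute your own topic. This also assumes the consumer properties already contain a `group.id`, since `subscription(...)` relies on group management.

```java
import java.util.Collections;
import java.util.Map;

import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;

import reactor.kafka.receiver.ReceiverOptions;

// Hypothetical configuration class; the name is an assumption for this sketch.
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate(
            KafkaProperties properties) {
        Map<String, Object> props = properties.buildConsumerProperties();

        // The explicit <String, String> type witness is needed because the
        // call is chained, so the compiler cannot infer K and V from the return type.
        ReceiverOptions<String, String> options = ReceiverOptions
                .<String, String>create(props)
                // "my-topic" is a placeholder topic name, not taken from the question.
                .subscription(Collections.singletonList("my-topic"));

        return new ReactiveKafkaConsumerTemplate<>(options);
    }
}
```

With a subscription in place, `receiveAtMostOnce()` in the controller should no longer fail with "No subscriptions have been created". If you want to consume from fixed partitions or from every topic matching a regex, `assignment(...)` or `subscription(Pattern)` can be used in the same position instead.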