I want to use this extension: [Quarkus Smallrye Reactive Messaging Kafka]
However, in my application the topic names are not known in advance; they are determined at runtime from the messages received from users. How can I specify the topic name and its related settings programmatically, without annotations? (I only need to send messages to Kafka, i.e. produce.)
@ApplicationScoped
public class PriceGenerator {
    private Random random = new Random();

    // Don't want to use this
    // "generated-price" not known at build time
    @Outgoing("generated-price")
    public Multi<Integer> generate() {
        return Multi.createFrom().ticks().every(Duration.ofSeconds(5))
                .onOverflow().drop()
                .map(tick -> random.nextInt(100));
    }
}
Alternatively, these settings would have to be set programmatically at runtime:
mp.messaging.outgoing.generated-price.connector=smallrye-kafka
mp.messaging.outgoing.generated-price.topic=prices
mp.messaging.outgoing.generated-price.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer
Because I did not know how to do this, I fell back to the plain Kafka client:
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-kafka-client</artifactId>
</dependency>
Properties props = new Properties();
props.put("bootstrap.servers", "85.93.89.115:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("linger.ms", 1);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<String, String>(topicName.toString(), messageFactory.MessageToString(message)));
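You can use an Emitter and set the topic per message through OutgoingKafkaRecordMetadata. The channel (channel-out below) is configured as a regular outgoing Kafka channel; only the topic is overridden for each message. The code looks like this: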
@Inject
@Channel("channel-out")
Emitter<String> kafkaEventEmitter;

public void publishToKafka(String data, String topicName) {
    // Override the channel's configured topic for this message only
    OutgoingKafkaRecordMetadata<?> metadata = OutgoingKafkaRecordMetadata.builder()
            .withTopic(topicName)
            .build();
    kafkaEventEmitter.send(Message.of(data).addMetadata(metadata));
}
The official documentation describes this here: https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/3.9/kafka/kafka.html#_dynamic_topic_names
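
Since the topic name comes from user input at runtime, it may be worth checking it before handing it to the metadata builder. Below is a minimal sketch of such a check, assuming Kafka's usual naming rules (at most 249 characters; only ASCII letters, digits, '.', '_' and '-'; not "." or ".."). The TopicNames class and its methods are hypothetical helpers, not part of Quarkus, SmallRye or the Kafka client.

```java
import java.util.regex.Pattern;

/** Hypothetical helper (not a Quarkus/SmallRye API): checks a user-supplied topic name. */
final class TopicNames {
    // Assumed Kafka limits: up to 249 characters from ASCII letters, digits, '.', '_' and '-'.
    private static final int MAX_LENGTH = 249;
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");

    private TopicNames() {}

    /** Returns true if the name follows the naming rules assumed above. */
    static boolean isValid(String name) {
        if (name == null || name.isEmpty() || name.length() > MAX_LENGTH) {
            return false;
        }
        // "." and ".." are rejected even though they consist of legal characters.
        if (name.equals(".") || name.equals("..")) {
            return false;
        }
        return LEGAL.matcher(name).matches();
    }

    /** Returns the name unchanged, or throws if it cannot be used as a topic name. */
    static String require(String name) {
        if (!isValid(name)) {
            throw new IllegalArgumentException("Invalid Kafka topic name: " + name);
        }
        return name;
    }
}
```

In publishToKafka, the user-supplied name could then be wrapped as TopicNames.require(...) before it is passed to withTopic, so that a bad name fails fast in the application instead of surfacing later as a send error.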