Here is the goal: I have a service that is supposed to send mails. If sending fails, my Kafka producer writes the mail to a Kafka topic. A second program checks the topic every two minutes and should consume only one message (the oldest one) and retry sending it. If that fails again, the program should put the message back on the topic.
I already have a consumer, but the problem is that it consumes all messages it has not consumed so far. I want it to consume only the oldest message it has not consumed yet.
"customMessage" is a class I created for testing; it is just an object with different attributes such as a date, a message and other fields.
For testing, the code is slightly modified so that the consumer runs endlessly. It also only displays the message instead of processing it, so there is only "System.out.println(message.displayMessage());".
Here is my current consumer:
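    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class ConsumerOne {
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "localhost:9092");
            properties.put("group.id", "happyConsumer");
            properties.put("auto.offset.reset", "earliest");
            properties.put("enable.auto.commit", "false");
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // ObjectMapper is reusable, so create it once outside the loop.
            ObjectMapper om = new ObjectMapper();
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
            consumer.subscribe(Arrays.asList("mymailtopic"));
            // Test version: polls forever and only prints each message.
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        customMessage message = om.readValue(record.value(), customMessage.class);
                        System.out.println(message.displayMessage());
                    } catch (JsonProcessingException e) {
                        // Skip records that cannot be deserialized instead of hitting a null message.
                        e.printStackTrace();
                    }
                }
            }
        }
    }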
The service also works with the current code, but I have been asked to at least try whether it could work the way I described in the first paragraph. So, my questions are:
I'm using Java and Quarkus for the entire service (and, I think, the latest version of Kafka).
You can consume one message at a time with the max.poll.records property. Set this property to 1 in the consumer config. Its description reads:
"The maximum number of records returned in a single call to poll()."
properties.put("max.poll.records", 1);
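
To show where this setting sits next to the rest of the configuration from the question, here is a minimal sketch that only builds the Properties object. The class name SingleRecordConsumerConfig and the build helper are hypothetical names made up for this illustration and are not part of the Kafka API; the property keys and values are the ones already used above.

```java
import java.util.Properties;

public class SingleRecordConsumerConfig {

    // Hypothetical helper for illustration only, not a Kafka API.
    // Builds the consumer settings from the question plus max.poll.records=1.
    public static Properties build(String bootstrapServers, String groupId) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServers);
        properties.setProperty("group.id", groupId);
        properties.setProperty("auto.offset.reset", "earliest");
        properties.setProperty("enable.auto.commit", "false");
        properties.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Each call to poll() returns at most one record.
        properties.setProperty("max.poll.records", "1");
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = build("localhost:9092", "happyConsumer");
        // Print the settings that would be passed to new KafkaConsumer<>(properties).
        properties.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Two things are worth keeping in mind with this setup. Because enable.auto.commit is false, the consumer has to commit the offset itself (for example with consumer.commitSync()) after the message has been handled; otherwise the same message is delivered again after a restart. Also, max.poll.records only limits a single poll() call, so a consumer that polls in an endless loop will still work through the whole topic, one record per call. To handle one message every two minutes, the program would need to call poll() once per run, probably with a longer timeout than 100 ms, since the first poll after subscribing can return empty while the consumer joins the group.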