Below is the consumer code that receives messages from a Kafka topic (8 partitions) and processes them.
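import java.time.Duration;
import java.util.Collection;
import java.util.Collections;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;

@Component
public class MessageConsumer {
    private static final String TOPIC = "mytopic.t";
    private static final String GROUP_ID = "mygroup";
    private static final Logger LOG = LoggerFactory.getLogger(MessageConsumer.class);

    private final ReceiverOptions<String, String> consumerSettings;

    @Autowired
    public MessageConsumer(@Qualifier("consumerSettings") ReceiverOptions<String, String> consumerSettings) {
        this.consumerSettings = consumerSettings;
        consumerMessage();
    }
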
    private void consumerMessage() {
        KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions(Collections.singleton(TOPIC)));
        Scheduler scheduler = Schedulers.newElastic("FLUX_DEFER", 10, true);
        Flux.defer(receiver::receive)
            .groupBy(m -> m.receiverOffset().topicPartition())
            .flatMap(partitionFlux ->
                partitionFlux.publishOn(scheduler)
                    .concatMap(m -> {
                        LOG.info("message received from kafka : " + "key : " + m.key() + " partition: " + m.partition());
                        return process(m.key(), m.value())
                            .thenEmpty(m.receiverOffset().commit());
                    }))
            .retryBackoff(5, Duration.ofSeconds(2), Duration.ofHours(2))
            .doOnError(err -> {
                handleError(err);
            }).retry()
            .doOnCancel(() -> close()).subscribe();
    }

    private void close() {
    }

    private void handleError(Throwable err) {
        LOG.error("kafka stream error : ", err);
    }

    private Mono<Void> process(String key, String value) {
        if (key.equals("error")) {
            // simulate a processing failure for this key
            return Mono.error(new Exception("process error : "));
        }
        LOG.info("message consumed : " + key);
        return Mono.empty();
    }

    public ReceiverOptions<String, String> receiverOptions(Collection<String> topics) {
        return consumerSettings
            .commitInterval(Duration.ZERO)
            .commitBatchSize(0)
            .addAssignListener(p -> LOG.info("Group {} partitions assigned {}", GROUP_ID, p))
            .addRevokeListener(p -> LOG.info("Group {} partitions revoked {}", GROUP_ID, p))
            .subscription(topics);
    }
}
@Bean(name = "consumerSettings")
public ReceiverOptions<String, String> getConsumerSettings() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, GROUP_ID);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put("max.block.ms", "3000");
    props.put("request.timeout.ms", "3000");
    return ReceiverOptions.create(props);
}
For each message received, my processing logic returns an empty Mono if the message was processed successfully.
Everything works as expected as long as the processing logic returns no error.
But if I throw an error to simulate a failure in my processing logic for a particular message, the message that caused the exception is never processed again: the stream moves on to the next message.
What I want to achieve is: process the current message, commit its offset if processing succeeds, and then move on to the next record.
If processing a message throws an exception, don't commit the current offset, and retry the same message until it succeeds. Don't move on to the next message until the current one has been processed successfully.
Please let me know how to handle processing failures without skipping the message, and how to make the stream restart from the offset where the exception was thrown.
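To make the semantics I am after concrete, here is a small plain-Java model of them, with no Kafka or Reactor involved. It is only an illustration: `CommitAfterSuccessModel` and its members are made-up names, the list stands in for one partition, and the predicate stands in for my processing logic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of the desired semantics; all names here are hypothetical.
final class CommitAfterSuccessModel {
    /** Offset of the next record to read, i.e. what would be committed. */
    private long committedOffset = 0;
    /** Every processing attempt, in order, kept for inspection. */
    final List<String> attempts = new ArrayList<>();

    /** Processes records in order; a record is retried until the handler accepts it. */
    void consume(List<String> records, Predicate<String> handler) {
        while (committedOffset < records.size()) {
            String record = records.get((int) committedOffset);
            attempts.add(record);
            if (handler.test(record)) {
                committedOffset++; // commit only after successful processing
            }
            // On failure the offset is unchanged, so the same record is read again.
        }
    }

    long committedOffset() {
        return committedOffset;
    }

    public static void main(String[] args) {
        CommitAfterSuccessModel model = new CommitAfterSuccessModel();
        int[] failuresLeft = {2}; // record "b" fails twice, then succeeds
        model.consume(Arrays.asList("a", "b", "c"),
                r -> !(r.equals("b") && failuresLeft[0]-- > 0));
        // prints: [a, b, b, b, c] committed=3
        System.out.println(model.attempts + " committed=" + model.committedOffset());
    }
}
```

Record "b" is attempted three times and "c" is not touched until "b" has succeeded, which is the ordering I would like the reactive pipeline to give me per partition.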
Regards,
Vinoth
The code below works for me. The idea is to retry a failed message a configured number of times; if it still fails, move it to a failed queue and commit its offset. Meanwhile, messages from other partitions are processed concurrently.
If a message from a particular partition fails the configured number of times, the stream is restarted after a delay, so that a failing dependency is not hit continuously.
@Autowired
public ReactiveMessageConsumer(@Qualifier("consumerSettings") ReceiverOptions<String, String> consumerSettings, MessageProducer producer) {
    this.consumerSettings = consumerSettings;
    this.producer = producer;
    consumerMessage();
}

private void consumerMessage() {
    int numRetries = 3;
    Scheduler scheduler = Schedulers.newElastic("FLUX_DEFER", 10, true);
    KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions(Collections.singleton(TOPIC)));
    // One inner Flux per partition, so partitions are processed independently.
    Flux<GroupedFlux<TopicPartition, ReceiverRecord<String, String>>> f = Flux.defer(receiver::receive)
        .groupBy(m -> m.receiverOffset().topicPartition());
    Flux<?> f1 = f.publishOn(scheduler).flatMap(r -> r.publishOn(scheduler).concatMap(b ->
        // Wrap each record in its own Flux so that retryWhen re-processes only this record.
        Flux.just(b)
            .concatMap(a -> {
                LOG.info("processing message - order: {} offset: {} partition: {}",
                    a.key(), a.receiverOffset().offset(), a.receiverOffset().topicPartition().partition());
                return process(a.key(), a.value())
                    .then(a.receiverOffset().commit())
                    .doOnSuccess(d -> LOG.info("committing order {}: offset: {} partition: {} ",
                        a.key(), a.receiverOffset().offset(), a.receiverOffset().topicPartition().partition()))
                    .doOnError(d -> LOG.info("committing offset failed for order {}: offset: {} partition: {} ",
                        a.key(), a.receiverOffset().offset(), a.receiverOffset().topicPartition().partition()));
            })
            .retryWhen(companion -> companion
                .doOnNext(s -> LOG.info(" --> Exception processing message for order {}: offset: {} partition: {} message: {} ",
                    b.key(), b.receiverOffset().offset(), b.receiverOffset().topicPartition().partition(), s.getMessage()))
                .zipWith(Flux.range(1, numRetries), (error, index) -> {
                    if (index < numRetries) {
                        LOG.info(" --> Retrying {} order: {} offset: {} partition: {} ",
                            index, b.key(), b.receiverOffset().offset(), b.receiverOffset().topicPartition().partition());
                        return index;
                    } else {
                        LOG.info(" --> Retries Exhausted: {} - order: {} offset: {} partition: {}. Message moved to error queue. Commit and proceed to next",
                            index, b.key(), b.receiverOffset().offset(), b.receiverOffset().topicPartition().partition());
                        producer.sendMessages(ERROR_TOPIC, b.key(), b.value());
                        b.receiverOffset().commit();
                        //return index;
                        throw Exceptions.propagate(error);
                    }
                })
                // Backoff of 3 * 1.5^(index - 1) seconds; the cast covers the whole product so the factor is not truncated to 1 first.
                .flatMap(index -> Mono.delay(Duration.ofSeconds((long) (Math.pow(1.5, index - 1) * 3))))
                .doOnNext(s -> LOG.info(" --> Retried at: {} ", LocalTime.now()))
            ))
    );
    // When retries are exhausted the error reaches this point: wait, then resubscribe to restart the stream.
    f1.doOnError(a -> {
        LOG.info("Moving to next message because of : ", a);
        try {
            Thread.sleep(5000); // configurable
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).retry().subscribe();
}

public ReceiverOptions<String, String> receiverOptions(Collection<String> topics) {
    return consumerSettings
        .commitInterval(Duration.ZERO)
        .commitBatchSize(0)
        .addAssignListener(p -> LOG.info("Group {} partitions assigned {}", GROUP_ID, p))
        .addRevokeListener(p -> LOG.info("Group {} partitions revoked {}", GROUP_ID, p))
        .subscription(topics);
}

// Takes the record key and value as Strings, matching the KafkaReceiver<String, String> used above.
private Mono<Void> process(String orderId, String value) {
    try {
        Thread.sleep(500); // simulate slow response
    } catch (InterruptedException e) {
        // Causes the restart
        e.printStackTrace();
    }
    if (orderId.startsWith("error")) { // simulate error scenario
        return Mono.error(new Exception("processing message failed for order: " + orderId));
    }
    return Mono.empty();
}
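To make the policy easier to see without the Reactor plumbing, here is a minimal plain-Java model of it. It is only a sketch under stated assumptions: `BoundedRetryModel`, its fields, and `delaySeconds` are names made up for illustration and are not part of Reactor Kafka; the lists stand in for committed offsets and the error topic; and the delay formula (3 × 1.5^(index − 1) seconds, truncated to whole seconds) is my reading of the intent of the backoff expression in the consumer above. No real waiting takes place.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model of the retry policy; all names here are hypothetical.
final class BoundedRetryModel {
    static final int NUM_RETRIES = 3;

    final List<String> committed = new ArrayList<>();   // records whose offset was committed
    final List<String> errorQueue = new ArrayList<>();  // records moved to the error topic
    final List<Long> delaysSeconds = new ArrayList<>(); // backoff delays that would be waited

    /** Delay before retry number index (1-based): 3 * 1.5^(index - 1), truncated. */
    static long delaySeconds(int index) {
        return (long) (Math.pow(1.5, index - 1) * 3);
    }

    /** Processes one record, retrying with backoff until success or until retries are exhausted. */
    void handle(String record, Predicate<String> processor) {
        int failures = 0;
        while (true) {
            if (processor.test(record)) {
                committed.add(record); // success: commit and move on
                return;
            }
            failures++;
            if (failures >= NUM_RETRIES) {
                errorQueue.add(record); // exhausted: park the record ...
                committed.add(record);  // ... and commit so it is not read again
                return;
            }
            delaysSeconds.add(delaySeconds(failures)); // a real consumer would wait here
        }
    }

    public static void main(String[] args) {
        BoundedRetryModel model = new BoundedRetryModel();
        model.handle("order-1", r -> true);  // succeeds on the first attempt
        model.handle("error-2", r -> false); // always fails
        // prints: committed=[order-1, error-2] errorQueue=[error-2] delays=[3, 4]
        System.out.println("committed=" + model.committed
                + " errorQueue=" + model.errorQueue
                + " delays=" + model.delaysSeconds);
    }
}
```

With `NUM_RETRIES` set to 3, a record that always fails is attempted three times in total, with two backoff delays in between, before it is parked and committed. The model leaves out the extra pause before the stream restarts and the concurrency across partitions; it only shows the per-record decision.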