Tags: spring, spring-cloud, spring-kafka, spring-cloud-stream, spring-cloud-function

Spring Cloud Function: Function interface return success/failure handling


I currently have a Spring Cloud Stream application with a listener that consumes from a certain topic and executes the following steps in sequence:

  1. Consume messages from a topic
  2. Store consumed message in the DB
  3. Call an external service for some information
  4. Process the data
  5. Record the results in DB
  6. Send the message to another topic
  7. Acknowledge the message (I have the acknowledge mode set to manual)

We have decided to move to Spring Cloud Function, and I have already been able to implement almost all of the steps above using the Function interface, with the source topic as input and the sink topic as output.

@Bean
public Function<Message<NotificationMessage>, Message<ValidatedEvent>> validatedProducts() {
    return message -> {
        Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        NotificationMessage notificationMessage = message.getPayload();

        // steps 2-5: persist, call the external service, process, record the result
        notificationMessageService.saveOrUpdate(notificationMessage, 0, false);
        String status = restEndpoint.getStatusFor(notificationMessage);
        ValidatedEvent event = getProcessingResult(notificationMessage, status);
        notificationMessageService.saveOrUpdate(notificationMessage, 1, true);

        // step 7: acknowledge (currently done before the framework has sent the output)
        Optional.ofNullable(acknowledgment).ifPresent(Acknowledgment::acknowledge);

        // step 6: the returned message is sent to the sink topic by the framework
        return MessageBuilder
                .withPayload(event)
                .setHeader(KafkaHeaders.MESSAGE_KEY, event.getKey().getBytes())
                .build();
    };
}

My problem is with exception handling around step 7 (acknowledging the message). We only want to acknowledge the message if we are sure it was sent successfully to the sink topic; otherwise we do not acknowledge it.

My question is: how can this be implemented with Spring Cloud Function, especially since the send itself is performed entirely by the framework (the message returned by the Function is what gets published)?

Earlier, we could do this with a try/catch:

@StreamListener(value = NotificationMessage.INPUT)
public void onMessage(Message<NotificationMessage> message) {
    NotificationMessage notificationMessage = message.getPayload();
    try {
        Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);

        notificationMessageService.saveOrUpdate(notificationMessage, 0, false);
        String status = restEndpoint.getStatusFor(notificationMessage);
        ValidatedEvent event = getProcessingResult(notificationMessage, status);

        Message<ValidatedEvent> outgoingMessage = MessageBuilder
                .withPayload(event)
                .setHeader(KafkaHeaders.MESSAGE_KEY, event.getKey().getBytes())
                .build();
        kafkaTemplate.send(outgoingMessage);

        // only mark as processed and acknowledge once the send has not thrown
        notificationMessageService.saveOrUpdate(notificationMessage, 1, true);
        Optional.ofNullable(acknowledgment).ifPresent(Acknowledgment::acknowledge);
    } catch (Exception exception) {
        notificationMessageService.saveOrUpdate(notificationMessage, 1, false);
    }
}

Is there a listener that is triggered after the Function has returned and its result has been sent successfully, something like KafkaSendCallback but without having to specify a template?


Solution

  • Alright, so what I opted for was actually not to use KafkaTemplate (or StreamBridge, for that matter). While that is a feasible solution, it would mean splitting my Function into a Consumer and some sort of improvised supplier (the KafkaTemplate in this case); a sketch of that alternative is included at the end of this answer.

    As I wanted to adhere to the design goals of the functional interface, I have isolated the database-update behaviour in a ProducerListener implementation:

    @Configuration
    public class ProducerListenerConfiguration {
        private final MongoTemplate mongoTemplate;

        public ProducerListenerConfiguration(MongoTemplate mongoTemplate) {
            this.mongoTemplate = mongoTemplate;
        }

        @Bean
        public ProducerListener<byte[], byte[]> myProducerListener() {
            return new ProducerListener<byte[], byte[]>() {
                @SneakyThrows
                @Override
                public void onSuccess(ProducerRecord<byte[], byte[]> producerRecord, RecordMetadata recordMetadata) {
                    // the record was actually sent: mark the corresponding document as processed
                    final ValidatedEvent event = new ObjectMapper().readerFor(ValidatedEvent.class).readValue(producerRecord.value());
                    final var updateResult = updateDocumentProcessedState(event.getKey(), event.getPayload().getVersion(), true);
                }

                @SneakyThrows
                @Override
                public void onError(ProducerRecord<byte[], byte[]> producerRecord, @Nullable RecordMetadata recordMetadata, Exception exception) {
                    // the interface's default onError is a no-op; the document simply stays marked as unprocessed
                    ProducerListener.super.onError(producerRecord, recordMetadata, exception);
                }
            };
        }

        public UpdateResult updateDocumentProcessedState(String id, long version, boolean isProcessed) {
            Query query = new Query();
            query.addCriteria(Criteria.where("_id").is(id));
            Update update = new Update();
            update.set("processed", isProcessed);
            update.set("version", version);
            return mongoTemplate.updateFirst(query, update, ProductChangedEntity.class);
        }
    }
    

    Then, with each successful send, the DB is updated with the processing result and the new version number. The Kafka binder picks up a ProducerListener bean automatically (in place of its default LoggingProducerListener), so no further wiring is needed.
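
    For comparison, this is roughly what the alternative I decided against would look like: a Consumer paired with StreamBridge, which sends explicitly and acknowledges only when the send succeeds. It is only a sketch; the binding name validatedProducts-out-0 and the collaborators (notificationMessageService, restEndpoint, getProcessingResult) are carried over from the question's code and would need to exist and be configured accordingly.

    // Sketch only: Consumer + StreamBridge variant (the direction not taken).
    @Bean
    public Consumer<Message<NotificationMessage>> validatedProducts(StreamBridge streamBridge) {
        return message -> {
            Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
            NotificationMessage notificationMessage = message.getPayload();
            try {
                notificationMessageService.saveOrUpdate(notificationMessage, 0, false);
                String status = restEndpoint.getStatusFor(notificationMessage);
                ValidatedEvent event = getProcessingResult(notificationMessage, status);

                // send explicitly; treat a false return or an exception as a failed send
                boolean sent = streamBridge.send("validatedProducts-out-0", MessageBuilder
                        .withPayload(event)
                        .setHeader(KafkaHeaders.MESSAGE_KEY, event.getKey().getBytes())
                        .build());
                if (!sent) {
                    throw new IllegalStateException("Send to validatedProducts-out-0 failed");
                }

                // mark as processed and acknowledge only after a successful hand-off
                notificationMessageService.saveOrUpdate(notificationMessage, 1, true);
                Optional.ofNullable(acknowledgment).ifPresent(Acknowledgment::acknowledge);
            } catch (Exception exception) {
                notificationMessageService.saveOrUpdate(notificationMessage, 1, false);
            }
        };
    }

    That works, but it reintroduces an explicit sending step into my own code, which is exactly what I wanted to keep out of the function.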