java apache-kafka spring-kafka kafka-producer-api

Get the object's value in the onFailure() callback of the Kafka send() method


I want to get the Person objects that were not sent to Kafka, i.e. the ones that end up in the onFailure() method.

1) I created a temp array of Person type and used it in onFailure(), but this doesn't work: temp[0] always shows the same value, the last one in the List.
2) I tried to use a local Person object to solve the issue, but I got a compilation error:

"Local variable pp defined in an enclosing scope must be final or effectively final"

How can I resolve this issue?

class Sender {

        @Autowired
        private KafkaTemplate<String, Person> template;

        private static final Logger LOG = LoggerFactory.getLogger(Sender.class);



        Person temp= null;

        // @Transactional("ktm")
        public void sendThem(List<Person> toSend) throws InterruptedException {
            List<ListenableFuture<SendResult<String, Person>>> futures = new ArrayList<>();


            ListenableFutureCallback<SendResult<String, Person>> callback = new ListenableFutureCallback<SendResult<String, Person>>() {

                @Override
                public void onSuccess(SendResult<String, Person> result) {
                    //LOG.info(" message success 1: " + result.getProducerRecord().value());
                    LOG.info(" message success 2: " + temp);
                }

                @Override
                public void onFailure(Throwable ex) {
                    LOG.info(" message failed : "  + temp);

                }
            };

            for (Person p : toSend) {

                temp=p;
                ListenableFuture<SendResult<String, Person>> future = template.send("t_101", temp);


                future.addCallback(callback);
            }



        }
    }

Solution

  • The value of the failed send is available in the Throwable: cast it to KafkaProducerException. Your Person is in producerRecord.value().

    I updated my answer to your other question.

    You can also create a new callback inside the loop, and then p will be available to it; but it is more efficient to use a single callback and get the Person from the exception.
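
The shared temp field in the question prints the last value because the sends complete asynchronously: by the time a callback runs, the loop has usually already overwritten temp. The two options above can be sketched with plain JDK classes. This is only an illustration, not Spring Kafka code: CompletableFuture stands in for the future returned by template.send(), and SendFailedException, send(), and the "bad" name prefix are hypothetical stand-ins (SendFailedException plays the role of KafkaProducerException by carrying the value that could not be sent).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

class FailedSendDemo {

    static class Person {
        final String name;
        Person(String name) { this.name = name; }
        @Override public String toString() { return "Person(" + name + ")"; }
    }

    // Hypothetical stand-in for KafkaProducerException: it carries the failed value.
    static class SendFailedException extends RuntimeException {
        private final Person failedValue;
        SendFailedException(Person failedValue, String message) {
            super(message);
            this.failedValue = failedValue;
        }
        Person getFailedValue() { return failedValue; }
    }

    // Hypothetical stand-in for template.send(): names starting with "bad" fail.
    static CompletableFuture<Person> send(Person p) {
        CompletableFuture<Person> future = new CompletableFuture<>();
        if (p.name.startsWith("bad")) {
            future.completeExceptionally(new SendFailedException(p, "simulated send failure"));
        } else {
            future.complete(p);
        }
        return future;
    }

    // Option 1: one shared callback; the failed Person is read from the exception.
    static List<Person> failuresFromException(List<Person> toSend) {
        // Synchronized because real callbacks may run on another thread.
        List<Person> failed = Collections.synchronizedList(new ArrayList<>());
        BiConsumer<Person, Throwable> callback = (result, ex) -> {
            if (ex instanceof SendFailedException) {
                failed.add(((SendFailedException) ex).getFailedValue());
            }
        };
        for (Person p : toSend) {
            send(p).whenComplete(callback);
        }
        return failed;
    }

    // Option 2: a new callback per send; each lambda captures its own
    // effectively final loop variable p, so no shared field is needed.
    static List<Person> failuresFromPerSendCallback(List<Person> toSend) {
        List<Person> failed = Collections.synchronizedList(new ArrayList<>());
        for (Person p : toSend) {
            send(p).whenComplete((result, ex) -> {
                if (ex != null) {
                    failed.add(p);
                }
            });
        }
        return failed;
    }

    public static void main(String[] args) {
        List<Person> people = Arrays.asList(
                new Person("alice"), new Person("bad-bob"), new Person("bad-carol"));
        System.out.println("from exception:    " + failuresFromException(people));
        System.out.println("per-send callback: " + failuresFromPerSendCallback(people));
    }
}
```

In the real code the same shape should apply: either keep the single ListenableFutureCallback and read the Person from the exception passed to onFailure(Throwable), or build the callback inside the for loop so that it closes over p. The enhanced-for variable p is effectively final, which is why it avoids the compilation error quoted in the question.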