spring-boot apache-kafka spring-kafka kafka-producer-api

KAFKA : splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE


I am sending 10 messages. 2 messages are "right" and 1 message is over 1 MB, which the Kafka broker rejects with a RecordTooLargeException.

I have two doubts:

1) MESSAGE_TOO_LARGE appears only from the second time the scheduler calls the method onwards. When the scheduler calls the method for the first time, "splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE" does not appear.

2) Why is the retry count not decreasing? I have set retries=1.

I am calling the Sender class using the Spring Boot scheduling mechanism, something like this:

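@Scheduled(fixedDelay = 30000)
public void process() {
    try {
        // messagesToSend() is a hypothetical helper standing in for however the 10 messages are built
        sender.sendThem(messagesToSend());
    } catch (InterruptedException e) {
        // sendThem() throws this while waiting on its latch; restore the interrupt flag
        Thread.currentThread().interrupt();
    }
}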

I am using the Spring Kafka KafkaTemplate with Spring Boot.

@Configuration
@EnableKafka
public class KakfaConfiguration {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> config = new HashMap<>();

        // config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        // config.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
        // appProps.getJksLocation());
        // config.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
        // appProps.getJksPassword());
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.ACKS_CONFIG, acks);
        config.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackOffMsConfig);
        config.put(ProducerConfig.RETRIES_CONFIG, retries);
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "prod-99");

        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean(name = "ktm")
    public KafkaTransactionManager kafkaTransactionManager() {
        KafkaTransactionManager ktm = new KafkaTransactionManager(producerFactory());
        ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        return ktm;
    }

}
@Component
@EnableTransactionManagement
class Sender {

    @Autowired
    private KafkaTemplate<String, String> template;

    private static final Logger LOG = LoggerFactory.getLogger(Sender.class);

    @Transactional("ktm")
    public void sendThem(List<String> toSend) throws InterruptedException {
        List<ListenableFuture<SendResult<String, String>>> futures = new ArrayList<>();
        CountDownLatch latch = new CountDownLatch(toSend.size());
        ListenableFutureCallback<SendResult<String, String>> callback = new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onSuccess(SendResult<String, String> result) {
                LOG.info(" message sucess : " + result.getProducerRecord().value());
                latch.countDown();
            }

            @Override
            public void onFailure(Throwable ex) {
                // Pass the exception so the cause (e.g. RecordTooLargeException) is logged
                LOG.error("Message Failed ", ex);
                latch.countDown();
            }
        };

        toSend.forEach(str -> {
            ListenableFuture<SendResult<String, String>> future = template.send("t_101", str);
            future.addCallback(callback);
            // Keep the future so the timeout branch below can report which sends are still pending
            futures.add(future);
        });

        if (latch.await(12, TimeUnit.MINUTES)) {
            LOG.info("All sent ok");
        } else {
            for (int i = 0; i < toSend.size(); i++) {
                if (!futures.get(i).isDone()) {
                    LOG.error("No send result for " + toSend.get(i));
                }
            }
        }
    }
}

I am getting the following logs:

2020-05-01 15:55:18.346  INFO 6476 --- [   scheduling-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1588328718345
2020-05-01 15:55:18.347  INFO 6476 --- [   scheduling-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-prod-991, transactionalId=prod-991] ProducerId set to -1 with epoch -1
2020-05-01 15:55:18.351  INFO 6476 --- [oducer-prod-991] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-prod-991, transactionalId=prod-991] Cluster ID: bL-uhcXlRSWGaOaSeDpIog
2020-05-01 15:55:48.358  INFO 6476 --- [oducer-prod-991] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-prod-991, transactionalId=prod-991] ProducerId set to 13000 with epoch 10
 Value of kafka template----- 1518752790
2020-05-01 15:55:48.377  WARN 6476 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-prod-991, transactionalId=prod-991] Got error produce response in correlation id 8 on topic-partition t_101-2, splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE
2020-05-01 15:55:48.379  INFO 6476 --- [oducer-prod-991] com.a.kafkaproducer.producer.Sender  :  message sucess : TTTT0
2020-05-01 15:55:48.379  INFO 6476 --- [oducer-prod-991] com.a.kafkaproducer.producer.Sender  :  message sucess : TTTT1
2020-05-01 15:55:48.511 ERROR 6476 --- [oducer-prod-991] com.a.kafkaproducer.producer.Sender  : Message Failed 
2020-05-01 15:55:48.512 ERROR 6476 --- [oducer-prod-991] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='

2020-05-01 15:55:48.514  WARN 6476 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-prod-991, transactionalId=prod-991] Got error produce response in correlation id 10 on topic-partition t_101-2, splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE
2020-05-01 15:55:48.518  WARN 6476 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-prod-991, transactionalId=prod-991] Got error produce response in correlation id 11 on topic-partition t_101-2, splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE
2020-05-01 15:55:48.523  WARN 6476 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-prod-991, transactionalId=prod-991] Got error produce response in correlation id 12 on topic-partition t_101-2, splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE
2020-05-01 15:55:48.527  WARN 6476 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-prod-991, transactionalId=prod-991] Got error produce response in correlation id 13 on topic-partition t_101-2, splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE
2020-05-01 15:55:48.531  WARN 6476 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-prod-991, transactionalId=prod-991] Got error produce response in correlation id 14 on topic-partition t_101-2, splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE
2020-05-01 15:55:48.534  WARN 6476 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-prod-991, transactionalId=prod-991] Got error produce response in correlation id 15 on topic-partition t_101-2, splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE
2020-05-01 15:55:48.538  WARN 6476 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-prod-991, transactionalId=prod-991] Got error produce response in correlation id 16 on topic-partition t_101-2, splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE
2020-05-01 15:55:48.542  WARN 6476 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-prod-991, transactionalId=prod-991] Got error produce response in correlation id 17 on topic-partition t_101-2, splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE
2020-05-01 15:55:48.546  WARN 6476 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-prod-991, transactionalId=prod-991] Got error produce response in correlation id 18 on topic-partition t_101-2, splitting and retrying (1 attempts left). Error: MESSAGE_TOO_LARGE

After some time, the program completes with the following log:

Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for t_101-0:120000 ms has passed since batch creation

2020-05-01 16:18:31.322  WARN 17816 --- [   scheduling-1] o.s.k.core.DefaultKafkaProducerFactory   : Error during transactional operation; producer removed from cache; possible cause: broker restarted during transaction: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@7085a4dd, txId=prod-991]
2020-05-01 16:18:31.322  INFO 17816 --- [   scheduling-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-prod-991, transactionalId=prod-991] Closing the Kafka producer with timeoutMillis = 5000 ms.
2020-05-01 16:18:31.324  INFO 17816 --- [oducer-prod-991] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-prod-991, transactionalId=prod-991] Aborting incomplete transaction due to shutdown
 error message here
------ processing done in parent class------

Solution

  • A broad picture of the producer workflow is given below.

    [Image: producer workflow diagram]

    By setting the RETRIES_CONFIG property, we tell the producer to resend a batch that fails with a retriable (transient) error, up to that many times.

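    If a batch is too large (MESSAGE_TOO_LARGE) and contains more than one record, the producer splits it and sends the smaller batches again. The retry attempts are not decremented in this case, which is why every log line keeps reporting "1 attempts left".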

    You can go through the source code linked below to find the scenarios in which the retry count is decremented.

    https://github.com/apache/kafka/blob/68ac551966e2be5b13adb2f703a01211e6f7a34b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
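To make the behaviour described above concrete, here is a toy Java model. It is an assumption-laden sketch, not Kafka's `Sender` code. An oversized batch with several records is split and re-queued without using up an attempt, so the "attempts left" figure stays constant. A batch that is down to one oversized record fails outright, because `RecordTooLargeException` is not a retriable error.

Everything in the sketch is hypothetical, including the class `SplitRetryModel`, its methods, the halving strategy (the real producer chooses new batch boundaries itself), and the 1 MiB broker limit. `canRetry` is only a loose paraphrase of the conditions in the linked source: attempts remaining, delivery timeout not reached, and a retriable error. Check that file for the exact rules in your Kafka version. The 120000 ms constant is Kafka's default `delivery.timeout.ms`, the same figure that appears in the expiry log above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical toy model of how a producer reacts to MESSAGE_TOO_LARGE.
// NOT Kafka's Sender implementation: every name and number here is illustrative.
public class SplitRetryModel {

    // Kafka's default delivery.timeout.ms (2 minutes), used by the retry check below.
    static final long DELIVERY_TIMEOUT_MS = 120_000;

    // A batch of record sizes in bytes, plus how many retry attempts it has already used.
    static final class Batch {
        final List<Integer> recordSizes;
        final int attempts;

        Batch(List<Integer> recordSizes, int attempts) {
            this.recordSizes = recordSizes;
            this.attempts = attempts;
        }

        int totalBytes() {
            return recordSizes.stream().mapToInt(Integer::intValue).sum();
        }
    }

    // Loose paraphrase of the retry conditions: attempts remain, the delivery timeout
    // has not been reached, and the error is retriable.
    static boolean canRetry(int attempts, int retries, long elapsedMs, boolean retriableError) {
        return attempts < retries && elapsedMs < DELIVERY_TIMEOUT_MS && retriableError;
    }

    // Sends one batch against a broker size limit and returns a log of what happened.
    // Elapsed time is modelled as zero, so the delivery timeout never triggers here.
    static List<String> send(List<Integer> sizes, int brokerLimitBytes, int retries) {
        List<String> log = new ArrayList<>();
        Deque<Batch> queue = new ArrayDeque<>();
        queue.add(new Batch(sizes, 0));
        while (!queue.isEmpty()) {
            Batch b = queue.poll();
            if (b.totalBytes() <= brokerLimitBytes) {
                log.add("sent " + b.recordSizes);
            } else if (b.recordSizes.size() > 1) {
                // Too large but splittable: halve it and put both halves back at the front
                // with the SAME attempt count, so "attempts left" never goes down.
                log.add("split " + b.recordSizes + " (" + (retries - b.attempts) + " attempts left)");
                int mid = b.recordSizes.size() / 2;
                queue.addFirst(new Batch(b.recordSizes.subList(mid, b.recordSizes.size()), b.attempts));
                queue.addFirst(new Batch(b.recordSizes.subList(0, mid), b.attempts));
            } else {
                // One oversized record cannot be split, and RecordTooLargeException is not
                // retriable, so canRetry(...) is false however many attempts remain.
                boolean retry = canRetry(b.attempts, retries, 0, false);
                log.add("failed " + b.recordSizes + " (retry=" + retry + "): RecordTooLargeException");
            }
        }
        return log;
    }

    public static void main(String[] args) {
        // Two small records plus one record above a 1 MiB limit, with retries = 1.
        send(List.of(100, 200, 2_000_000), 1_048_576, 1).forEach(System.out::println);
    }
}
```

Running `main` prints the following lines, in this order:

- `split [100, 200, 2000000] (1 attempts left)`
- `sent [100]`
- `split [200, 2000000] (1 attempts left)`
- `sent [200]`
- `failed [2000000] (retry=false): RecordTooLargeException`

The split lines mirror the repeated "(1 attempts left)" warnings in the question's log.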