Search code examples
javaapache-kafkakafka-producer-api

I send message by kafka-producer in multi-threads, but message loss occurred


I'm using kafka-producer and sending data to topic 'test-topic' that consist of replication-factor 3 and partitions 1 in kafka-cluster(consisting of three brokers).

I created five thread. each thread sent 10,000 messages(each message size is 4000 bytes).

i expected latest offset 50,000 but actually 44,993.

about 5,000 messages loss occurred.

Why did message loss occur? Below my codes... (KAFKA-VERSION 1.1.0)

KafkaMessageSender.class

public class KafkaMessageSender {
    private final static Logger logger = 
 LoggerFactory.getLogger(KafkaMessageSender.class);
    private Properties props;
    private KafkaProducer<String, String> producer;
    private String topic;
    private AtomicInteger count;

    public KafkaMessageSender(AtomicInteger count, String bootstrapUrls, String topic) {
        logger.info("KafkaMessageSender initializing...");
        this.topic = topic;
        this.count = count;
        props = new Properties();
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapUrls);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); //16384
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
        logger.info("KafkaMessageSender initializing end");     

    }

    public void sendMessages() {
        producer.send(new ProducerRecord<String, String>(topic, Messages.MSG_4K)); //Messages.MSG_4K indicates 4000bytes message
        count.getAndIncrement();
        logger.info("count : "+count.get());
    }
}

KafkaMessageSenderMain.class

public class KafkaMessageSenderMain {

    private final static Logger logger = LoggerFactory.getLogger(KafkaMessageSenderMain.class);

    final static String bootstrap_url = "ism1.solulink.co.kr:9092,ism2.solulink.co.kr:9092,ism3.solulink.co.kr:9092";
    final static String topic = "test-topic"; //topic name
    final static AtomicInteger count = new AtomicInteger(0);
    final static int MAX_LOOP = 10000; //message sending count
    final static int MAX_THREAD = 5;  //created number of threads

    public static void main(String[] args) {        
        long startTime = System.currentTimeMillis();
        ExecutorService executorService = Executors.newFixedThreadPool(MAX_THREAD);
        for(int i = 0; i < MAX_THREAD; i++) {
            executorService.execute(() ->{
                    KafkaMessageSender sender = new KafkaMessageSender(count, bootstrap_url, topic);
                    for(int j = 0; j < MAX_LOOP; j++) {
                        sender.sendMessages(); //send message
                    }
            });
        }

        executorService.shutdown();
        try {
            boolean flag = executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); 
            long endTime = System.currentTimeMillis();
            long procTime = (endTime - startTime);
            logger.info("all Threads is shutdown? : "+flag);
            logger.info("processTime : " + ((double)procTime/(double)1000L)+"sec");
        } catch (InterruptedException e) {
            logger.error("awaitTermination exception",e);
        }
    }
}

Result

Result image


Solution

  • Can you modify and run your code as below to see what the error is ?

    producer.send(new ProducerRecord<String, String>(topic, Messages.MSG_4K), new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null)
                    e.printStackTrace();
            }
    });