java, apache-kafka, spring-xd

Kafka consumer gets a message which was consumed before


I have a consumer job in Spring XD that completes once it receives a message produced by another producer job, and I trigger these jobs every day. I found that sometimes the consumer receives a message that was already consumed before.

The logs are as follows:

####OK
2019-06-28 02:06:13+0800 INFO inbound.job:Consumer_Job_In_XD-redis:queue-inbound-channel-adapter1 myConsumer.ConsumeTasklet - ==========consumed poll data ConsumerRecord(topic = my_consumer_topic, partition = 0, leaderEpoch = 0, offset = 4, CreateTime = 1561658772877, serialized key size = -1, serialized value size = 30, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_from_producer) ==================
2019-06-28 02:06:13+0800 INFO inbound.job:Consumer_Job_In_XD-redis:queue-inbound-channel-adapter1 myConsumer.ConsumeTasklet - ==========message is  message_from_producer, task startTime is 1561658700108, timestamp is 1561658772877 ==================

####NG
2019-06-29 17:07:14+0800 INFO inbound.job:Consumer_Job_In_XD-redis:queue-inbound-channel-adapter1 myConsumer.ConsumeTasklet - ==========consumed poll data ConsumerRecord(topic = my_consumer_topic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1561399136840, serialized key size = -1, serialized value size = 30, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_from_producer) ==================
2019-06-29 17:07:14+0800 INFO inbound.job:Consumer_Job_In_XD-redis:queue-inbound-channel-adapter1 myConsumer.ConsumeTasklet - ==========message is  message_from_producer, task startTime is 1561799100282, timestamp is 1561399136840 ==================

####OK
2019-06-29 22:16:58+0800 INFO inbound.job:Consumer_Job_In_XD-redis:queue-inbound-channel-adapter1 myConsumer.ConsumeTasklet - ==========consumed poll data ConsumerRecord(topic = my_consumer_topic, partition = 0, leaderEpoch = 2, offset = 5, CreateTime = 1561817817702, serialized key size = -1, serialized value size = 30, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_from_producer) ==================
2019-06-29 22:16:58+0800 INFO inbound.job:Consumer_Job_In_XD-redis:queue-inbound-channel-adapter1 myConsumer.ConsumeTasklet - ==========message is  message_from_producer, task startTime is 1561817528447, timestamp is 1561817817702 ==================

####NG
2019-07-02 02:05:09+0800 INFO inbound.job:Consumer_Job_In_XD-redis:queue-inbound-channel-adapter1 myConsumer.ConsumeTasklet - ==========consumed poll data ConsumerRecord(topic = my_consumer_topic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1561399136840, serialized key size = -1, serialized value size = 30, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_from_producer) ==================
2019-07-02 02:05:09+0800 INFO inbound.job:Consumer_Job_In_XD-redis:queue-inbound-channel-adapter1 myConsumer.ConsumeTasklet - ==========message is  message_from_producer, task startTime is 1562004300372, timestamp is 1561399136840 ==================

It looks like the consumer received the message at offset = 0 multiple times.

Kafka version: 1.0.0

The consumer commits offsets manually (consumer.commitSync()). Only the following properties are set:

bootstrap.servers  
auto.offset.reset=earliest  
group.id  
client.id

    Properties config = new Properties();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    config.put("auto.offset.reset", "earliest");
    config.put("group.id", group);
    config.put("client.id", config.getProperty("group.id") + "_" + System.currentTimeMillis());
    config.put("enable.auto.commit", false);
    try {
        consumer = new KafkaConsumer<>(config);
        consumer.subscribe(tList);
        while (true) {
            ConsumerRecords<?, ?> records = consumer.poll(10000);
            for (ConsumerRecord<?, ?> record : records) {
                //.........
                consumer.commitSync();
            }
            if (matched)
                break;
        }
    } finally {
        consumer.close();
    }
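
As a side note on the manual commit: calling consumer.commitSync() with no arguments inside the record loop commits the offsets of everything returned by the last poll, not just the record currently being processed. If you want to commit exactly the record you have handled, you can pass explicit offsets instead. A minimal sketch of that variant, reusing record and consumer from the snippet above (the extra imports are java.util.Collections, org.apache.kafka.common.TopicPartition and org.apache.kafka.clients.consumer.OffsetAndMetadata):

    // inside the for loop, after the record has been processed:
    // the committed value must be the *next* offset to consume, hence + 1
    TopicPartition tp = new TopicPartition(record.topic(), record.partition());
    OffsetAndMetadata next = new OffsetAndMetadata(record.offset() + 1);
    consumer.commitSync(Collections.singletonMap(tp, next));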

Solution

  • Up to and including Kafka 1.1, committed offsets are only kept for 24 hours by default, because offsets.retention.minutes is set to 1440.

    So if you stop your consumer for over 24 hours, there is a chance the committed offsets will have been deleted by the time it restarts, forcing the consumer to fall back to auto.offset.reset to find a new position. With auto.offset.reset=earliest, it starts again from the beginning of the partition, which is why you see the offset 0 message being consumed again.

    As this was too short for many users, offsets.retention.minutes was raised to 10080 (7 days) in Kafka 2.0.

    You should change your broker configuration so that offsets are retained for a longer period (a sketch follows below), or upgrade to a more recent Kafka version.
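
    If you stay on Kafka 1.0 or 1.1, a hedged sketch of the broker-side change: the value below is the 7-day default that Kafka 2.0 adopted, set in each broker's server.properties:

        # server.properties (broker configuration)
        # keep committed consumer offsets for 7 days instead of the 24-hour default
        offsets.retention.minutes=10080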