I have Java 8 application working with Apache Kafka 2.11-0.10.1.0. I need to use the seek
feature to poll
old messages from partitions. However I faced an exception of No current assignment for partition
which is occurred every time I am trying to seekByOffset
. Here's my class which is responsible for seek
ing topics to the specified timestamp:
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
* The main purpose of this class is to move fetching point for each partition of the {@link KafkaConsumer}
* to some offset which is determined either by timestamp or by offset number.
*/
public class KafkaSeeker {
public static final long APP_STARTUP_TIME = Instant.now().toEpochMilli();
private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());
private final KafkaConsumer<String, String> kafkaConsumer;
private ConsumerRecords<String, String> polledRecords;
public KafkaSeeker(KafkaConsumer<String, String> kafkaConsumer) {
this.kafkaConsumer = kafkaConsumer;
this.polledRecords = new ConsumerRecords<>(Collections.emptyMap());
}
/**
* For each assigned or subscribed topic {@link org.apache.kafka.clients.consumer.KafkaConsumer#seek(TopicPartition, long)}
* fetching pointer to the specified {@code timestamp}.
* If no messages were found in each partition for a topic,
* then {@link org.apache.kafka.clients.consumer.KafkaConsumer#seekToEnd(Collection)} will be called.
*
* Due to {@link KafkaConsumer#subscribe(Pattern)} and {@link KafkaConsumer#assign(Collection)} laziness
* method needs to execute dummy {@link KafkaConsumer#poll(long)} method. All {@link ConsumerRecords} which were
* polled from buffer are swallowed and produce warning logs.
*
* @param timestamp is used to find proper offset to seek to
* @param topics are used to seek only specific topics. If not specified or empty, all subscribed topics are used.
*/
public Map<TopicPartition, OffsetAndTimestamp> seek(long timestamp, Collection<String> topics) {
this.polledRecords = kafkaConsumer.poll(0);
Collection<TopicPartition> topicPartitions;
if (CollectionUtils.isEmpty(topics)) {
topicPartitions = kafkaConsumer.assignment();
} else {
topicPartitions = topics.stream()
.map(it -> {
List<Integer> partitions = kafkaConsumer.partitionsFor(it).stream()
.map(PartitionInfo::partition).collect(Collectors.toList());
return partitions.stream().map(partition -> new TopicPartition(it, partition));
})
.flatMap(it -> it)
.collect(Collectors.toList());
}
if (topicPartitions.isEmpty()) {
throw new IllegalStateException("Kafka consumer doesn't have any subscribed topics.");
}
Map<TopicPartition, Long> timestampsByTopicPartitions = topicPartitions.stream()
.collect(Collectors.toMap(Function.identity(), topicPartition -> timestamp));
Map<TopicPartition, Long> beginningOffsets = kafkaConsumer.beginningOffsets(topicPartitions);
Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampsByTopicPartitions);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsets.entrySet()) {
TopicPartition topicPartition = entry.getKey();
if (entry.getValue() != null) {
LOGGER.info("Kafka seek topic:partition [{}:{}] from [{} offset] to [{} offset].",
topicPartition.topic(),
topicPartition.partition(),
beginningOffsets.get(topicPartition),
entry.getValue());
kafkaConsumer.seek(topicPartition, entry.getValue().offset());
} else {
LOGGER.info("Kafka seek topic:partition [{}:{}] from [{} offset] to the end of partition.",
topicPartition.topic(),
topicPartition.partition());
kafkaConsumer.seekToEnd(Collections.singleton(topicPartition));
}
}
return offsets;
}
public ConsumerRecords<String, String> getPolledRecords() {
return polledRecords;
}
}
Before calling the method I have consumer subscribed to a single topic like this consumer.subscribe(singletonList(kafkaTopic));
. When I get kafkaConsumer.assignment()
it returns zero TopicPartition
s assigned. But if I specify the topic and get its partitions then I have valid TopicPartition
s, although they are failing on seek
call with the error in the title. What is something I forgot?
The correct way to reliably seek and check current assignment is to wait for the onPartitionsAssigned()
callback after subscribing. On a newly created (still not connected) consumer, calling poll()
once does not guarantees it will immedaitely be connected and assigned partitions.
As a basic example, see the code below that subscribes to a topic, and in the assigned callback, seeks to the desired position. Finally you'll notice that the poll loop correctly only sees records from the seek location and not from the previous committed or reset offset.
public static final Map<TopicPartition, Long> offsets = Map.of(new TopicPartition("testtopic", 0), 5L);
public static void main(String args[]) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Collections.singletonList("testtopic"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.println("Assigned " + partitions);
for (TopicPartition tp : partitions) {
OffsetAndMetadata oam = consumer.committed(tp);
if (oam != null) {
System.out.println("Current offset is " + oam.offset());
} else {
System.out.println("No committed offsets");
}
Long offset = offsets.get(tp);
if (offset != null) {
System.out.println("Seeking to " + offset);
consumer.seek(tp, offset);
}
}
}
});
for (int i = 0; i < 10; i++) {
System.out.println("Calling poll");
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100L));
for (ConsumerRecord<String, String> r : records) {
System.out.println("record from " + r.topic() + "-" + r.partition() + " at offset " + r.offset());
}
}
}
}