I am trying to understand Kafka in some detail with respect to Kafka Streams (the Kafka Streams client talking to Kafka).
I understand that KafkaConsumer (the Java client) gets data from Kafka, however I am not able to understand at what frequency the client polls the Kafka topic to fetch the data?
The frequency of the poll is defined by your code, because you are responsible for calling poll. A very naive example of user code using KafkaConsumer looks like the following:
import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class KafkaConsumerExample {
    ...
    static void runConsumer() throws InterruptedException {
        final Consumer<Long, String> consumer = createConsumer();
        final int giveUp = 100;
        int noRecordsCount = 0;
        while (true) {
            // poll() blocks for up to one second waiting for records
            final ConsumerRecords<Long, String> consumerRecords =
                consumer.poll(Duration.ofMillis(1000));
            // Give up after `giveUp` consecutive empty polls
            if (consumerRecords.count() == 0) {
                noRecordsCount++;
                if (noRecordsCount > giveUp) break;
                else continue;
            }
            consumerRecords.forEach(record -> {
                System.out.printf("Consumer Record:(%d, %s, %d, %d)\n",
                    record.key(), record.value(),
                    record.partition(), record.offset());
            });
            // Commit the consumed offsets asynchronously so the loop is not blocked
            consumer.commitAsync();
        }
        consumer.close();
        System.out.println("DONE");
    }
}
In this case the poll frequency is defined by how long processing the messages in consumerRecords.forEach takes: the next poll only happens once the previous batch has been processed.
However, keep in mind that if you don't call poll "fast enough", your consumer will be considered dead by the broker coordinator and a rebalance will be triggered. What counts as "fast enough" is determined by the property max.poll.interval.ms in Kafka >= 0.10.1.0. See this answer for more details.
The default value of max.poll.interval.ms is five minutes, so if your consumerRecords.forEach takes longer than that, your consumer will be considered dead.
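If your processing is genuinely slow, you can raise that limit when you build the consumer. Here is a minimal sketch of a createConsumer() that does so (it could play the role of the one elided in the example above; the broker address, group id, and the ten-minute value are illustrative assumptions):

import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

static Consumer<Long, String> createConsumer() {
    final Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // illustrative
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // Allow up to ten minutes between poll() calls before the broker
    // coordinator considers this consumer dead (the default is five minutes)
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "600000");
    return new KafkaConsumer<>(props);
}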
If you don't want to use the raw KafkaConsumer directly, you could use Alpakka Kafka, a library for consuming from and producing to Kafka topics in a safe, backpressured way (it is based on Akka Streams). With this library, the poll frequency is determined by the configuration setting akka.kafka.consumer.poll-interval.
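For a feel of what this looks like, here is a minimal sketch of a plain Alpakka Kafka consumer in Java (the topic name, group id, and the programmatic poll-interval override are illustrative assumptions; normally you would set akka.kafka.consumer.poll-interval in your configuration file):

import java.time.Duration;
import akka.actor.ActorSystem;
import akka.kafka.ConsumerSettings;
import akka.kafka.Subscriptions;
import akka.kafka.javadsl.Consumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AlpakkaConsumerExample {
    public static void main(String[] args) {
        final ActorSystem system = ActorSystem.create("example");
        final ConsumerSettings<Long, String> settings =
            ConsumerSettings.create(system, new LongDeserializer(), new StringDeserializer())
                .withBootstrapServers("localhost:9092") // illustrative
                .withGroupId("example-group")           // illustrative
                // programmatic equivalent of akka.kafka.consumer.poll-interval
                .withPollInterval(Duration.ofMillis(50));

        // Records are pulled from Kafka only as fast as the downstream
        // stage demands them; the library keeps polling internally
        Consumer.plainSource(settings, Subscriptions.topics("example-topic"))
            .runForeach(record -> System.out.println(record.value()), system);
    }
}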
We say it is safe because it keeps polling, so the consumer is not considered dead, even when your processing can't keep up with the incoming rate. It is able to do this because KafkaConsumer allows pausing the consumer:
/**
* Suspend fetching from the requested partitions. Future calls to {@link #poll(Duration)} will not return
* any records from these partitions until they have been resumed using {@link #resume(Collection)}.
* Note that this method does not affect partition subscription. In particular, it does not cause a group
* rebalance when automatic assignment is used.
* @param partitions The partitions which should be paused
* @throws IllegalStateException if any of the provided partitions are not currently assigned to this consumer
*/
@Override
public void pause(Collection<TopicPartition> partitions) { ... }
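As a rough illustration of the pattern this enables (a sketch only, not Alpakka's actual implementation; processBatchAsync is a hypothetical method standing in for your own asynchronous processing):

import java.time.Duration;
import java.util.Set;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

static void processPaused(Consumer<Long, String> consumer) throws Exception {
    final ConsumerRecords<Long, String> records = consumer.poll(Duration.ofMillis(1000));
    final Future<?> batch = processBatchAsync(records); // hypothetical async processing
    final Set<TopicPartition> assigned = consumer.assignment();
    // While paused, poll() returns no records for these partitions, but the
    // calls still count as liveness for the max.poll.interval.ms deadline
    consumer.pause(assigned);
    while (!batch.isDone()) {
        consumer.poll(Duration.ofMillis(100)); // empty polls keep the consumer alive
    }
    consumer.resume(assigned);
    consumer.commitAsync();
}

This way poll keeps being called within the deadline even though the actual records are processed at whatever rate the application can sustain.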
To fully understand this, you should read about Akka Streams and backpressure.