We are facing the same problem as the one described in this thread.
Here, Samza is requesting a Kafka partition offset that is too old (i.e., the Kafka log has moved ahead). We are setting the property consumer.auto.offset.reset to smallest, and therefore expect Samza to reset its checkpoint to the earliest available partition offset in this scenario. But that is not happening; instead, we continually get exceptions of this form:
INFO [2018-08-21 19:26:20,924] [U:669,F:454,T:1,123,M:2,658] kafka.producer.SyncProducer:[Logging_class:info:66] - [main] - Disconnecting from vrni-platform-release:9092
INFO [2018-08-21 19:26:20,924] [U:669,F:454,T:1,123,M:2,658] system.kafka.GetOffset:[Logging_class:info:63] - [main] - Validating offset 56443499 for topic and partition Topic3-0
WARN [2018-08-21 19:26:20,925] [U:669,F:454,T:1,123,M:2,658] system.kafka.KafkaSystemConsumer:[Logging_class:warn:74] - [main] - While refreshing brokers for Topic3-0: org.apache.kafka.common.errors.OffsetOutOfRangeException: The requested offset is not within the range of offsets maintained by the server.. Retrying
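For reference, the behaviour we expect from consumer.auto.offset.reset=smallest can be written as a small Scala model. This is a simplified sketch, not Samza's implementation; the names ResetModel and resolveStartOffset, and the earliest/log-end offsets used with it, are hypothetical.

```scala
// Simplified model (NOT Samza's actual code) of what we expect
// consumer.auto.offset.reset to do when a checkpointed offset has aged out.
object ResetModel {
  /** Returns the offset to start consuming from.
    * `earliest` is the oldest offset the broker still retains and
    * `logEnd` is the offset the next produced message will receive.
    */
  def resolveStartOffset(requested: Long, earliest: Long, logEnd: Long, autoOffsetReset: String): Long =
    if (requested >= earliest && requested <= logEnd) requested // still in range: keep the checkpoint
    else autoOffsetReset match {
      case "smallest" => earliest // rewind to the oldest retained message
      case "largest"  => logEnd   // skip ahead and read only new messages
      case other      => throw new IllegalArgumentException(s"Unsupported auto.offset.reset value: $other")
    }
}
```

For example, with a hypothetical earliest retained offset of 60000000, ResetModel.resolveStartOffset(56443499L, 60000000L, 61000000L, "smallest") yields 60000000L, whereas what we actually observe is the retry loop above.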
Version Details
Browsing through the code, it appears that GetOffset::isValidOffset should catch the OffsetOutOfRangeException and convert it to a false value, but this does not seem to be happening. Could there be a mismatch in the package of the exception? The GetOffset class catches kafka.common.OffsetOutOfRangeException (that is the class it imports), but the logs show org.apache.kafka.common.errors.OffsetOutOfRangeException, which is a different class. Could this be the reason?
def isValidOffset(consumer: DefaultFetchSimpleConsumer, topicAndPartition: TopicAndPartition, offset: String) = {
  info("Validating offset %s for topic and partition %s" format (offset, topicAndPartition))
  try {
    val messages = consumer.defaultFetch((topicAndPartition, offset.toLong))
    if (messages.hasError) {
      KafkaUtil.maybeThrowException(messages.error(topicAndPartition.topic, topicAndPartition.partition).exception())
    }
    info("Able to successfully read from offset %s for topic and partition %s. Using it to instantiate consumer." format (offset, topicAndPartition))
    true
  } catch {
    // Matches only the imported kafka.common.OffsetOutOfRangeException
    case e: OffsetOutOfRangeException => false
  }
}
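To check the package-mismatch theory in isolation, here is a self-contained Scala sketch. The objects oldclient and newclient are hypothetical stand-ins for the kafka.common and org.apache.kafka.common.errors packages (the real Kafka classes are not used), and isValidPatched is only one possible way to handle both classes, not the actual SAMZA-1822 fix.

```scala
// Hypothetical stand-ins for kafka.common.OffsetOutOfRangeException (old
// Scala client) and org.apache.kafka.common.errors.OffsetOutOfRangeException
// (Java client): same simple name, but unrelated classes.
object oldclient { class OffsetOutOfRangeException(msg: String) extends RuntimeException(msg) }
object newclient { class OffsetOutOfRangeException(msg: String) extends RuntimeException(msg) }

object OffsetCheck {
  // Same shape as the quoted isValidOffset: only the old-client class is
  // caught, so a new-client exception escapes instead of becoming `false`.
  def isValidAsQuoted(fetch: () => Unit): Boolean =
    try { fetch(); true }
    catch { case _: oldclient.OffsetOutOfRangeException => false }

  // Hypothetical patch (not the actual SAMZA-1822 change): treat either
  // class as "offset out of range".
  def isValidPatched(fetch: () => Unit): Boolean =
    try { fetch(); true }
    catch {
      case _: oldclient.OffsetOutOfRangeException |
           _: newclient.OffsetOutOfRangeException => false
    }
}
```

A newclient exception passes straight through isValidAsQuoted, because a type pattern in a catch clause matches by class, not by simple name.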
Also, the BrokerProxy class, which calls GetOffset, would log "It appears that..." if it received a false value, but this line never shows up in our logs. This indicates that an exception thrown in the GetOffset method is going uncaught and propagating up:
def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = {
  debug("Adding new topic and partition %s to queue for %s" format (tp, host))
  if (nextOffsets.asJava.containsKey(tp)) {
    toss("Already consuming TopicPartition %s" format tp)
  }
  val offset = if (nextOffset.isDefined && offsetGetter.isValidOffset(simpleConsumer, tp, nextOffset.get)) {
    nextOffset.get.toLong
  } else {
    warn("It appears that we received an invalid or empty offset %s for %s. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues." format (nextOffset, tp))
    offsetGetter.getResetOffset(simpleConsumer, tp)
  }
  debug("Got offset %s for new topic and partition %s." format (offset, tp))
  nextOffsets += tp -> offset
  metrics.topicPartitions.get((host, port)).set(nextOffsets.size)
}
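The reasoning about the missing warning can be illustrated with a simplified model of the offset selection in addTopicPartition. StartOffset.choose and its function parameters are hypothetical; they mirror only the if/else branch, not the rest of BrokerProxy.

```scala
// Simplified model (hypothetical names) of the offset selection in
// BrokerProxy.addTopicPartition: use the checkpoint if it validates,
// otherwise log a warning and fall back to the auto.offset.reset offset.
object StartOffset {
  def choose(nextOffset: Option[String],
             isValidOffset: String => Boolean,
             resetOffset: () => Long,
             warn: String => Unit): Long =
    if (nextOffset.isDefined && isValidOffset(nextOffset.get)) nextOffset.get.toLong
    else {
      // Only reached when validation returns false (or there is no checkpoint);
      // an exception thrown by isValidOffset skips this branch entirely.
      warn(s"It appears that we received an invalid or empty offset $nextOffset")
      resetOffset()
    }
}
```

If isValidOffset returns false, the warning is logged and the reset offset is used; if it throws, neither happens and the exception reaches the caller, which matches the absence of the "It appears that..." line in our logs.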
Could this be due to a mismatch in the Kafka client library version we are using? Is there a recommended Kafka client version to use with Samza 0.14.1 (assuming the Kafka server is 1.x)?
Any help regarding this will be greatly appreciated.
This is a bug in Samza 0.14.0 and 0.14.1, tracked as SAMZA-1822. It is also discussed on the Samza mailing list.