I am trying to implement non-blocking retries for my project using spring-kafka 2.8.0.
I have done the following. Note that the autoCreateTopics property is set to false.
@KafkaListener(
        id = "customListenerId",
        idIsGroup = false,
        topics = "main-topic-1, main-topic-2")
@RetryableTopic(
        autoCreateTopics = "false",
        fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC,
        backoff = @Backoff(delay = 10000),
        attempts = "6",
        kafkaTemplate = "kafkaTemplate",
        include = {
                AerospikeException.Timeout.class,
                QueryTimeoutException.class,
                HystrixRuntimeException.class})
public void consumePerTopic(@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                            @Payload byte[] payload,
                            Acknowledgment acknowledgment) {
    processor.process(objectMapper.readValue(payload, CreateRequest.class));
    acknowledgment.acknowledge();
}
I have the following integration test for it, which runs Kafka as an embedded container. The test simulates DB connection issues: the code under test must throw AerospikeException.Timeout, which is declared as retryable in the consumer.
@Test
void shouldNotCreateEntityForTopicsWhenDbTimedOutAndProduceRetryMessages() {
    performWithDbLatency(Duration.ofMillis(1000), () -> {
        var createEntityIdempotencyKeys = entityKafkaSteps.sendCreateRequestToAllTopicsWithIdempotency(
                getCreateEntityRequest("entity-owner-a"));
        retryEntityKafkaSteps.consumeRetryCreateRequestsByIdempotencyKeys(createEntityIdempotencyKeys);
    });
}
My configuration for the embedded Kafka container looks like this:
embedded:
  kafka:
    topicsToCreate: main-topic-1, main-topic-2
I understand that Kafka's default behaviour is to allow automatic topic creation. This test passes, but as I understand it, the autoCreateTopics property should block creation of the retry and DLT topics even if the broker allows it, so the test should fail because the broker has no retry or DLT topics.
Please clarify whether this is expected behaviour or whether I am missing something. The same behaviour occurs if I declare the retry functionality via a RetryTopicConfiguration bean using the doNotAutoCreateRetryTopics() method.
Thanks a lot.
Edited: My main concern is how autoCreateTopics = "false" relates to the embedded.kafka.topicsToCreate property. (The Kafka broker default is auto.create.topics.enable = true.) I am using the playtika testcontainers library.
No. autoCreateTopics = "false" simply means that the framework will not create the topics; it has no bearing on whether the broker is configured to create them.
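If the framework does not create the topics and broker-side auto-creation is also switched off, the retry and DLT topics have to exist before the listener starts. The following is a minimal sketch that lists the names to provision. It assumes the default spring-kafka suffixes ("-retry" and "-dlt") and the SINGLE_TOPIC fixed-delay strategy from the question, so there is one retry topic per main topic; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class RetryTopicNames {

    // Assumed defaults; adjust if custom suffixes are configured on @RetryableTopic.
    static final String RETRY_SUFFIX = "-retry";
    static final String DLT_SUFFIX = "-dlt";

    /** Returns, for each main topic, its single retry topic followed by its DLT. */
    public static List<String> topicsToProvision(List<String> mainTopics) {
        List<String> result = new ArrayList<>();
        for (String topic : mainTopics) {
            result.add(topic + RETRY_SUFFIX);
            result.add(topic + DLT_SUFFIX);
        }
        return result;
    }

    public static void main(String[] args) {
        // Print the topic names that would have to be created up front.
        topicsToProvision(List.of("main-topic-1", "main-topic-2"))
                .forEach(System.out::println);
    }
}
```

With the playtika setup from the question, these names could be appended to topicsToCreate if the retry flow is meant to work without any auto-creation.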
@EmbeddedKafka(topics = ...) will always create the topics.
To disable topic creation in the embedded Kafka broker, use @EmbeddedKafka(brokerProperties = "auto.create.topics.enable:false") and don't specify any topics there.
Or you can set the consumer property allow.auto.create.topics to false; it overrides the broker property (if it is true).
https://kafka.apache.org/documentation/#consumerconfigs_allow.auto.create.topics
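As a rough illustration of the consumer-side option, the sketch below builds a consumer property map with allow.auto.create.topics set to false. It uses plain string keys so that it runs without Kafka on the classpath; the class name, the bootstrap address, and the choice of deserializers (a byte[] value to match the listener payload) are assumptions for the example.

```java
import java.util.HashMap;
import java.util.Map;

class ConsumerTopicCreationConfig {

    /** Builds consumer properties that stop the consumer from triggering topic auto-creation. */
    public static Map<String, Object> consumerProps(String bootstrapServers, String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Assumed deserializers: String keys, byte[] values.
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // The setting the answer refers to: do not auto-create topics on subscribe.
        props.put("allow.auto.create.topics", false);
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical broker address and group id, for illustration only.
        System.out.println(consumerProps("localhost:9092", "customListenerId"));
    }
}
```

A map like this would typically be handed to the consumer factory used by the listener container (for example a DefaultKafkaConsumerFactory).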