EDIT: The test now only passes when I use localhost:9092 for the bootstrap server, which is the default port my Kafka/ZooKeeper Docker image runs on.
I need help with my integration test, which checks that a Kafka payload is consumed. I am using Testcontainers for the KafkaContainer and the MySQLContainer. The goal of the test is to ensure that the SUT correctly consumes the payload sent by the Kafka producer, but the consumer is never hit during the test. Although it fails in the test, the code works as it should when I run it locally.
So far, I've tried using an EmbeddedKafkaBroker, which did not work, and I also tried changing the message topic. I know that the issue is somewhere in my configuration, but I can't put my finger on it. The application-test.properties, the SUT, the test class, and the logs are attached below, in that order.
Also, the reason I am using the userRepository for my assertions is that whenever the payload is consumed, it is saved to the repository, and I want to check that the saved user exists.
# Kafka Properties
# Kafka Consumer Properties
SUT (RecipientPayloadConsumer)
public class RecipientPayloadConsumer implements KafkaConsumer<RecipientSavedEvent> {
private final Creator<User> userCreator;
@KafkaListener(topics = "${spring.kafka.topic.name}", groupId = "${spring.kafka.consumer.group-id}")
public void consumePayload(@Payload RecipientSavedEvent payload) {
log.info("Payload received from 'email-service': {}", payload);
User userToSave = this.convertPayloadToUser(payload);
try {
} catch (IllegalStateException exception) {
log.error("IllegalStateException caught attempting to save payload from email-service.");
log.error("Message: {}", exception.getMessage());
throw exception;
private User convertPayloadToUser(RecipientSavedEvent payload) {
return User.builder()
@TestPropertySource(locations = "classpath:application-test.properties")
public class RecipientPayloadConsumerTests {
private static KafkaTemplate<String, String> kafkaTemplate;
private UserRepository userRepository;
private RecipientPayloadConsumer sut;
public static MySQLContainer<?> mySQLContainer = new MySQLContainer<>("mysql:latest");
public static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));
public static void setProperties(DynamicPropertyRegistry registry) {
registry.add("spring.datasource.url", mySQLContainer::getJdbcUrl);
registry.add("spring.datasource.username", mySQLContainer::getUsername);
registry.add("spring.datasource.password", mySQLContainer::getPassword);
public static void setUp() {
Map<String, Object> producerProps = new HashMap<>();
producerProps.put("bootstrap.servers", kafkaContainer.getBootstrapServers());
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer", "org.springframework.kafka.support.serializer.JsonSerializer");
kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerProps));
public void afterEach() {
public static void tearDown() {
public void consumePayload_Should_SavePayload() throws InterruptedException {
RecipientSavedEvent expected = getPayload();
kafkaTemplate.sendDefault(expected.getPayloadKey(), expected.toString());
User actual = userRepository.findByEmail(expected.getEmail()).orElse(null);
private RecipientSavedEvent getPayload() {
return RecipientSavedEvent.builder()
2023-04-23T21:07:12.300-07:00 INFO [Tapped Technologies - User Service,,] 63697 --- [ Test worker] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Instantiated an idempotent producer.
2023-04-23T21:07:12.324-07:00 INFO [Tapped Technologies - User Service,,] 63697 --- [ Test worker] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.3.2
2023-04-23T21:07:12.324-07:00 INFO [Tapped Technologies - User Service,,] 63697 --- [ Test worker] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: b66af662e61082cb
2023-04-23T21:07:12.324-07:00 INFO [Tapped Technologies - User Service,,] 63697 --- [ Test worker] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1682309232324
2023-04-23T21:07:12.993-07:00 WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:12.994-07:00 INFO [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: Im2Ds_vjQq289WTPzltJlA
2023-04-23T21:07:13.018-07:00 INFO [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1] ProducerId set to 0 with epoch 0
2023-04-23T21:07:13.150-07:00 WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 4 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:13.289-07:00 WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 5 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:13.424-07:00 WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 6 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:13.550-07:00 WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 7 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:13.667-07:00 WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 8 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:13.789-07:00 WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 9 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:13.911-07:00 WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 10 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:14.609-07:00 WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 11 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:14.754-07:00 WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 12 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:14.934-07:00 INFO [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Resetting the last seen epoch of partition tappedtechnologies.emails.recipients-0 to 0 since the associated topicId changed from null to mmZzWI7DSkGiNcBk1J0Lmw
With the help of @M.Deinum, I solved the issue I was facing. It was quite simple: I used @Autowired to inject the KafkaTemplate from the context and removed the entire @BeforeAll setup. Having removed the @BeforeAll setup, I had to use the DynamicPropertyRegistry and the KafkaContainer to add spring.kafka.bootstrap-servers to the properties.
Below is the adjusted code.
# Kafka Properties
# Kafka Consumer Properties
# Kafka Producer Properties
# -- NEWLY ADDED -- #
@TestPropertySource(locations = "classpath:application-test.properties")
public class RecipientPayloadConsumerTests {
private static final String TOPIC = "tappedtechnologies.test.topics";
private UserRepository userRepository;
private KafkaTemplate<String, RecipientSavedEvent> kafkaTemplate;
public static MySQLContainer<?> mySQLContainer = new MySQLContainer<>("mysql:latest");
public static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));
public static void setProperties(DynamicPropertyRegistry registry) {
registry.add("spring.datasource.url", mySQLContainer::getJdbcUrl);
registry.add("spring.datasource.username", mySQLContainer::getUsername);
registry.add("spring.datasource.password", mySQLContainer::getPassword);
registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
public void afterEach() {
public void consumePayload_Should_SavePayload() throws InterruptedException {
RecipientSavedEvent expected = getPayload();
kafkaTemplate.send(TOPIC, 0, Instant.now().getEpochSecond(), expected.getPayloadKey(), expected);
User actual = userRepository.findByEmail(expected.getEmail()).orElse(null);
assert actual != null;
private RecipientSavedEvent getPayload() {
return RecipientSavedEvent.builder()
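One further note on the assertion: the listener consumes on its own thread, so the repository lookup can run before the record has been saved. A minimal sketch of one way to handle that, using only the JDK, is to poll the lookup until it returns a value or a timeout elapses. The class PollingSketch and the helper pollUntilPresent are hypothetical names for illustration and are not part of the original test.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical helper class; not part of the original test code.
final class PollingSketch {

    private PollingSketch() {
    }

    // Calls `lookup` repeatedly until it yields a value or `timeout` elapses.
    // Returns the first present result, or Optional.empty() on timeout.
    static <T> Optional<T> pollUntilPresent(Supplier<Optional<T>> lookup,
                                            Duration timeout,
                                            Duration interval) throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            Optional<T> result = lookup.get();
            if (result.isPresent()) {
                return result;
            }
            // Give up once the deadline has passed (overflow-safe comparison).
            if (System.nanoTime() - deadline >= 0) {
                return Optional.empty();
            }
            Thread.sleep(interval.toMillis());
        }
    }
}
```

In the test above, this could be used as pollUntilPresent(() -> userRepository.findByEmail(expected.getEmail()), Duration.ofSeconds(10), Duration.ofMillis(200)), with the timeout and interval chosen to suit the environment. A library such as Awaitility covers the same need if adding a test dependency is acceptable.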