EDIT: The test now only passes when I use localhost:9092 for the bootstrap server, which is the default port I have my kafka/zookeeper docker image running on.
I need help with my integration test, which verifies that a Kafka payload is consumed. I am using Testcontainers for the KafkaContainer and the MySQLContainer. The goal of the test is to ensure that the SUT can correctly consume the payload from the Kafka producer, but the consumer is never hit during the test. Although it does not work during the test, the code works as it should when I run it locally.
So far, I've tried using an EmbeddedKafkaBroker, which did not work, and I also tried changing the message topic. I know that the issue is somewhere in my configuration, but I can't exactly put my finger on it. Below, the application-test.properties, the SUT, the test class, and the logs will be attached in that specific order.
Also, the reason I am using the userRepository
for my assertions is that whenever the payload is consumed, it is saved to the repository, and I want to check that the saved user exists.
application-test.properties
# Kafka Properties
# Topic name; RecipientPayloadConsumer's @KafkaListener reads it as ${spring.kafka.topic.name}.
# NOTE(review): no spring.kafka.bootstrap-servers entry is defined anywhere in this file.
spring.kafka.topic.name=tappedtechnologies.emails.recipients
# Kafka Consumer Properties
# Consumer group; the listener reads it as ${spring.kafka.consumer.group-id}.
spring.kafka.consumer.group-id=userId
spring.kafka.consumer.auto-offset-reset=earliest
# Values are deserialized as JSON into the default type configured below.
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.default.type=com.tappedtechnologies.userservice.events.RecipientSavedEvent
# NOTE(review): a delegate class is presumably only consulted when the value deserializer is a
# wrapping deserializer; here the value deserializer is JsonDeserializer itself - confirm it is needed.
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
SUT (RecipientPayloadConsumer)
@Slf4j
@Service
@RequiredArgsConstructor
public class RecipientPayloadConsumer implements KafkaConsumer<RecipientSavedEvent> {
private final Creator<User> userCreator;
@Override
@KafkaListener(topics = "${spring.kafka.topic.name}", groupId = "${spring.kafka.consumer.group-id}")
public void consumePayload(@Payload RecipientSavedEvent payload) {
log.info("Payload received from 'email-service': {}", payload);
User userToSave = this.convertPayloadToUser(payload);
try {
userCreator.create(userToSave);
} catch (IllegalStateException exception) {
log.error("IllegalStateException caught attempting to save payload from email-service.");
log.error("Message: {}", exception.getMessage());
throw exception;
}
}
private User convertPayloadToUser(RecipientSavedEvent payload) {
return User.builder()
.firstName(payload.getFirstName())
.lastName(payload.getLastName())
.email(payload.getEmail())
.build();
}
}
RecipientPayloadConsumerTests
/**
 * Integration test for RecipientPayloadConsumer: publishes a RecipientSavedEvent to the Kafka
 * container and checks, through the UserRepository, that a matching user was saved.
 *
 * NOTE(review): this is the failing version described in the question. setProperties() registers
 * only the datasource properties; no Kafka bootstrap address is handed to the Spring context,
 * while the hand-built producer in setUp() points at kafkaContainer.getBootstrapServers().
 * Presumably the Spring-managed listener therefore talks to a different broker than the
 * producer - confirm against the consumer's startup log.
 */
@Testcontainers
@SpringBootTest
@TestPropertySource(locations = "classpath:application-test.properties")
public class RecipientPayloadConsumerTests {
// Producer built by hand in setUp() (not injected from the context); values are typed as String.
private static KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private UserRepository userRepository;
// Injected, but never referenced by the test method in this class.
@Autowired
private RecipientPayloadConsumer sut;
@Container
public static MySQLContainer<?> mySQLContainer = new MySQLContainer<>("mysql:latest");
@Container
public static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));
/**
 * Points the Spring datasource at the MySQL container.
 * Only the three spring.datasource.* properties are registered here.
 */
@DynamicPropertySource
public static void setProperties(DynamicPropertyRegistry registry) {
registry.add("spring.datasource.url", mySQLContainer::getJdbcUrl);
registry.add("spring.datasource.username", mySQLContainer::getUsername);
registry.add("spring.datasource.password", mySQLContainer::getPassword);
}
/**
 * Builds the producer-side KafkaTemplate used by the test and starts both containers.
 */
@BeforeAll
public static void setUp() {
Map<String, Object> producerProps = new HashMap<>();
// The bootstrap address is read here, before the explicit kafkaContainer.start() call below.
producerProps.put("bootstrap.servers", kafkaContainer.getBootstrapServers());
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer", "org.springframework.kafka.support.serializer.JsonSerializer");
kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerProps));
kafkaTemplate.setDefaultTopic("tappedtechnologies.emails.recipients");
// NOTE(review): the fields are already annotated with @Container on a @Testcontainers class,
// so these explicit start() calls are presumably redundant - confirm.
kafkaContainer.start();
mySQLContainer.start();
}
/** Removes every user after each test so tests do not see each other's rows. */
@AfterEach
public void afterEach() {
userRepository.deleteAll();
}
/**
 * Stops both containers once all tests have run.
 * NOTE(review): presumably redundant with the @Container lifecycle as well - confirm.
 */
@AfterAll
public static void tearDown() {
kafkaContainer.stop();
mySQLContainer.stop();
}
/**
 * Sends one event to the default topic, waits, then expects a user with the same
 * first name, last name and email to be present in the repository.
 */
@Test
public void consumePayload_Should_SavePayload() throws InterruptedException {
RecipientSavedEvent expected = getPayload();
// Sends expected.toString() (a String, not the event object) through a JsonSerializer producer.
// NOTE(review): verify that this yields JSON the listener can map to RecipientSavedEvent.
kafkaTemplate.sendDefault(expected.getPayloadKey(), expected.toString());
// Fixed 5-second pause to give the listener time to consume before asserting.
Thread.sleep(5000);
User actual = userRepository.findByEmail(expected.getEmail()).orElse(null);
assertThat(actual).isNotNull();
assertThat(actual.getFirstName()).isEqualTo(expected.getFirstName());
assertThat(actual.getLastName()).isEqualTo(expected.getLastName());
assertThat(actual.getEmail()).isEqualTo(expected.getEmail());
}
/** Builds the sample event used by the test, keyed by a random UUID. */
private RecipientSavedEvent getPayload() {
return RecipientSavedEvent.builder()
.payloadKey(UUID.randomUUID().toString())
.firstName("Randy")
.lastName("Marsh")
.email("randy@tegridyfarms.com")
.build();
}
}
logs
2023-04-23T21:07:12.300-07:00 INFO [Tapped Technologies - User Service,,] 63697 --- [ Test worker] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Instantiated an idempotent producer.
2023-04-23T21:07:12.324-07:00 INFO [Tapped Technologies - User Service,,] 63697 --- [ Test worker] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.3.2
2023-04-23T21:07:12.324-07:00 INFO [Tapped Technologies - User Service,,] 63697 --- [ Test worker] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: b66af662e61082cb
2023-04-23T21:07:12.324-07:00 INFO [Tapped Technologies - User Service,,] 63697 --- [ Test worker] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1682309232324
2023-04-23T21:07:12.993-07:00 WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:12.994-07:00 INFO [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: Im2Ds_vjQq289WTPzltJlA
2023-04-23T21:07:13.018-07:00 INFO [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1] ProducerId set to 0 with epoch 0
2023-04-23T21:07:13.150-07:00 WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 4 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:13.289-07:00 WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 5 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:13.424-07:00 WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 6 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:13.550-07:00 WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 7 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:13.667-07:00 WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 8 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:13.789-07:00 WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 9 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:13.911-07:00 WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 10 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:14.609-07:00 WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 11 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:14.754-07:00 WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 12 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:14.934-07:00 INFO [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Resetting the last seen epoch of partition tappedtechnologies.emails.recipients-0 to 0 since the associated topicId changed from null to mmZzWI7DSkGiNcBk1J0Lmw
With the help of @M.Deinum, I solved the issue I was facing. It was quite simple; I used @Autowired
to inject the KafkaTemplate
from the context and removed the entire @BeforeAll
setup. By removing the @BeforeAll
setup, I had to use the DynamicPropertyRegistry
and the KafkaContainer
to add the spring.kafka.bootstrap-servers
to the properties.
Below is the adjusted code.
application-test.properties
# Kafka Properties
# Topic the listener subscribes to; the test class publishes to the same name.
# spring.kafka.bootstrap-servers is intentionally absent: the test registers it at runtime
# from the KafkaContainer through @DynamicPropertySource.
spring.kafka.topic.name=tappedtechnologies.test.topics
# Kafka Consumer Properties
spring.kafka.consumer.group-id=testId
spring.kafka.consumer.auto-offset-reset=earliest
# Consumers take a key *deserializer*; "key-serializer" is not a consumer property.
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.type.mapping=event:com.tappedtechnologies.userservice.events.RecipientSavedEvent
spring.kafka.consumer.properties.spring.json.default.type=com.tappedtechnologies.userservice.events.RecipientSavedEvent
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
# Kafka Producer Properties
# -- NEWLY ADDED -- #
# Used by the auto-configured KafkaTemplate that the test injects.
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
RecipientPayloadConsumerTests
/**
 * Integration test for RecipientPayloadConsumer.
 *
 * <p>Publishes a {@link RecipientSavedEvent} through the context's {@link KafkaTemplate} to a
 * Kafka container and verifies, through the {@link UserRepository}, that a matching user was
 * saved. Both the datasource and the Kafka bootstrap address are pointed at the containers via
 * {@link #setProperties(DynamicPropertyRegistry)}.
 */
@Testcontainers
@SpringBootTest
@TestPropertySource(locations = "classpath:application-test.properties")
public class RecipientPayloadConsumerTests {

    /** Topic the event is published to; must match spring.kafka.topic.name in the test properties. */
    private static final String TOPIC = "tappedtechnologies.test.topics";

    /** Upper bound on how long the test waits for the saved user to appear. */
    private static final long CONSUME_TIMEOUT_MS = 10_000L;

    /** Pause between two repository look-ups while waiting. */
    private static final long POLL_INTERVAL_MS = 250L;

    @Autowired
    private UserRepository userRepository;

    @Autowired
    private KafkaTemplate<String, RecipientSavedEvent> kafkaTemplate;

    @Container
    public static MySQLContainer<?> mySQLContainer = new MySQLContainer<>("mysql:latest");

    @Container
    public static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));

    /**
     * Registers the container-provided connection settings with the Spring context.
     *
     * @param registry registry the datasource and Kafka bootstrap properties are added to
     */
    @DynamicPropertySource
    public static void setProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.datasource.url", mySQLContainer::getJdbcUrl);
        registry.add("spring.datasource.username", mySQLContainer::getUsername);
        registry.add("spring.datasource.password", mySQLContainer::getPassword);
        registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
    }

    /** Removes every user after each test so tests do not see each other's rows. */
    @AfterEach
    public void afterEach() {
        userRepository.deleteAll();
    }

    /**
     * Publishes one event and expects a user with the same first name, last name and email
     * to show up in the repository within {@link #CONSUME_TIMEOUT_MS}.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    @Test
    public void consumePayload_Should_SavePayload() throws InterruptedException {
        RecipientSavedEvent expected = getPayload();

        // Kafka record timestamps are expressed in epoch milliseconds, not seconds.
        kafkaTemplate.send(TOPIC, 0, Instant.now().toEpochMilli(), expected.getPayloadKey(), expected);

        User actual = awaitUserByEmail(expected.getEmail());

        // AssertJ instead of the `assert` keyword, which is skipped when the JVM runs without -ea.
        assertThat(actual).isNotNull();
        assertThat(actual.getFirstName()).isEqualTo(expected.getFirstName());
        assertThat(actual.getLastName()).isEqualTo(expected.getLastName());
        assertThat(actual.getEmail()).isEqualTo(expected.getEmail());
    }

    /**
     * Polls the repository until a user with the given email exists or the timeout elapses.
     *
     * @param email email address to look up
     * @return the stored user, or {@code null} if none appeared in time
     * @throws InterruptedException if the thread is interrupted between look-ups
     */
    private User awaitUserByEmail(String email) throws InterruptedException {
        long deadline = System.currentTimeMillis() + CONSUME_TIMEOUT_MS;
        User found = userRepository.findByEmail(email).orElse(null);
        while (found == null && System.currentTimeMillis() < deadline) {
            Thread.sleep(POLL_INTERVAL_MS);
            found = userRepository.findByEmail(email).orElse(null);
        }
        return found;
    }

    /** Builds the sample event used by the test, keyed by a random UUID. */
    private RecipientSavedEvent getPayload() {
        return RecipientSavedEvent.builder()
                .payloadKey(UUID.randomUUID().toString())
                .firstName("Randy")
                .lastName("Marsh")
                .email("randy@tegridyfarms.com")
                .build();
    }
}