Search code examples
java, spring, spring-boot, apache-kafka, kafka-consumer-api

Integration Test Issues for Kafka Consumer


EDIT: The test now only passes when I use localhost:9092 for the bootstrap server, which is the default port I have my kafka/zookeeper docker image running on.

I need help with my integration test, which tests that the Kafka payload is consumed. I am using Testcontainers for the KafkaContainer and the MySQLContainer. The goal of the test is to ensure that the SUT can correctly consume the payload from the Kafka producer, but the consumer is never being hit during the test. Despite not working correctly during the test, the code works as it should when I run it locally.

So far, I've tried using an EmbeddedKafkaBroker, which did not work, and I also tried changing the message topic. I know that the issue is somewhere in my configuration, but I can't exactly put my finger on it. Below, the application-test.properties, the SUT, the test class, and the logs will be attached in that specific order.

Also, the reason that I am using the userRepository for my assertions, is that whenever the payload is consumed, it is saved to the repository and I want to check if the saved user exists.

application-test.properties

# Kafka Properties
spring.kafka.topic.name=tappedtechnologies.emails.recipients

# Kafka Consumer Properties
spring.kafka.consumer.group-id=userId
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.default.type=com.tappedtechnologies.userservice.events.RecipientSavedEvent
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer

SUT (RecipientPayloadConsumer)

@Slf4j
@Service
@RequiredArgsConstructor
public class RecipientPayloadConsumer implements KafkaConsumer<RecipientSavedEvent> {

    private final Creator<User> userCreator;

    @Override
    @KafkaListener(topics = "${spring.kafka.topic.name}", groupId = "${spring.kafka.consumer.group-id}")
    public void consumePayload(@Payload RecipientSavedEvent payload) {
        log.info("Payload received from 'email-service': {}", payload);
        User userToSave = this.convertPayloadToUser(payload);
        try {
            userCreator.create(userToSave);
        } catch (IllegalStateException exception) {
            log.error("IllegalStateException caught attempting to save payload from email-service.");
            log.error("Message: {}", exception.getMessage());
            throw exception;
        }
    }

    private User convertPayloadToUser(RecipientSavedEvent payload) {
        return User.builder()
                .firstName(payload.getFirstName())
                .lastName(payload.getLastName())
                .email(payload.getEmail())
                .build();
    }
}

RecipientPayloadConsumerTest

@Testcontainers
@SpringBootTest
@TestPropertySource(locations = "classpath:application-test.properties")
public class RecipientPayloadConsumerTests {

    // NOTE(review): this hand-built template points at the Testcontainers
    // broker, but the application context's *consumer* never learns that
    // address — spring.kafka.bootstrap-servers is not registered below, so the
    // listener connects to the default (localhost:9092). That is why the test
    // only passes when a local broker happens to run on that port.
    private static KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private UserRepository userRepository;
    @Autowired
    private RecipientPayloadConsumer sut;

    @Container
    public static MySQLContainer<?> mySQLContainer = new MySQLContainer<>("mysql:latest");
    @Container
    public static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));

    // NOTE(review): only the datasource is wired in here; the Kafka consumer
    // also needs registry.add("spring.kafka.bootstrap-servers",
    // kafkaContainer::getBootstrapServers) to reach the container.
    @DynamicPropertySource
    public static void setProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.datasource.url", mySQLContainer::getJdbcUrl);
        registry.add("spring.datasource.username", mySQLContainer::getUsername);
        registry.add("spring.datasource.password", mySQLContainer::getPassword);
    }

    @BeforeAll
    public static void setUp() {
        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put("bootstrap.servers", kafkaContainer.getBootstrapServers());
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.springframework.kafka.support.serializer.JsonSerializer");

        kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerProps));
        kafkaTemplate.setDefaultTopic("tappedtechnologies.emails.recipients");

        // NOTE(review): with @Testcontainers managing static @Container
        // fields, these manual start() calls (and the stop() calls below)
        // are redundant — the extension controls the container lifecycle.
        kafkaContainer.start();
        mySQLContainer.start();
    }

    @AfterEach
    public void afterEach() {
        userRepository.deleteAll();
    }

    @AfterAll
    public static void tearDown() {
        kafkaContainer.stop();
        mySQLContainer.stop();
    }

    @Test
    public void consumePayload_Should_SavePayload() throws InterruptedException {
        RecipientSavedEvent expected = getPayload();
        // NOTE(review): toString() sends a plain string, but the consumer is
        // configured with JsonDeserializer — send the event object through a
        // JsonSerializer-backed template instead.
        kafkaTemplate.sendDefault(expected.getPayloadKey(), expected.toString());
        Thread.sleep(5000);

        User actual = userRepository.findByEmail(expected.getEmail()).orElse(null);

        assertThat(actual).isNotNull();
        assertThat(actual.getFirstName()).isEqualTo(expected.getFirstName());
        assertThat(actual.getLastName()).isEqualTo(expected.getLastName());
        assertThat(actual.getEmail()).isEqualTo(expected.getEmail());
    }

    private RecipientSavedEvent getPayload() {
        return RecipientSavedEvent.builder()
                .payloadKey(UUID.randomUUID().toString())
                .firstName("Randy")
                .lastName("Marsh")
                .email("randy@tegridyfarms.com")
                .build();
    }
}

logs

2023-04-23T21:07:12.300-07:00  INFO [Tapped Technologies - User Service,,] 63697 --- [    Test worker] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Instantiated an idempotent producer.
2023-04-23T21:07:12.324-07:00  INFO [Tapped Technologies - User Service,,] 63697 --- [    Test worker] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.3.2
2023-04-23T21:07:12.324-07:00  INFO [Tapped Technologies - User Service,,] 63697 --- [    Test worker] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: b66af662e61082cb
2023-04-23T21:07:12.324-07:00  INFO [Tapped Technologies - User Service,,] 63697 --- [    Test worker] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1682309232324
2023-04-23T21:07:12.993-07:00  WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:12.994-07:00  INFO [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: Im2Ds_vjQq289WTPzltJlA
2023-04-23T21:07:13.018-07:00  INFO [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-1] ProducerId set to 0 with epoch 0
2023-04-23T21:07:13.150-07:00  WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Error while fetching metadata with correlation id 4 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:13.289-07:00  WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Error while fetching metadata with correlation id 5 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:13.424-07:00  WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Error while fetching metadata with correlation id 6 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:13.550-07:00  WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Error while fetching metadata with correlation id 7 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:13.667-07:00  WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Error while fetching metadata with correlation id 8 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:13.789-07:00  WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Error while fetching metadata with correlation id 9 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:13.911-07:00  WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Error while fetching metadata with correlation id 10 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:14.609-07:00  WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Error while fetching metadata with correlation id 11 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:14.754-07:00  WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Error while fetching metadata with correlation id 12 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:14.934-07:00  INFO [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Resetting the last seen epoch of partition tappedtechnologies.emails.recipients-0 to 0 since the associated topicId changed from null to mmZzWI7DSkGiNcBk1J0Lmw

Solution

  • With the help of @M.Deinum, I solved the issue I was facing. It was quite simple; I used @Autowired to inject the KafkaTemplate from the context and removed the entire @BeforeAll setup. Because the @BeforeAll setup was gone, I had to use the DynamicPropertyRegistry together with the KafkaContainer to add the spring.kafka.bootstrap-servers property to the environment.

    Below is the adjusted code.

    application-test.properties

    # Kafka Properties
    spring.kafka.topic.name=tappedtechnologies.test.topics
    
    # Kafka Consumer Properties
    spring.kafka.consumer.group-id=testId
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
    spring.kafka.consumer.properties.spring.json.type.mapping=event:com.tappedtechnologies.userservice.events.RecipientSavedEvent
    spring.kafka.consumer.properties.spring.json.default.type=com.tappedtechnologies.userservice.events.RecipientSavedEvent
    spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
    
    # Kafka Producer Properties
    # -- NEWLY ADDED -- #
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
    

    RecipientPayloadConsumerTests

    @Testcontainers
    @SpringBootTest
    @TestPropertySource(locations = "classpath:application-test.properties")
    public class RecipientPayloadConsumerTests {
    
        private static final String TOPIC = "tappedtechnologies.test.topics";
    
        @Autowired
        private UserRepository userRepository;
        @Autowired
        private KafkaTemplate<String, RecipientSavedEvent> kafkaTemplate;
    
        @Container
        public static MySQLContainer<?> mySQLContainer = new MySQLContainer<>("mysql:latest");
        @Container
        public static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));
    
        // Wire both containers into the Spring environment; registering the
        // broker address here is what lets the context's consumer reach the
        // Testcontainers Kafka instead of localhost:9092.
        @DynamicPropertySource
        public static void setProperties(DynamicPropertyRegistry registry) {
            registry.add("spring.datasource.url", mySQLContainer::getJdbcUrl);
            registry.add("spring.datasource.username", mySQLContainer::getUsername);
            registry.add("spring.datasource.password", mySQLContainer::getPassword);
    
            registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
        }
    
        @AfterEach
        public void afterEach() {
            userRepository.deleteAll();
        }
    
        @Test
        public void consumePayload_Should_SavePayload() throws InterruptedException {
            RecipientSavedEvent expected = getPayload();
            // Kafka record timestamps are epoch MILLIseconds; getEpochSecond()
            // would stamp the record in January 1970.
            kafkaTemplate.send(TOPIC, 0, Instant.now().toEpochMilli(), expected.getPayloadKey(), expected);
    
            // Poll instead of a fixed sleep: faster on success, tolerant of a
            // slow first rebalance, and never silently disabled the way a bare
            // `assert` is when the JVM runs without -ea.
            User actual = awaitUserByEmail(expected.getEmail(), 10_000L);
    
            assertThat(actual).isNotNull();
            assertThat(actual.getFirstName()).isEqualTo(expected.getFirstName());
            assertThat(actual.getLastName()).isEqualTo(expected.getLastName());
            assertThat(actual.getEmail()).isEqualTo(expected.getEmail());
        }
    
        // Polls the repository until the user appears or the timeout elapses;
        // returns null on timeout so the assertion reports the failure.
        private User awaitUserByEmail(String email, long timeoutMillis) throws InterruptedException {
            long deadline = System.currentTimeMillis() + timeoutMillis;
            while (System.currentTimeMillis() < deadline) {
                User found = userRepository.findByEmail(email).orElse(null);
                if (found != null) {
                    return found;
                }
                Thread.sleep(250L);
            }
            return null;
        }
    
        private RecipientSavedEvent getPayload() {
            return RecipientSavedEvent.builder()
                    .payloadKey(UUID.randomUUID().toString())
                    .firstName("Randy")
                    .lastName("Marsh")
                    .email("randy@tegridyfarms.com")
                    .build();
        }
    }