java spring-boot apache-kafka spring-kafka spring-kafka-test

Spring Kafka integration testing with embedded Kafka


I have a Spring Boot application with a consumer that consumes from a topic in one cluster and produces to a different topic in another cluster.

Now I'm trying to write an integration test using Spring's embedded Kafka, but the application context fails to start because the KafkaTemplate bean could not be registered: a bean with that name has already been defined in a class path resource.

Consumer Class

@Service
public class KafkaConsumerService {

    @Autowired
    private KafkaProducerService kafkaProducerService;

    @KafkaListener(topics = "${kafka.producer.topic}")
    public void professor(List<Professor> pro) {
        pro.forEach(kafkaProducerService::produce);
    }

}

Producer Class

@Service
public class KafkaProducerService {

    @Value("${kafka.producer.topic}")
    private String topic;

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    public void produce(Professor pro) {
        kafkaTemplate.send(topic, "professor", pro);
    }

}

In my test I want to override the KafkaTemplate so that when I call kafkaConsumerService.professor, it produces the data to the embedded Kafka broker, and I can then validate what was sent.

Test config

@TestConfiguration
@EmbeddedKafka(partitions = 1, controlledShutdown = false,
        brokerProperties = {"listeners=PLAINTEXT://localhost:3333", "port=3333"})
public class KafkaProducerConfigTest {

    @Autowired
    KafkaEmbedded kafkaEmbeded;

    @Autowired
    KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Before
    public void setUp() throws Exception {
        for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
            ContainerTestUtils.waitForAssignment(messageListenerContainer,
                    kafkaEmbeded.getPartitionsPerTopic());
        }
    }

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(kafkaEmbeded));
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        return kafkaTemplate;
    }

}

Test class

@EnableKafka
@SpringBootTest(classes = {KafkaProducerConfigTest.class})
@RunWith(SpringRunner.class)
public class KafkaProducerServiceTest {

    @Autowired
    private KafkaConsumerService kafkaConsumerService;

    @Test
    public void testReceive() throws Exception {
        kafkaConsumerService.professor(Arrays.asList(new Professor()));

        // How do I check that the message was sent to Kafka?
    }

}

Error

 The bean 'kafkaTemplate', defined in com.kafka.configuration.KafkaProducerConfigTest, could not be registered. 
 A bean with that name has already been defined in class path resource [com/kafka/configuration/KafkaProducerConfig.class] and overriding is disabled.
 Consider renaming one of the beans or enabling overriding by setting spring.main.allow-bean-definition-overriding=true

Also, can someone help me validate the messages sent to the embedded Kafka server?

Note: I'm also getting some deprecation warnings:

The type KafkaEmbedded is deprecated

The method getPartitionsPerTopic() from the type KafkaEmbedded is deprecated

The method producerProps(KafkaEmbedded) from the type KafkaTestUtils is deprecated


Solution

  • Spring Boot 2.1 disables bean overriding by default. From the release notes:

    Bean overriding has been disabled by default to prevent a bean being accidentally overridden. If you are relying on overriding, you will need to set spring.main.allow-bean-definition-overriding to true.

    Regarding the deprecations: see the Javadocs for @EmbeddedKafka. KafkaEmbedded has been replaced by EmbeddedKafkaBroker.
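
The question also asks how to check that a message reached the embedded broker, which the points above do not cover. The following is only a sketch of one way to do it, assuming spring-kafka-test 2.2 or later and JUnit 4 as in the question. It enables bean overriding for the test only, injects EmbeddedKafkaBroker in place of the deprecated KafkaEmbedded, and reads the record back with a throwaway consumer. The topic name professor-out and the group id verify-group are hypothetical names chosen for illustration. The sketch also assumes that the KafkaTemplate injected into KafkaProducerService already targets the embedded broker (for example through the question's overriding test configuration, rewritten to inject EmbeddedKafkaBroker), that keys are written with a String serializer, and that nothing else writes to the topic during the test.

```java
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import java.util.Arrays;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest(properties = {
        // Allows a second bean named kafkaTemplate to be registered in this test context.
        "spring.main.allow-bean-definition-overriding=true",
        // Hypothetical topic name, used only for this sketch.
        "kafka.producer.topic=professor-out"
})
@EmbeddedKafka(partitions = 1, topics = "professor-out")
public class KafkaProducerServiceTest {

    private static final String TOPIC = "professor-out";

    // Replaces the deprecated KafkaEmbedded.
    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Autowired
    private KafkaConsumerService kafkaConsumerService;

    @Test
    public void producesProfessorToEmbeddedKafka() {
        // Throwaway consumer used only for verification; the group id is arbitrary.
        Map<String, Object> props =
                KafkaTestUtils.consumerProps("verify-group", "false", embeddedKafka);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        Consumer<String, String> consumer = new DefaultKafkaConsumerFactory<>(
                props, new StringDeserializer(), new StringDeserializer()).createConsumer();
        try {
            // Subscribes the consumer to the topic created by @EmbeddedKafka.
            embeddedKafka.consumeFromAnEmbeddedTopic(consumer, TOPIC);

            // Call the listener method directly, as the question does.
            kafkaConsumerService.professor(Arrays.asList(new Professor()));

            // Fails unless exactly one record arrives on the topic.
            ConsumerRecord<String, String> record =
                    KafkaTestUtils.getSingleRecord(consumer, TOPIC);
            assertEquals("professor", record.key());
            assertNotNull(record.value());
        } finally {
            consumer.close();
        }
    }
}
```

Reading the value as a plain String keeps the check independent of how Professor is serialized. If the payload itself needs to be asserted, the value deserializer would have to match whatever serializer the producer is configured with.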