I want to test my Kafka consumer, but there is an issue with @EmbeddedKafka.
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaEventConsumer {
    private final CustomInterface customInterface;
    @KafkaListener(topics = "test-topic")
    public void consumeEvents(Event event) {
        customInterface.apply(event);
    }
}
My test class is as follows:
@EmbeddedKafka
@ExtendWith(MockitoExtension.class)
@SpringBootTest(properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class KafkaConsumerTest {
    private Producer<String, String> producer;
    private static File EVENT_JSON = Paths.get("src", "test", "resources", "files",
            "event-file.json").toFile();
    private KafkaEventConsumer kafkaEventConsumer;
    @Mock
    private CustomInterface customInterface;
    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;
    @Autowired
    private ObjectMapper objectMapper;
    @BeforeAll
    void setUp() {
        kafkaEventConsumer = new KafkaEventConsumer(customInterface);
        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
        producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(),
                new StringSerializer()).createProducer();
    }
    @Test
    void consumeEvents() throws IOException, BadCurrencyException {
        var event = objectMapper.readValue(EVENT_JSON, Event.class);
        String message = objectMapper.writeValueAsString(event);
        producer.send(new ProducerRecord<>("test-topic", 0, "1", message));
        producer.flush();
        // Read the message and assert its properties
        verify(customInterface, timeout(10000).times(1)).apply(any());
    }
    @AfterAll
    void shutdown() {
        producer.close();
    }
}
The test fails because the consumer never receives the message:
Wanted but not invoked:
customInterface.apply(<any>);
Actually, there were zero interactions with this mock.
PS: I followed this interesting article
I used KafkaTemplate instead:
@SpringBootTest
@EmbeddedKafka(brokerProperties = {"listeners=PLAINTEXT://localhost:9092"},
        partitions = 1,
        controlledShutdown = true)
class KafkaConsumerTest {
    private static File EVENT_JSON = Paths.get("src", "test", "resources", "files",
            "event-file.json").toFile();
    @Autowired
    KafkaTemplate<String, Event> kafkaTemplate;
    @Autowired
    private ObjectMapper objectMapper;
    @SpyBean
    private KafkaEventConsumer kafkaEventConsumer;
    @SpyBean
    private MyInterface myInterface;
    @Captor
    ArgumentCaptor<Event> eventCaptor;
    @Test
    @SneakyThrows
    @DirtiesContext
    void consumeEvents() {
        Event event = objectMapper.readValue(EVENT_JSON, Event.class);
        kafkaTemplate.send("test-topic", "1", event);
        // Wait until the Spring-managed listener has received the record
        verify(kafkaEventConsumer, timeout(10000).times(1)).consumeEvents(eventCaptor.capture());
        Event argument = eventCaptor.getValue();
        // .. assert the message properties
        verify(myInterface, timeout(10000).times(1)).apply(any());
    }
}
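As far as I can tell, the difference is which object actually receives the message. @SpyBean wraps the bean that lives in the application context, and that bean is the one the @KafkaListener method is invoked on. In the first test, the consumer built with new KafkaEventConsumer(...) was never registered anywhere, so its mock could not see any interaction. Below is a minimal plain-Java sketch of that idea, with no Spring or Kafka involved. All the names in it (ListenerInstanceSketch, CountingHandler, EventConsumer, Container) are hypothetical stand-ins, not real Spring or Kafka classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ListenerInstanceSketch {

    /** Hypothetical stand-in for CustomInterface; it counts how often it is called. */
    static class CountingHandler implements Consumer<String> {
        int calls = 0;

        @Override
        public void accept(String event) {
            calls++;
        }
    }

    /** Hypothetical stand-in for KafkaEventConsumer. */
    static class EventConsumer {
        private final Consumer<String> handler;

        EventConsumer(Consumer<String> handler) {
            this.handler = handler;
        }

        void consumeEvents(String event) {
            handler.accept(event);
        }
    }

    /** Hypothetical stand-in for the listener container: it delivers only to registered consumers. */
    static class Container {
        private final List<EventConsumer> registered = new ArrayList<>();

        void register(EventConsumer consumer) {
            registered.add(consumer);
        }

        void deliver(String event) {
            registered.forEach(consumer -> consumer.consumeEvents(event));
        }
    }

    public static void main(String[] args) {
        CountingHandler beanHandler = new CountingHandler(); // collaborator of the managed bean
        CountingHandler testHandler = new CountingHandler(); // plays the role of the @Mock

        Container container = new Container();
        container.register(new EventConsumer(beanHandler)); // roughly what the context does

        // What the first test did: build a second consumer by hand and never register it.
        EventConsumer handBuilt = new EventConsumer(testHandler);

        container.deliver("event-1");

        System.out.println("bean handler calls: " + beanHandler.calls); // prints 1
        System.out.println("test handler calls: " + testHandler.calls); // prints 0
    }
}
```

Only the handler attached to the registered consumer is called. The hand-built consumer and its handler stay untouched, which matches the "zero interactions with this mock" message from the first test.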