I'm using embedded kafka, spring and junit to run the integration with my listener, today if I run just this class the tests pass, but if I run all the application tests with or without jacoco coverage it falls into timeout, follow the current listener code, embedded kafka config and test.
@Component
public class CategoryEventListener {
public static final TypeReference<MessageValue<OutboxEventEntity>> CATEGORY_MESSAGE = new TypeReference<>() {
};
private static final Logger LOG = LoggerFactory.getLogger(CategoryEventListener.class);
private final DefaultSaveCategoryUseCase saveCategoryUseCase;
private final DefaultRemoveCategoryUseCase removeCategoryUseCase;
public CategoryEventListener(
final DefaultSaveCategoryUseCase saveCategoryUseCase,
final DefaultRemoveCategoryUseCase removeCategoryUseCase
) {
this.saveCategoryUseCase = Objects.requireNonNull(saveCategoryUseCase);
this.removeCategoryUseCase = Objects.requireNonNull(removeCategoryUseCase);
}
@KafkaListener(
concurrency = "${kafka.consumers.categories.concurrency}",
containerFactory = "kafkaListenerFactory",
topics = "${kafka.consumers.categories.topics}",
groupId = "${kafka.consumers.categories.group-id}",
id = "${kafka.consumers.categories.id}",
properties = {
"auto.offset.reset=${kafka.consumers.categories.auto-offset-reset}"
}
)
public void onMessage(@Payload final String payload, final Acknowledgment ack) {
LOG.debug("Message received from Kafka: {}", payload);
final var aOutBoxEvent = Json.readValue(payload, CATEGORY_MESSAGE).payload().after();
switch (aOutBoxEvent.getEventType()) {
case EventsTypes.CATEGORY_CREATED -> {
final var aCategoryCreated = Json.readValue(aOutBoxEvent.getData(), CategoryCreatedEvent.class);
final var aCategory = aCategoryCreated.toDomain();
this.saveCategoryUseCase.execute(aCategory);
ack.acknowledge();
LOG.debug("Category created received from Kafka: {}", aCategory.getName());
}
case EventsTypes.CATEGORY_UPDATED -> {
final var aCategoryUpdated = Json.readValue(aOutBoxEvent.getData(), CategoryUpdatedEvent.class);
final var aCategory = aCategoryUpdated.toDomain();
this.saveCategoryUseCase.execute(aCategory);
ack.acknowledge();
LOG.debug("Category updated received from Kafka: {}", aCategory.getName());
}
case EventsTypes.CATEGORY_DELETED -> {
final var aCategoryDeleted = Json.readValue(aOutBoxEvent.getData(), CategoryDeletedEvent.class);
this.removeCategoryUseCase.execute(RemoveCategoryCommand
.with(aCategoryDeleted.rootCategoryId(), aCategoryDeleted.subCategoryId()
.orElse(null)));
ack.acknowledge();
LOG.debug("Category deleted received from Kafka: {}", aOutBoxEvent.getData());
}
default -> LOG.warn("Event type not supported: {}", aOutBoxEvent.getEventType());
}
}
}
embedded kafka config:
@EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})
@ActiveProfiles("test-integration-kafka")
@EnableAutoConfiguration(exclude = {ElasticsearchRepositoriesAutoConfiguration.class})
@SpringBootTest(
classes = {Main.class, AmqpTestConfiguration.class, IntegrationTestConfiguration.class},
properties = {"kafka.bootstrap-servers=${spring.embedded.kafka.brokers}"})
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
@ExtendWith(JpaCleanUpExtension.class)
@Tag("heavyIntegrationTest")
public abstract class AbstractEmbeddedKafkaTest {
@Autowired
protected EmbeddedKafkaBroker kafkaBroker;
private Producer<String, String> producer;
@BeforeAll
void init() {
producer =
new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(kafkaBroker), new StringSerializer(), new StringSerializer())
.createProducer();
}
@AfterAll
void shutdown() {
producer.close();
}
protected Producer<String, String> producer() {
return producer;
}
protected Source aSource() {
return new Source("ecommerce-mysql", "ecommerce", "outbox");
}
}
application.yml
kafka:
auto-create-topics: true
bootstrap-servers: localhost:9092
pool-timeout: 1_000
auto-commit: false
consumers:
categories:
auto-offset-reset: earliest
concurrency: 1
id: kafka-listener-categories
topics: category-topic
group-id: categories-group
consumer config:
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerFactory() {
final var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setPollTimeout(kafkaProperties.getPoolTimeout());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;
}
private ConsumerFactory<String, Object> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
private Map<String, Object> consumerConfigs() {
final var props = new HashMap<String, Object>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, kafkaProperties.isAutoCreateTopics());
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaProperties.isAutoCommit());
return props;
}
tests:
@Execution(ExecutionMode.CONCURRENT)
@Order(1)
public class CategoryEventListenerTest extends AbstractEmbeddedKafkaTest {
@MockBean
private DefaultSaveCategoryUseCase saveCategoryUseCase;
@MockBean
private DefaultRemoveCategoryUseCase removeCategoryUseCase;
@Value("${kafka.consumers.categories.topics}")
private String categoryTopic;
@Test
void givenAValidCategoryCreatedEvent_whenReceive_shouldPersistCategory() throws Exception {
// given
final var aCategory = Fixture.Categories.tech();
final var aCategoryEvent = CategoryCreatedEvent.from(aCategory);
final var aOutboxEvent = OutboxEventEntity.from(aCategoryEvent);
final var aMessage = Json.writeValueAsString(new MessageValue<>(new ValuePayload<>(aOutboxEvent, aOutboxEvent, aSource(), Operation.CREATE)));
final var latch = new CountDownLatch(1);
Mockito.doAnswer(t -> {
latch.countDown();
return aCategory;
}).when(saveCategoryUseCase).execute(Mockito.any());
// when
producer().send(new ProducerRecord<>(categoryTopic, aMessage));
producer().flush();
Assertions.assertTrue(latch.await(3, TimeUnit.MINUTES));
// then
Mockito.verify(saveCategoryUseCase, Mockito.times(1)).execute(eq(aCategory));
}
@Test
void givenAValidCategoryUpdatedEvent_whenReceive_shouldPersistCategory() throws Exception {
// given
final var aCategory = Fixture.Categories.home();
final var aCategoryEvent = CategoryUpdatedEvent.from(aCategory);
final var aOutboxEvent = OutboxEventEntity.from(aCategoryEvent);
final var aMessage = Json.writeValueAsString(new MessageValue<>(new ValuePayload<>(aOutboxEvent, aOutboxEvent, aSource(), Operation.CREATE)));
final var latch = new CountDownLatch(1);
Mockito.doAnswer(t -> {
latch.countDown();
return aCategory;
}).when(saveCategoryUseCase).execute(Mockito.any());
// when
producer().send(new ProducerRecord<>(categoryTopic, aMessage));
producer().flush();
Assertions.assertTrue(latch.await(3, TimeUnit.MINUTES));
// then
Mockito.verify(saveCategoryUseCase, Mockito.times(1)).execute(eq(aCategory));
}
}
error on test fail:
2023-11-30T13:48:32.426-03:00 INFO 287799 --- [ategories-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-categories-group-10, groupId=categories-group] Successfully joined group with generation Generation{generationId=2, memberId='consumer-categories-group-10-3469919c-1c4e-4946-91fd-97a56f1ae761', protocol='range'}
2023-11-30T13:48:32.427-03:00 INFO 287799 --- [ategories-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-categories-group-8, groupId=categories-group] Successfully joined group with generation Generation{generationId=2, memberId='consumer-categories-group-8-381000c5-ae86-40d6-8f21-8e6b2638279e', protocol='range'}
2023-11-30T13:48:32.427-03:00 INFO 287799 --- [ategories-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-categories-group-9, groupId=categories-group] Successfully joined group with generation Generation{generationId=2, memberId='consumer-categories-group-9-9444b7c4-693c-4945-952a-998898c91860', protocol='range'}
2023-11-30T13:48:32.427-03:00 INFO 287799 --- [ategories-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-categories-group-1, groupId=categories-group] Successfully joined group with generation Generation{generationId=2, memberId='consumer-categories-group-1-b832fc02-3070-45de-a5fa-ffb1c5ddb5b3', protocol='range'}
2023-11-30T13:48:32.427-03:00 INFO 287799 --- [ategories-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-categories-group-1, groupId=categories-group] Finished assignment for group at generation 2: {consumer-categories-group-10-3469919c-1c4e-4946-91fd-97a56f1ae761=Assignment(partitions=[]), consumer-categories-group-8-381000c5-ae86-40d6-8f21-8e6b2638279e=Assignment(partitions=[]), consumer-categories-group-1-b832fc02-3070-45de-a5fa-ffb1c5ddb5b3=Assignment(partitions=[category-topic-0]), consumer-categories-group-9-9444b7c4-693c-4945-952a-998898c91860=Assignment(partitions=[])}
2023-11-30T13:48:32.428-03:00 INFO 287799 --- [quest-handler-3] k.coordinator.group.GroupCoordinator : [GroupCoordinator 0]: Assignment received from leader consumer-categories-group-1-b832fc02-3070-45de-a5fa-ffb1c5ddb5b3 for group categories-group for generation 2. The group has 4 members, 0 of which are static.
2023-11-30T13:48:32.440-03:00 INFO 287799 --- [ategories-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-categories-group-10, groupId=categories-group] Successfully synced group in generation Generation{generationId=2, memberId='consumer-categories-group-10-3469919c-1c4e-4946-91fd-97a56f1ae761', protocol='range'}
2023-11-30T13:48:32.440-03:00 INFO 287799 --- [ategories-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-categories-group-8, groupId=categories-group] Successfully synced group in generation Generation{generationId=2, memberId='consumer-categories-group-8-381000c5-ae86-40d6-8f21-8e6b2638279e', protocol='range'}
2023-11-30T13:48:32.440-03:00 INFO 287799 --- [ategories-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-categories-group-8, groupId=categories-group] Notifying assignor about the new Assignment(partitions=[])
2023-11-30T13:48:32.440-03:00 INFO 287799 --- [ategories-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-categories-group-10, groupId=categories-group] Notifying assignor about the new Assignment(partitions=[])
2023-11-30T13:48:32.440-03:00 INFO 287799 --- [ategories-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-categories-group-9, groupId=categories-group] Successfully synced group in generation Generation{generationId=2, memberId='consumer-categories-group-9-9444b7c4-693c-4945-952a-998898c91860', protocol='range'}
2023-11-30T13:48:32.440-03:00 INFO 287799 --- [ategories-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-categories-group-9, groupId=categories-group] Notifying assignor about the new Assignment(partitions=[])
2023-11-30T13:48:32.440-03:00 INFO 287799 --- [ategories-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-categories-group-1, groupId=categories-group] Successfully synced group in generation Generation{generationId=2, memberId='consumer-categories-group-1-b832fc02-3070-45de-a5fa-ffb1c5ddb5b3', protocol='range'}
2023-11-30T13:48:32.440-03:00 INFO 287799 --- [ategories-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-categories-group-1, groupId=categories-group] Notifying assignor about the new Assignment(partitions=[category-topic-0])
2023-11-30T13:48:32.440-03:00 INFO 287799 --- [ategories-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-categories-group-9, groupId=categories-group] Adding newly assigned partitions:
2023-11-30T13:48:32.440-03:00 INFO 287799 --- [ategories-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-categories-group-8, groupId=categories-group] Adding newly assigned partitions:
2023-11-30T13:48:32.440-03:00 INFO 287799 --- [ategories-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-categories-group-10, groupId=categories-group] Adding newly assigned partitions:
2023-11-30T13:48:32.441-03:00 INFO 287799 --- [ategories-0-C-1] o.s.k.l.KafkaMessageListenerContainer : categories-group: partitions assigned: []
2023-11-30T13:48:32.441-03:00 INFO 287799 --- [ategories-0-C-1] o.s.k.l.KafkaMessageListenerContainer : categories-group: partitions assigned: []
2023-11-30T13:48:32.441-03:00 INFO 287799 --- [ategories-0-C-1] o.s.k.l.KafkaMessageListenerContainer : categories-group: partitions assigned: []
2023-11-30T13:48:32.442-03:00 INFO 287799 --- [ategories-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-categories-group-1, groupId=categories-group] Adding newly assigned partitions: category-topic-0
2023-11-30T13:48:32.451-03:00 INFO 287799 --- [ategories-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-categories-group-1, groupId=categories-group] Found no committed offset for partition category-topic-0
2023-11-30T13:48:32.459-03:00 INFO 287799 --- [ategories-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-categories-group-1, groupId=categories-group] Resetting offset for partition category-topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}.
2023-11-30T13:48:32.459-03:00 INFO 287799 --- [ategories-0-C-1] o.s.k.l.KafkaMessageListenerContainer : categories-group: partitions assigned: [category-topic-0]
2023-11-30T13:48:36.241-03:00 INFO 287799 --- [er-event-thread] kafka.controller.KafkaController : [Controller id=0] Processing automatic preferred replica leader election
Expected :true
Actual :false
<Click to see difference>
org.opentest4j.AssertionFailedError: expected: <true> but was: <false>
at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:31)
at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:180)
at com.kaua.ecommerce.infrastructure.listeners.CategoryEventListenerTest.givenAValidCategoryCreatedEvent_whenReceive_shouldPersistCategory(CategoryEventListenerTest.java:65)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
at java.base/java.lang.reflect.Method.invoke(Method.java:578)
at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:217)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:213)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:138)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:147)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:127)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:90)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:55)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:102)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:54)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(JUnitPlatformTestClassProcessor.java:118)
at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$000(JUnitPlatformTestClassProcessor.java:93)
at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:88)
at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:62)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
at java.base/java.lang.reflect.Method.invoke(Method.java:578)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
at jdk.proxy2/jdk.proxy2.$Proxy5.stop(Unknown Source)
at org.gradle.api.internal.tasks.testing.worker.TestWorker$3.run(TestWorker.java:193)
at org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129)
at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100)
at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60)
at org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:113)
at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:65)
at worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
at worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
I expect the tests to pass both when I run all the application tests and using jacoco coverage. I continued testing here, replacing latch with completablefuture and in both cases, if the test that uses Kafka runs before the other tests in the application, they pass.
I solved this by adding @DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
. I added it to all my tests, I have annotation for each type and I added the DirtiesContext to them, it solved it, apparently some mock or other test was interfering