Search code examples
spring-bootspring-kafkaspring-kafka-test

EmbeddedKafka not working cause an Scala error


I have a working spring-boot java-gradle-based service that can produce and consume Kafka messages. But I can't create an integration test using spring-kafka-test library with @EmbeddedKafka annotation or using @ClassRule way. In both ways, I end up with the same error (pointed below regarding Scala). If anybody has any clue about what could be happening behind the scenes it would be very helpful.

Spring boot version: 2.1.6.RELEASE Spring Kafka version: 2.2.7.RELEASE

Producer Config:

@EnableKafka
@Configuration
public class KafkaProducerConfig {

    private String bootstrapAddress;

    public KafkaProducerConfig(@Value("${spring.kafka.bootstrap-servers}") String bootstrapAddress) {
        this.bootstrapAddress = bootstrapAddress;
    }

    @Bean
    public ProducerFactory<String, Greeting> greetingProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
        return new KafkaTemplate<>(greetingProducerFactory());
    }
}

Producer Code:

@Component
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, Greeting> kafkaTemplate;

    @Value(value = "${kiosk.kafka.topic.greeting}")
    private String greetingTopicName;

    public void sendMessage(Greeting greeting) {

        ListenableFuture<SendResult<String, Greeting>> future = kafkaTemplate.send(greetingTopicName, greeting);

        future.addCallback(new ListenableFutureCallback<SendResult<String, Greeting>>() {

            @Override
            public void onSuccess(SendResult<String, Greeting> result) {
                System.out.println("Sent greeting=[" + greeting + "] with offset=[" + result.getRecordMetadata().offset() + "]");
            }
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("Unable to send greeting=[" + greeting + "] due to : " + ex.getMessage());
            }
        });
    }
}

properties file:

spring.kafka.consumer.auto-offset-reset=earliest
spring.embedded.kafka.brokers=localhost:9092
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
spring.kafka.consumer.group-id=1

And I tried the test with both ways and the same error while starting the test:

Test class rule:

@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
public class PKafkaProducerClassRuleTest {

    private static final Logger LOGGER =
        LoggerFactory.getLogger(PKafkaProducerClassRuleTest.class);

    private static String SENDER_TOPIC = "sender.t";

    static {
        System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
    }

    @Autowired
    private PinPadKafkaProducer sender;

    private KafkaMessageListenerContainer<String, String> container;

    private BlockingQueue<ConsumerRecord<String, String>> records;

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka =
        new EmbeddedKafkaRule(1, true, SENDER_TOPIC);

Test with EmbeddedKafka:

@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1,
    topics = {
        "sender.t" })
public class PKafkaProducerEmbeddedKafkaTest {

    private static final Logger LOGGER =
        LoggerFactory.getLogger(PKafkaProducerEmbeddedKafkaTest.class);

    private static String SENDER_TOPIC = "sender.t";

    static {
        System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY,
            "spring.kafka.bootstrap-servers");
    }

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Autowired
    private PinPadKafkaProducer sender;

    @Test
    public void someTest() {
        sender.sendMessage(new Greeting("msgtest", "nametest"));
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", this.embeddedKafka);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
        Consumer<Integer, String> consumer = cf.createConsumer();
        this.embeddedKafka.consumeFromAnEmbeddedTopic(consumer, SENDER_TOPIC);
        ConsumerRecords<Integer, String> replies = KafkaTestUtils.getRecords(consumer);
        assertThat(replies.count()).isGreaterThanOrEqualTo(1);
    }

}

Error in both scenarios:

19/11/2019 13:51:56.774+0100 ERROR o.s.t.c.TestContextManager [] - Caught exception while allowing TestExecutionListener [org.springframework.test.context.web.ServletTestExecutionListener@e9cee0d] to prepare test instance [com.goldcar.kiosk.PKafkaProducerEmbeddedKafkaTest@39271181]
java.lang.IllegalStateException: Failed to load ApplicationContext
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:125)
    at org.springframework.test.context.support.DefaultTestContext.getApplicationContext(DefaultTestContext.java:108)
    at org.springframework.test.context.web.ServletTestExecutionListener.setUpRequestContextIfNecessary(ServletTestExecutionListener.java:190)
    at org.springframework.test.context.web.ServletTestExecutionListener.prepareTestInstance(ServletTestExecutionListener.java:132)
    at org.springframework.test.context.TestContextManager.prepareTestInstance(TestContextManager.java:246)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.createTest(SpringJUnit4ClassRunner.java:227)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner$1.runReflectiveCall(SpringJUnit4ClassRunner.java:289)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.methodBlock(SpringJUnit4ClassRunner.java:291)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:246)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
    at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
    at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
    at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
    at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
    at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
    at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:175)
    at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:157)
    at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
    at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
    at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'embeddedKafka': Invocation of init method failed; nested exception is java.lang.NoSuchMethodError: scala.Predef$.refArrayOps([Ljava/lang/Object;)Lscala/collection/mutable/ArrayOps;
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1778)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:405)
    at org.springframework.kafka.test.context.EmbeddedKafkaContextCustomizer.customizeContext(EmbeddedKafkaContextCustomizer.java:109)
    at org.springframework.boot.test.context.SpringBootContextLoader$ContextCustomizerAdapter.initialize(SpringBootContextLoader.java:300)
    at org.springframework.boot.SpringApplication.applyInitializers(SpringApplication.java:621)
    at org.springframework.boot.SpringApplication.prepareContext(SpringApplication.java:365)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:310)
    at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:119)
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:99)
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:117)
    ... 49 common frames omitted
Caused by: java.lang.NoSuchMethodError: scala.Predef$.refArrayOps([Ljava/lang/Object;)Lscala/collection/mutable/ArrayOps;
    at kafka.cluster.EndPoint$.<init>(EndPoint.scala:32)
    at kafka.cluster.EndPoint$.<clinit>(EndPoint.scala)
    at kafka.server.Defaults$.<init>(KafkaConfig.scala:68)
    at kafka.server.Defaults$.<clinit>(KafkaConfig.scala)
    at kafka.server.KafkaConfig$.<init>(KafkaConfig.scala:781)
    at kafka.server.KafkaConfig$.<clinit>(KafkaConfig.scala)
    at kafka.utils.TestUtils$.createBrokerConfig(TestUtils.scala:234)
    at kafka.utils.TestUtils.createBrokerConfig(TestUtils.scala)
    at org.springframework.kafka.test.EmbeddedKafkaBroker.createBrokerProperties(EmbeddedKafkaBroker.java:239)
    at org.springframework.kafka.test.EmbeddedKafkaBroker.afterPropertiesSet(EmbeddedKafkaBroker.java:214)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1837)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1774)
    ... 58 common frames omitted

Solution

  • It appears you have mis-matched versions of kafka jars on the classpath.

    This will happen if you use a different kafka-clients version than the default set by Boot.

    See this appendix for an example for how to override all kafka jar versions.

    When you use spring-kafka-test (version 2.2.x) with the 2.1.x kafka-clients jar, you need to override certain transitive dependencies, as follows:

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>${spring.kafka.version}</version>
    </dependency>
    
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <version>${spring.kafka.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.11</artifactId>
            </exclusion>
        </exclusions>
        <scope>test</scope>
    </dependency>
    
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.1.1</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.1.1</version>
        <classifier>test</classifier>
    </dependency>
    
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>2.1.1</version>
        <scope>test</scope>
    </dependency>
    
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>2.1.1</version>
        <classifier>test</classifier>
        <scope>test</scope>
    </dependency>
    

    Note that when switching to scala 2.12 (recommended for 2.1.x and higher), the 2.11 version must be excluded from spring-kafka-test.