Search code examples
javaspring-bootapache-kafkagraalvm-native-imagereactor-kafka

spring native kafka.common.KafkaException: Could not find a public no-argument constructor for io.confluent.kafka.serializers.KafkaJsonSerializer


  • Include details about your goal

Run a SpringBoot 3.2.4 reactor kafka producer with graalVM 21 native image.

  • Describe expected results

A simple SpringBoot reactor kafka producer project is working fine as non native image. The image has been built many times, running with low, high load without any issue.

Now, I am trying to convert the job into a native image.

I expect the same process to also build, run fine.

  • Describe actual results:

While the build of the native image is happening without issue, at runtime, it seems there is an issue specific to graalVM native image.

The error stack trace is:

reactor.core.Exceptions$ErrorCallbackNotImplemented: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:459)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:287)
        at reactor.kafka.sender.internals.ProducerFactory.createProducer(ProducerFactory.java:34)
        at reactor.kafka.sender.internals.DefaultKafkaSender.lambda$new$2(DefaultKafkaSender.java:102)
        at reactor.core.publisher.MonoCallable.call(MonoCallable.java:72)
        at reactor.core.publisher.FluxSubscribeOnCallable$CallableSubscribeOnSubscription.run(FluxSubscribeOnCallable.java:228)
        at reactor.core.scheduler.ImmediateScheduler.schedule(ImmediateScheduler.java:52)
        at reactor.core.publisher.MonoSubscribeOnCallable.subscribe(MonoSubscribeOnCallable.java:52)
        at reactor.core.publisher.MonoCacheTime.subscribeOrReturn(MonoCacheTime.java:143)
        at reactor.core.publisher.Flux.subscribe(Flux.java:8825)
        at reactor.core.publisher.Flux.subscribeWith(Flux.java:8961)
        at reactor.core.publisher.Flux.subscribe(Flux.java:8805)
        at reactor.core.publisher.Flux.subscribe(Flux.java:8729)
        at reactor.core.publisher.Flux.subscribe(Flux.java:8647)
        at com.Service.run(Service.java:51)
        at org.springframework.boot.SpringApplication.lambda$callRunner$5(SpringApplication.java:790)
        at org.springframework.util.function.ThrowingConsumer$1.acceptWithException(ThrowingConsumer.java:83)
        at org.springframework.util.function.ThrowingConsumer.accept(ThrowingConsumer.java:60)
        at org.springframework.util.function.ThrowingConsumer$1.accept(ThrowingConsumer.java:88)
        at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:798)
        at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:789)
        at org.springframework.boot.SpringApplication.lambda$callRunners$3(SpringApplication.java:774)
        at [email protected]/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
        at [email protected]/java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:357)
        at [email protected]/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:510)
        at [email protected]/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
        at [email protected]/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
        at [email protected]/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
        at [email protected]/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at [email protected]/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
        at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:774)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:341)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:1354)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:1343)
        at com.Application.main(Application.java:18)
        at [email protected]/java.lang.invoke.LambdaForm$DMH/sa346b79c.invokeStaticInit(LambdaForm$DMH)
Caused by: org.apache.kafka.common.KafkaException: Could not find a public no-argument constructor for io.confluent.kafka.serializers.KafkaJsonSerializer
        at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:399)
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:395)
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:434)
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:419)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:385)
        ... 35 common frames omitted
Caused by: java.lang.NoSuchMethodException: io.confluent.kafka.serializers.KafkaJsonSerializer.<init>()
        at [email protected]/java.lang.Class.checkMethod(DynamicHub.java:1075)
        at [email protected]/java.lang.Class.getConstructor0(DynamicHub.java:1238)
        at [email protected]/java.lang.Class.getDeclaredConstructor(DynamicHub.java:2930)
        at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:397)
        ... 39 common frames omitted
  • Show some code:

Here is the snippet for the configuration:

    @Bean
    public KafkaSender<String, Log> kafkaSender(final MeterRegistry meterRegistry, final ObservationRegistry observationRegistry) {
        final Map<String, Object> properties = new HashMap<>();
        properties.put(SSL_PROTOCOL, SSL_VALUE);
        properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStoreLocation);
        properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keyStorePassphrase);
        properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreLocation);
        properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassphrase);
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaJsonSerializer.class);
        final SenderOptions<String, Log> senderOptions = SenderOptions.create(properties);
        return KafkaSender.create(senderOptions
                .producerListener(new MicrometerProducerListener(meterRegistry))
                .withObservation(observationRegistry));
    }
  • Question:

It seems one needs to manipulate registrars, some config.json to fix graalVM related issue.

What is needed in order to fix this issue?


Solution

  • Kafka uses reflection to lookup various classes from the properties, such as serializers.

    You'll need to define a config file to enable certain reflection calls such as to get constructors

    [ 
      { 
        "name": "io.confluent.kafka.serializers.KafkaJsonSerializer", 
        "allDeclaredConstructors": true
      } 
    ]