Search code examples
javaspring-bootspring-kafka

Spring Boot: Using SSL Bundles for spring-kaka MATLS


After raising an issue on spring-boot (https://github.com/spring-projects/spring-boot/issues/40398) and some discussion with the team, it seems like it should be possible to use SSL bundles on the KafkaTemplate the same as is documented for the RestTemplate.

I have defined an SSL Bundle for Kafka and am attempting to apply to the kafka config as follows:

    @Bean
    public Map<String,Object> buildConsumerProperties(SslBundles sslBundles) {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put("spring.kafka.consumer.ssl.bundle", sslBundles.getBundle("kafka"));
        configProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,kafkaConfiguration.getKafkaSecurityProtocol());
        return configProps;
    }

This connects but errors:

APPENDER=APP, DATE=2024-04-25 15:10:40.839, LEVEL=INFO , USER=, THREAD=[org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1], LOGGER=o.a.kafka.clients.NetworkClient, CORR=, INT_CORR=, X-VCAP-REQUEST-ID=, MESSAGE=Cancelled in-flight API_VERSIONS request with correlation id 1 due to node -1 being disconnected (elapsed time since creation: 204ms, elapsed time since send: 204ms, request timeout: 30000ms)
APPENDER=APP, DATE=2024-04-25 15:10:40.840, LEVEL=WARN , USER=, THREAD=[org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1], LOGGER=o.a.kafka.clients.NetworkClient, CORR=, INT_CORR=, X-VCAP-REQUEST-ID=, MESSAGE=Bootstrap broker b0.dev-k4a.recp-da-kafka-dev.shared.banksvcs.net:9093 (id: -1 rack: null) disconnected

"Normal" configuration of the SSL Related properties works fine:

private void setSslProperties(Map<String, Object> configProps) throws URISyntaxException, IOException {
        configProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, this.getTruststorePath());
        configProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaConfiguration.getKafkaSslTruststorePassword());
        configProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, kafkaConfiguration.getKafkaSslKeystoreType());
        configProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, this.getKeystorePath());
        configProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, kafkaConfiguration.getKafkaSslKeystorePassword());
        configProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafkaConfiguration.getKafkaSecurityProtocol());
    }

Versions:

  • spring-kafka: 3.1.3
  • spring-boot: 3.2.2

Wondering if this has not yet made it into the kafka-client or if I have missed something here

As per above - I have tried to switch out from key/truststores to an SSL bundle, but this appears not to be working


Solution

  • That spring.kafka.consumer.ssl.bundle is for this:

        /**
         * Name of the SSL bundle to use.
         */
        private String bundle;
    

    so, it is expected to be String, while sslBundles.getBundle("kafka") returns for us an SslBundle instance. Which is apparently just plain PropertiesSslBundle and then logic is like this:

            if (getBundle() != null) {
                properties.in(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG)
                    .accept(SslBundleSslEngineFactory.class.getName());
                properties.in(SslBundle.class.getName()).accept(sslBundles.getBundle(getBundle()));
            }
            else {
                PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
                map.from(this::getKeyPassword).to(properties.in(SslConfigs.SSL_KEY_PASSWORD_CONFIG));
    

    And probably that PropertiesSslBundle is not mapped correctly to whatever Apache Kafka client would expect via its SslConfigs.

    I believe you still can work it out, but really manually: sslBundles.getBundle("kafka") and then all those SslConfigs properties for respective getters of the SslBundle.