After raising an issue on spring-boot (https://github.com/spring-projects/spring-boot/issues/40398) and some discussion with the team, it seems like it should be possible to use SSL bundles on the KafkaTemplate the same as is documented for the RestTemplate.
I have defined an SSL Bundle for Kafka and am attempting to apply to the kafka config as follows:
@Bean
public Map<String,Object> buildConsumerProperties(SslBundles sslBundles) {
Map<String, Object> configProps = new HashMap<>();
configProps.put("spring.kafka.consumer.ssl.bundle", sslBundles.getBundle("kafka"));
configProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,kafkaConfiguration.getKafkaSecurityProtocol());
return configProps;
}
This connects but errors:
APPENDER=APP, DATE=2024-04-25 15:10:40.839, LEVEL=INFO , USER=, THREAD=[org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1], LOGGER=o.a.kafka.clients.NetworkClient, CORR=, INT_CORR=, X-VCAP-REQUEST-ID=, MESSAGE=Cancelled in-flight API_VERSIONS request with correlation id 1 due to node -1 being disconnected (elapsed time since creation: 204ms, elapsed time since send: 204ms, request timeout: 30000ms)
APPENDER=APP, DATE=2024-04-25 15:10:40.840, LEVEL=WARN , USER=, THREAD=[org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1], LOGGER=o.a.kafka.clients.NetworkClient, CORR=, INT_CORR=, X-VCAP-REQUEST-ID=, MESSAGE=Bootstrap broker b0.dev-k4a.recp-da-kafka-dev.shared.banksvcs.net:9093 (id: -1 rack: null) disconnected
"Normal" configuration of the SSL Related properties works fine:
private void setSslProperties(Map<String, Object> configProps) throws URISyntaxException, IOException {
configProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, this.getTruststorePath());
configProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaConfiguration.getKafkaSslTruststorePassword());
configProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, kafkaConfiguration.getKafkaSslKeystoreType());
configProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, this.getKeystorePath());
configProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, kafkaConfiguration.getKafkaSslKeystorePassword());
configProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafkaConfiguration.getKafkaSecurityProtocol());
}
Versions:
Wondering if this has not yet made it into the kafka-client or if I have missed something here
As per above - I have tried to switch out from key/truststores to an SSL bundle, but this appears not to be working
That spring.kafka.consumer.ssl.bundle
is for this:
/**
* Name of the SSL bundle to use.
*/
private String bundle;
so, it is expected to be String
, while sslBundles.getBundle("kafka")
returns for us an SslBundle
instance. Which is apparently just plain PropertiesSslBundle
and then logic is like this:
if (getBundle() != null) {
properties.in(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG)
.accept(SslBundleSslEngineFactory.class.getName());
properties.in(SslBundle.class.getName()).accept(sslBundles.getBundle(getBundle()));
}
else {
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(this::getKeyPassword).to(properties.in(SslConfigs.SSL_KEY_PASSWORD_CONFIG));
And probably that PropertiesSslBundle
is not mapped correctly to whatever Apache Kafka client would expect via its SslConfigs
.
I believe you still can work it out, but really manually: sslBundles.getBundle("kafka")
and then all those SslConfigs
properties for respective getters of the SslBundle
.