Search code examples
javaapache-kafkakafka-producer-apijaassasl

Configure kafka producer SASL_JAAS_CONFIG error when using env variables


Hello i have the following config to produce to topic with spring batch app:

        Map<String,Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,JsonSerializer.class);
    configProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG,10000);
    configProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,6000);
    configProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG,8000);
    configProps.put(ProducerConfig.RETRIES_CONFIG,0);
     configProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
     configProps.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
     configProps.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
     "%s required username=\"%s\" " + "password=\"%s\";", PlainLoginModule.class.getName(), "UYDUEWWU27LFC2CJ", "yqBcf5BM4rd3X273OsaB6/n7/kuxmRG39+Fr3nVuOdXI/RZ/sM9E6hpqmaAPxCbC"
     ));

This works fine, but when i try to get the env variables for username and password in SASL JAAS CONFIG like i do with bootstrap server:

    String username = env.getProperty("spring.bootstrap.kafka.username");
String password = env.getProperty("spring.bootstrap.kafka.password");
     configProps.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
     "%s required username=\"%s\" " + "password=\"%s\";", PlainLoginModule.class.getName(), username, password
     ));

It give me the error for authentication failed when sending the object to the kafka topic even its says that the login was succesfull. I have verified that the content of each string format is the same with log. How is that possible?

Thanks


Solution

  • I solved it changing this line:

    configProps.put(SaslConfigs.SASL_JAAS_CONFIG, String.format( "%s required username=\"%s\" " + "password=\"%s\";", PlainLoginModule.class.getName(), username, password ));

    for this one:

    configProps.put("sasl.jaas.config",String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",username, password));