I'm using the below command to send records to a secure Kafka
bin/kafka-console-producer.sh --topic <My Kafka topic name> --bootstrap-server <My custom bootstrap server> --producer.config /Users/DY/SSL/ssl.properties
As you can see I have added the ssl.properties
file's path to the --producer.config
switch.
The ssl.properties file contains details about how to connect to secure kafka, its contents are below:
security.protocol=SSL
ssl.truststore.location=<My custom value>
ssl.truststore.password=<My custom value>
ssl.key.password=<My custom value>
ssl.keystore.location=<My custom value>
ssl.keystore.password=<My custom value>
Now, I want to use replicate this command with java producer. The code that I've written is as:
public class MyProducer {
public static void main(String[] args) {
{
Properties properties = new Properties();
properties.put("bootstrap.servers", <My bootstrap server>);
properties.put("key.serializer", StringSerializer.class);
properties.put("value.serializer", StringSerializer.class);
properties.put("producer.config", "/Users/DY/SSL/ssl.properties");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
<My bootstrap server>, "Hello World from program");
Future<RecordMetadata> future = kafkaProducer.send(
producerRecord,
(metadata, exception) -> {
if(exception != null){
System.out.printf("some thing wrong");
exception.printStackTrace();
}
else{
System.out.println("Successfully transmitted");
}
});
future.get()
kafkaProducer.close();
}
}
}
This way of passing the properties.put("producer.config", "/Users/DY/SSL/ssl.properties");
however does not seem to work. Could anybody let me know what would be an appropriate way to do this
Rather than use any file to pass the properties individually, you can use static client configs as below;
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// for SSL Encryption
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "<My custom value>");
properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "<My custom value>");
// for SSL Authentication
properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "<My custom value>");
properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "<My custom value>");
properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "<My custom value>");
Required classes are;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;