I am trying to set up a SASL_PLAINTEXT configuration in Kafka. I went through the documentation and made the configuration changes described below.
Duplicated config/server.properties as config/server_sasl_plain.properties with the following settings:
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=true
auto.create.topics.enable=true
listeners=SASL_PLAINTEXT://localhost:9094
advertised.listeners=SASL_PLAINTEXT://localhost:9094
Added the following lines to zookeeper.properties:
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
Then duplicated consumer.properties as consumer_sasl_plain.properties and added the following contents:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
Similarly, created producer_sasl_plain.properties and added the following contents:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
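Kafka clients can also take the login module inline through the sasl.jaas.config property instead of a separate JAAS file. A minimal sketch of such a client properties file, assuming the same admin/admin-secret credentials used in this post; the scratch file name is hypothetical and only there for illustration:

```shell
# Write a client properties file that carries the JAAS login inline.
# The output path is a hypothetical scratch location, not part of the original setup.
props="${TMPDIR:-/tmp}/consumer_sasl_plain_inline.properties"

cat > "$props" <<'EOF'
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";
EOF

# Print the file so it can be compared with the file-based JAAS setup.
cat "$props"
```

With a file like this, the client would not depend on KAFKA_OPTS being exported in the same shell session.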
Created config/zookeeper_jaas.conf with the following contents:
Server {
org.apache.zookeeper.server.auth.DigestLoginModule required
user_super="zookeeper"
user_admin="admin-secret";
};
Created kafka_server_jaas.conf with the following contents:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret";
};
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="admin"
password="admin-secret";
};
Created kafka_client_jaas.conf with the following contents:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret";
};
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="admin"
password="admin-secret";
};
Started ZooKeeper as follows:
$ export KAFKA_OPTS="-Djava.security.auth.login.config=/KAFKA_HOME/config/zookeeper_jaas.conf"
$ bin/zookeeper-server-start.sh config/zookeeper.properties
Started the broker as follows:
$ export KAFKA_OPTS="-Djava.security.auth.login.config=/KAFKA_HOME/config/kafka_server_jaas.conf"
$ bin/kafka-server-start.sh -daemon config/server_sasl_plain.properties
Started the consumer as follows:
$ export KAFKA_OPTS="-Djava.security.auth.login.config=/KAFKA_HOME/config/kafka_client_jaas.conf"
$ ./bin/kafka-console-consumer.sh --topic test-topic --from-beginning --consumer.config=config/consumer_sasl_plain.properties --bootstrap-server=localhost:9094
Started the producer as follows:
$ export KAFKA_OPTS="-Djava.security.auth.login.config=/KAFKA_HOME/config/kafka_client_jaas.conf"
$ ./bin/kafka-console-producer.sh --broker-list localhost:9094 --topic test-topic --producer.config=config/producer_sasl_plain.properties
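Each start command above hands the JAAS file to the JVM through KAFKA_OPTS, and the value only works as a system property when it begins with -D. A small sketch of building that value in one place; jaas_opt is a hypothetical helper name (not part of Kafka), and /KAFKA_HOME is the placeholder path used throughout this post:

```shell
# Build the JVM option that points Kafka's scripts at a JAAS file.
# jaas_opt is a hypothetical helper, not something Kafka ships.
jaas_opt() {
  printf '%s%s' '-Djava.security.auth.login.config=' "$1"
}

# /KAFKA_HOME is the placeholder install path from this post.
export KAFKA_OPTS="$(jaas_opt /KAFKA_HOME/config/kafka_client_jaas.conf)"

# Print the value to confirm it is a single option starting with -D.
echo "$KAFKA_OPTS"
```

Echoing the variable before launching a tool is a quick way to rule out a missing dash or a line break inside the quoted value.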
I am able to produce messages from the producer, but I do not see anything in the consumer.
I am not sure, but the messages do seem to be reaching Kafka from the producer. To verify this, I tailed the broker's log directory for the topic and could see messages arriving there.
To verify it further, I wrote a small Scala program that reads messages from the topic through Spark. Sample below:
import org.apache.spark.sql.Encoders

// `spark` is an existing SparkSession (for example, the one provided by spark-shell).
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9094")
  .option("kafka.security.protocol", "SASL_PLAINTEXT")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.sasl.jaas.config", """
    |org.apache.kafka.common.security.plain.PlainLoginModule required
    |username="admin"
    |password="admin-secret";
    |""".stripMargin)
  .option("subscribe", "test-topic") // Topic to subscribe to
  .option("startingOffsets", "earliest") // Read from the earliest available offset
  .load()

// Decode the binary message value as a string and print the rows.
df.selectExpr("CAST(value AS STRING)").as(Encoders.STRING).show()