Tags: apache-kafka, kafka-consumer-api

Messages not reaching Kafka consumer with SASL_PLAINTEXT configuration


I am trying to set up SASL_PLAINTEXT authentication in Kafka. I went through the documentation and made the configuration changes described below:

Duplicated config/server.properties as config/server_sasl_plain.properties and added the following:

security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=true
auto.create.topics.enable=true
listeners=SASL_PLAINTEXT://localhost:9094
advertised.listeners=SASL_PLAINTEXT://localhost:9094
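
A side note, in case it helps: instead of passing the broker JAAS entry through KAFKA_OPTS (as done further below), recent Kafka versions also accept it inline in the server properties. A sketch, assuming the same admin/admin-secret credentials used in the JAAS files later in this question:

```properties
# Hypothetical alternative: inline JAAS entry for the SASL_PLAINTEXT listener,
# replacing the KafkaServer section of kafka_server_jaas.conf
listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="admin" \
  password="admin-secret" \
  user_admin="admin-secret";
```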

Added the lines below to zookeeper.properties:

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000 

Then duplicated consumer.properties as consumer_sasl_plain.properties and added the following:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

Similarly, created producer_sasl_plain.properties with the same content:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
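
For what it's worth, the client-side JAAS file can also be avoided entirely: Kafka clients accept the login module directly in the properties file via sasl.jaas.config. A sketch, assuming the admin/admin-secret credentials used in the JAAS files below:

```properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
# Inline JAAS entry; removes the need to set KAFKA_OPTS on the client side
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="admin" \
  password="admin-secret";
```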

Created config/zookeeper_jaas.conf with the following contents:

Server {
   org.apache.zookeeper.server.auth.DigestLoginModule required
   user_super="zookeeper"
   user_admin="admin-secret";
};

Created kafka_server_jaas.conf with the following contents:

KafkaServer {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="admin"
   password="admin-secret"
   user_admin="admin-secret";
};
Client {
   org.apache.zookeeper.server.auth.DigestLoginModule required
   username="admin"
   password="admin-secret";
};

Created kafka_client_jaas.conf with the following contents:

KafkaClient {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="admin"
   password="admin-secret";
};
Client {
   org.apache.zookeeper.server.auth.DigestLoginModule required
   username="admin"
   password="admin-secret";
};
 

Started ZooKeeper as below:

$ export KAFKA_OPTS="-Djava.security.auth.login.config=/KAFKA_HOME/config/zookeeper_jaas.conf"
$ bin/zookeeper-server-start.sh config/zookeeper.properties

Started the broker as below:

$ export KAFKA_OPTS="-Djava.security.auth.login.config=/KAFKA_HOME/config/kafka_server_jaas.conf"
$ bin/kafka-server-start.sh -daemon config/server_sasl_plain.properties

Started the consumer as below:

$ export KAFKA_OPTS="-Djava.security.auth.login.config=/KAFKA_HOME/config/kafka_client_jaas.conf"
$ ./bin/kafka-console-consumer.sh --topic test-topic --from-beginning --consumer.config=config/consumer_sasl_plain.properties --bootstrap-server=localhost:9094

Started the producer as below:

$ export KAFKA_OPTS="-Djava.security.auth.login.config=/KAFKA_HOME/config/kafka_client_jaas.conf"
$ ./bin/kafka-console-producer.sh --broker-list localhost:9094 --topic test-topic --producer.config=config/producer_sasl_plain.properties

I am able to produce messages from the producer, but I do not see anything in the consumer.


Solution

  • I am not sure, but it seems the messages are actually getting posted to Kafka from the producer. To verify this, I checked the broker's log directory for the topic and could see messages arriving by tailing the segment files.
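
    One way to make that on-disk check reproducible is Kafka's DumpLogSegments tool; a sketch, assuming the default log directory /tmp/kafka-logs and partition 0 of the topic (adjust the path to match log.dirs in your server properties):

```shell
# Print the records stored in the first log segment of test-topic, partition 0
bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
  --files /tmp/kafka-logs/test-topic-0/00000000000000000000.log \
  --print-data-log
```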

    Also, to verify further, I created a small Scala (Spark) program to read Kafka messages from the topic; a sample is below:

      import org.apache.spark.sql.{Encoders, SparkSession}

      // Assumes a local SparkSession with the spark-sql-kafka connector on the classpath
      val spark = SparkSession.builder()
        .appName("kafka-sasl-read")
        .master("local[*]")
        .getOrCreate()

      val df = spark
        .read
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9094")
        .option("kafka.security.protocol", "SASL_PLAINTEXT")
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("kafka.sasl.jaas.config",
          """org.apache.kafka.common.security.plain.PlainLoginModule required
            |username="admin"
            |password="admin-secret";
            |""".stripMargin)
        .option("subscribe", "test-topic")       // topic to subscribe to
        .option("startingOffsets", "earliest")   // read from the beginning
        .load()

      df.selectExpr("CAST(value AS STRING)").as(Encoders.STRING).show()
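
    A usage note for the snippet above: the Kafka source is not bundled with Spark, so the connector has to be on the classpath. A sketch of a spark-shell invocation, assuming Spark 3.x built for Scala 2.12 (adjust the versions to match your installation):

```shell
# Pull the Spark/Kafka connector from Maven Central at launch time
bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1
```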