Search code examples
apache-kafkaacljaas

Kafka TOPIC_AUTHORIZATION_FAILED


I'm actually working on setting up simple Kafka authentication using SASL Plain Text and add ACL authorization. But I have an issue when I try to consume data.

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.0.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : b8642491e78c5a13
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 1 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 2 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 3 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 4 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 5 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 6 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 7 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 8 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 9 : {test-topic=TOPIC_AUTHORIZATION_FAILED}
[main] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 10 : {test-topic=TOPIC_AUTHORIZATION_FAILED}

Next, you can see my configuration files.

server.properties

listeners=SASL_PLAINTEXT://localhost:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

producer.properties

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
bootstrap.servers=localhost:9092
compression.type=none

consumer.properties

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
zookeeper.connect=127.0.0.1:2181
zookeeper.connection.timeout.ms=6000
group.id=test-consumer-group

kafka_server_jaas.conf

KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin-secret"
  user_admin="admin-secret"
  user_alice="alice-secret";
};

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="alice"
  password="alice-secret";
};

Environment variable:

export KAFKA_OPTS="-Djava.security.auth.login.config=/home/user/kafka_2.10-0.10.0.1/kafka_server_jaas.conf"

Commands

Set ACL:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:alice --operation All --group test-consumer-group --topic test-topic

start Kafka Server :

./bin/kafka-server-start.sh config/server.properties

Start Producer:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic --producer.config=config/producer.properties

Start Consumer:

bin/kafka-console-consumer.sh --new-consumer --zookeeper localhost:2181 --topic test-topic --from-beginning --consumer.config=config/consumer.properties  --bootstrap-server=localhost:9092

When I try to start the consumer, I have the issue described above. Also, in the kafka logs, I have this:

[2016-10-22 20:17:14,091] ERROR [KafkaApi-0] Error when handling request {group_id=test-consumer-group} (kafka.server.KafkaApis)
kafka.admin.AdminOperationException: replication factor: 3 larger than available brokers: 1
    at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:117)
    at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:403)
    at kafka.server.KafkaApis.kafka$server$KafkaApis$$createTopic(KafkaApis.scala:629)
    at kafka.server.KafkaApis.kafka$server$KafkaApis$$createGroupMetadataTopic(KafkaApis.scala:651)
    at kafka.server.KafkaApis$$anonfun$getOrCreateGroupMetadataTopic$1.apply(KafkaApis.scala:657)
    at kafka.server.KafkaApis$$anonfun$getOrCreateGroupMetadataTopic$1.apply(KafkaApis.scala:657)
    at scala.Option.getOrElse(Option.scala:121)
    at kafka.server.KafkaApis.getOrCreateGroupMetadataTopic(KafkaApis.scala:657)
    at kafka.server.KafkaApis.handleGroupCoordinatorRequest(KafkaApis.scala:818)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:86)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
    at java.lang.Thread.run(Thread.java:745)

How can I fix this?


Solution

  • Issue fixed by separating jaas client and jaas server.

    kafka_server_jaas.conf

    KafkaServer {
      org.apache.kafka.common.security.plain.PlainLoginModule required
      username="admin"
      password="admin-secret"
      user_admin="admin-secret"
      user_alice="alice-secret";
    };
    

    kafka_client_jaas.conf

    KafkaClient {
      org.apache.kafka.common.security.plain.PlainLoginModule required
      username="alice"
      password="alice-secret";
    };
    

    On the same terminal, export jaas server conf file and start kafka broker:

    $ export KAFKA_OPTS="-Djava.security.auth.login.config=/home/user/kafka_2.10-0.10.0.1/kafka_server_jaas.conf"
    $ ./bin/kafka-server-start.sh config/server.properties
    

    On a client terminal, export client jaas conf file and start consumer:

    $ export KAFKA_OPTS="-Djava.security.auth.login.config=/home/user/kafka_2.10-0.10.0.1/kafka_client_jaas.conf"
    $ ./bin/kafka-console-consumer.sh --new-consumer --zookeeper localhost:2181 --topic test-topic --from-beginning --consumer.config=config/consumer.properties  --bootstrap-server=localhost:9092
    

    If you also want to produce, do this on another terminal window:

    $ export KAFKA_OPTS="-Djava.security.auth.login.config=/home/user/kafka_2.10-0.10.0.1/kafka_client_jaas.conf"
    $ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic --producer.config=config/producer.properties