I'm using AWS MSK and want to enable ACLs, but I'm unable to create a topic when ACLs are turned on. I'm using the command-line tools for all operations. Here's a summary of what I'm doing:
So the issue is that the topic gets created in ZooKeeper but the broker can't access it, presumably because of some ACL rule that I'm missing.
Raw output of the commands that I've run:
ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --bootstrap-server $B --command-config ~/client1.properties \
--create --topic test3 --partitions 1 --replication-factor 1
Error while executing topic command : org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
[2019-09-30 17:16:19,389] ERROR java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:175)
at kafka.admin.TopicCommand$TopicService.createTopic(TopicCommand.scala:134)
at kafka.admin.TopicCommand$TopicService.createTopic$(TopicCommand.scala:129)
at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:157)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
(kafka.admin.TopicCommand$)
Running the same command again:
ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --bootstrap-server $B --command-config ~/client1.properties \
--create --topic test3 --partitions 1 --replication-factor 1
Error while executing topic command : org.apache.kafka.common.errors.TopicExistsException: Topic 'test3' already exists.
[2019-09-30 17:25:38,266] ERROR java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicExistsException: Topic 'test3' already exists.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:175)
at kafka.admin.TopicCommand$TopicService.createTopic(TopicCommand.scala:134)
at kafka.admin.TopicCommand$TopicService.createTopic$(TopicCommand.scala:129)
at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:157)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: org.apache.kafka.common.errors.TopicExistsException: Topic 'test3' already exists.
(kafka.admin.TopicCommand$)
List of topics via AdminClient:
ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --bootstrap-server $B --command-config ~/client1.properties --list
List of topics via Zookeeper connect:
ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --zookeeper $ZK --command-config ~/client1.properties --list
test
test2
test3
test4
test5
Here are my ACL rules:
Current ACLs for resource `ResourcePattern(resourceType=CLUSTER, name=kafka-cluster, patternType=LITERAL)`:
(principal=User:CN=client1.com, host=*, operation=ALL, permissionType=ALLOW)
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=--operation=All, patternType=LITERAL)`:
(principal=User:CN=client1.com, host=*, operation=ALL, permissionType=ALLOW)
What am I missing?
I don't think this has anything to do with AWS MSK; it is rather an issue with the configuration of your secured Kafka cluster. In a secured cluster, both client actions (consumers/producers) and inter-broker actions require authorization. You'd have the same issue in a non-managed Kafka cluster.
The recommendation is to set up a "superuser" user (I'd call them service accounts) on the servers and then give these "superuser" users ACLs that allow the inter-broker interactions your cluster needs. The exact ACLs you need are going to vary depending on your use cases and security preferences.
In server.properties you'd add an entry like super.users=User:BrokerService, which is documented at https://docs.confluent.io/current/kafka/authorization.html#kafka-auth-superuser. The documentation suggests using Alice and Bob as superuser names, which seems confusing to me. Pick whatever user name makes sense for you.
Then you need to set up a similar ACL whose principal is the "superuser" user you created above, e.g. principal=User:BrokerService. The ACL would give whatever permissions the brokers need. It sounds like your immediate use case is to ALLOW READ on all topics. You'll probably need other ACLs for inter-broker communication as well, but I can't tell you exactly what you need without more information about what you want to do.
For example, this command sets up the ACL:
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add \
--allow-principal User:BrokerService --operation All --topic '*' --cluster
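After adding rules it is worth listing them back. The listing in the question shows a TOPIC resource literally named `--operation=All`, which suggests an option ended up being parsed as the topic name. The following is only a sketch of one way to add the topic and cluster rules separately and then check the result. It assumes the same ZooKeeper address and `User:BrokerService` principal as the command above; the `run` helper and the `EXECUTE` variable are hypothetical names introduced here so the commands can be reviewed before they are run.

```shell
#!/bin/sh
# Placeholder values taken from the example above; replace with your own.
ZK="localhost:2181"
PRINCIPAL="User:BrokerService"

# Hypothetical helper: print each command by default, run it only when EXECUTE=1.
# (The printed form drops shell quoting, so '*' is shown as a bare *.)
run() {
  if [ "${EXECUTE:-0}" = "1" ]; then
    "$@"
  else
    printf '%s\n' "$*"
  fi
}

# Topic-level rule: all operations on every topic.
# (The Apache Kafka distribution names this tool kafka-acls.sh.)
run kafka-acls --authorizer-properties "zookeeper.connect=$ZK" --add \
  --allow-principal "$PRINCIPAL" --operation All --topic '*'

# Cluster-level rule, added separately so each resource is explicit.
run kafka-acls --authorizer-properties "zookeeper.connect=$ZK" --add \
  --allow-principal "$PRINCIPAL" --operation All --cluster

# List the stored rules and confirm the resource names are the intended ones.
run kafka-acls --authorizer-properties "zookeeper.connect=$ZK" --list
```

Run as-is, the script only prints the three commands. With `EXECUTE=1` it invokes them, and in the `--list` output the TOPIC resource should then appear with `name=*` rather than an option string.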
More options for setting up ACLs, along with a description of your exact problem, are documented at https://docs.confluent.io/current/kafka/authorization.html#acl-format
Again, please research some more, or edit your question if you are looking for an exact configuration to use, as the ACLs you choose have security and use-case implications.