We are trying to connect to an IAM-auth-enabled MSK cluster via the Couchbase Kafka connector.
The properties below were added to $KAFKA_HOME/config/connect-standalone.properties
in addition to the regular required configs:
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
However, the connection to MSK fails, and the error message below is logged repeatedly:
Cancelled in-flight API_VERSIONS request with correlation id 1 due to node -1 being disconnected
Providing the auth-related properties in $KAFKA_HOME/config/producer.properties as well did not help.
Can someone shed some light on what additional config is required to establish connectivity and get past the above error?
Additional Details -
$KAFKA_HOME/bin/*.sh
It turns out that additional properties are required in $KAFKA_HOME/config/connect-standalone.properties, with the respective prefixes, for the Kafka producer/consumer to work.
The final connect-standalone.properties should look like this (no changes are required in producer.properties):
# First (required for admin login)
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
# For producer
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=AWS_MSK_IAM
producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
# For consumer (optional; not required for this scenario)
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=AWS_MSK_IAM
consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
Explanation: As specified in the docs (excerpt below, with my emphasis), the security properties for Kafka producers/consumers must also be provided with the respective prefix so that they are picked up by the source/sink tasks within the connector.
The parameters that are configured here are intended for producers and consumers used by Kafka Connect to access the configuration, offset and status topics. For configuration of the producers used by Kafka source tasks and the consumers used by Kafka sink tasks, the same parameters can be used but need to be prefixed with producer. and consumer. respectively. The only Kafka client parameter that is inherited without a prefix from the worker configuration is bootstrap.servers, which in most cases will be sufficient, since the same cluster is often used for all purposes. A notable exception is a secured cluster, which requires extra parameters to allow connections. These parameters will need to be set up to three times in the worker configuration, once for management access, once for Kafka sources and once for Kafka sinks.
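Since the same four settings have to be repeated up to three times, it may be easier to generate the lines than to copy them by hand. Below is a minimal Python sketch of that idea. The helper name expand_security_props and the BASE_SECURITY_PROPS dictionary are my own illustrative names, not part of Kafka or the MSK IAM library; the property keys and values are the ones shown above.

```python
# Security settings exactly as used in the worker config above.
BASE_SECURITY_PROPS = {
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "AWS_MSK_IAM",
    "sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
    "sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
}


def expand_security_props(base, prefixes=("", "producer.", "consumer.")):
    """Return properties-file lines with every base key repeated under each prefix.

    The empty prefix covers the worker's own (management) clients; "producer."
    and "consumer." cover source and sink tasks respectively.
    (Hypothetical helper for illustration only.)
    """
    lines = []
    for prefix in prefixes:
        for key, value in base.items():
            lines.append(f"{prefix}{key}={value}")
    return lines


if __name__ == "__main__":
    # Print the lines so they can be pasted into connect-standalone.properties.
    print("\n".join(expand_security_props(BASE_SECURITY_PROPS)))
```

For a source-only connector such as this one, passing prefixes=("", "producer.") would skip the optional consumer block.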