amazon-web-services apache-kafka apache-kafka-connect aws-msk

Unable to connect with MSK via Couchbase-Kafka connector


We are trying to connect to an IAM-auth-enabled MSK cluster via the Couchbase Kafka connector.

The properties below were added to $KAFKA_HOME/config/connect-standalone.properties, in addition to the regular required configs.

    security.protocol=SASL_SSL
    sasl.mechanism=AWS_MSK_IAM
    sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
    sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

However, the connection to MSK fails and the error message below is logged repeatedly:

Cancelled in-flight API_VERSIONS request with correlation id 1 due to node -1 being disconnected

Adding the auth-related properties to $KAFKA_HOME/config/producer.properties as well did not help.

Can someone shed some light on what additional config is required to establish connectivity and get past the above error?

Additional Details -

  • Verified that the EC2 instance is in the same VPC as the hosted MSK cluster
  • Able to telnet to MSK & Couchbase server
  • Able to create, alter, produce/consume etc. on MSK's topic via shell scripts in $KAFKA_HOME/bin/*.sh
  • Connector version - 4.1
  • Kafka Version (for standalone execution) - 2.13_3.2.1
  • AWS IAM Auth jar is on classpath & picked correctly
  • MSK Kafka Version - 2.6.2

Solution

  • It turns out that additional properties, with the respective prefixes, are required in $KAFKA_HOME/config/connect-standalone.properties for the Kafka producer/consumer to work.

    The final connect-standalone.properties should look like this (no changes required in producer.properties):

    # First (required for admin login)
    security.protocol=SASL_SSL
    sasl.mechanism=AWS_MSK_IAM
    sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
    sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
    
    # For producer
    producer.security.protocol=SASL_SSL
    producer.sasl.mechanism=AWS_MSK_IAM
    producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
    producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
    
    # For consumer (optional; not required for this scenario)
    consumer.security.protocol=SASL_SSL
    consumer.sasl.mechanism=AWS_MSK_IAM
    consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
    consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
    

    Explanation: as specified in the docs (excerpt below), the security properties for Kafka producers/consumers must be repeated with the respective prefix so that they are picked up by the source/sink tasks within the connector. The unprefixed properties only cover the worker's own management access.

    The parameters that are configured here are intended for producers and consumers used by Kafka Connect to access the configuration, offset and status topics. For configuration of the producers used by Kafka source tasks and the consumers used by Kafka sink tasks, the same parameters can be used but need to be prefixed with producer. and consumer. respectively. The only Kafka client parameter that is inherited without a prefix from the worker configuration is bootstrap.servers, which in most cases will be sufficient, since the same cluster is often used for all purposes. A notable exception is a secured cluster, which requires extra parameters to allow connections. These parameters will need to be set up to three times in the worker configuration, once for management access, once for Kafka sources and once for Kafka sinks.
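
Since the same four settings have to be repeated up to three times, it is easy to miss one or to merge two onto a single line. A minimal Python sketch that generates the repeated block is shown below. The names `SECURITY_PROPS` and `expand_security_props` are hypothetical and only for illustration; they are not part of Kafka, the Couchbase connector, or the MSK IAM auth library. The property values are the ones from the config above.

```python
# Hypothetical helper (not part of Kafka or the connector): prints the
# security settings once per prefix, for pasting into
# connect-standalone.properties.

# Base settings, copied from the worker config shown above.
SECURITY_PROPS = {
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "AWS_MSK_IAM",
    "sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
    "sasl.client.callback.handler.class":
        "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
}


def expand_security_props(base, prefixes=("", "producer.", "consumer.")):
    """Return one 'key=value' line per property for every prefix.

    The empty prefix covers the worker's own (management) access,
    'producer.' covers source tasks, 'consumer.' covers sink tasks.
    """
    lines = []
    for prefix in prefixes:
        for key, value in base.items():
            lines.append(f"{prefix}{key}={value}")
    return lines


if __name__ == "__main__":
    print("\n".join(expand_security_props(SECURITY_PROPS)))
```

For a source-only setup such as this one, passing `prefixes=("", "producer.")` would skip the consumer block.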