Search code examples
node.js · apache-kafka · sasl · confluent-cloud

NodeJS connect to Confluent Cloud Kafka using openID with SASL extension


I'm trying to connect from Node.js to a Confluent Cloud Kafka cluster that is configured with an OpenID Connect (OAuth) identity provider.

see: https://docs.confluent.io/cloud/current/access-management/authenticate/oauth/configure-clients-oauth.html#client-login-callback-handler-top-level-configuration-options

I'm able to connect with the kafka-console-consumer command (as in the Confluent Cloud example), using the following properties file and invocation:

# ./kafka.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
sasl.login.connect.timeout.ms=15000
sasl.oauthbearer.token.endpoint.url=https://myidp.example.com/oauth2/default/v1/token
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    clientId="MY_CLIENT_ID" \
    clientSecret="MY_CLIENT_SECRET" \
    scope="SCOPE" \
    extension_logicalCluster="CLUSTER_ID" \
    extension_identityPoolId="IDENTITY_POOL_ID" ;
kafka-console-consumer \
        --bootstrap-server KAFKA_BROKER \
        --consumer.config ./kafka.properties \
        --topic MY_TOPIC \
        --group GROUP_NAME

Currently I'm following the KafkaJS OAUTHBEARER example (https://kafka.js.org/docs/configuration#oauthbearer-example), but I'm getting lost when trying to add the SASL extensions to the mix (i.e. extension_logicalCluster and extension_identityPoolId).

My code so far

# oauthBearerProviderOpenId.ts
import { Issuer, TokenSet } from 'openid-client';

/**
 * Settings consumed by `oauthBearerProviderOpenId`.
 */
export interface oauthBearerProviderOpenIdOptions {
    /** Issuer URL handed to OpenID discovery. */
    issuer: string;

    /** OAuth client id registered with the identity provider. */
    clientId: string;

    /** OAuth client secret that belongs to `clientId`. */
    clientSecret: string;

    /**
     * Lead time in milliseconds before token expiry at which a refresh is
     * meant to happen (inferred from the name; confirm against the provider).
     */
    refreshThresholdMs: number;

    /** Scope requested with the client-credentials grant. */
    scope: string;

    /**
     * Confluent Cloud cluster id; presumably the value of the
     * `logicalCluster` SASL extension (confirm against the caller).
     */
    logicalCluster: string;

    /**
     * Confluent Cloud identity pool id; presumably the value of the
     * `identityPoolId` SASL extension (confirm against the caller).
     */
    identityPoolId: string;
}

/**
 * Creates a KafkaJS `oauthBearerProvider` callback backed by an OpenID Connect
 * client-credentials grant.
 *
 * The issuer metadata is discovered once. A token is requested immediately and
 * a timer requests a fresh one `options.refreshThresholdMs` before the current
 * token expires. If a request fails, the next call of the returned function
 * starts a new attempt instead of replaying the old rejection.
 *
 * NOTE: the returned payload carries only `value`; `options.logicalCluster`
 * and `options.identityPoolId` are not sent as SASL extensions.
 *
 * @param options Identity-provider location, client credentials and timing.
 * @returns An async function resolving to `{ value }` with the current access token.
 * @throws Whatever `Issuer.discover` rejects with when discovery fails.
 */
export const oauthBearerProviderOpenId = async (options: oauthBearerProviderOpenIdOptions) => {
    const issuer = await Issuer.discover(options.issuer);
    const { Client } = issuer;

    const client = new Client({
        client_id: options.clientId,
        client_secret: options.clientSecret,
    });

    // Lower bound for the refresh timer so that a threshold larger than the
    // token lifetime cannot turn into a tight request loop.
    const minRefreshDelayMs = 1000;

    let tokenPromise: Promise<string>;
    let lastAttemptFailed = false;

    /** Requests a brand-new token and schedules the request for its successor. */
    async function refreshToken(): Promise<string> {
        try {
            // Client-credentials grants have no refresh token, so every
            // refresh is simply a new grant.
            const tokenSet: TokenSet = await client.grant({
                grant_type: 'client_credentials',
                scope: options.scope,
            });

            if (!tokenSet.access_token) {
                throw new Error('Unable to fetch access_token');
            }

            // `expires_in` is expressed in seconds; timers take milliseconds.
            const expiresInSec = tokenSet.expires_in;
            if (typeof expiresInSec === 'number' && Number.isFinite(expiresInSec)) {
                const delayMs = Math.max(
                    expiresInSec * 1000 - options.refreshThresholdMs,
                    minRefreshDelayMs,
                );
                setTimeout(() => {
                    tokenPromise = startRefresh();
                }, delayMs);
            }

            return tokenSet.access_token;
        } catch (error: unknown) {
            // Log without assuming a particular error shape, then let the
            // caller see the original failure.
            console.error(error instanceof Error ? error.message : String(error));
            throw error;
        }
    }

    /** Starts a refresh and records failure without leaving the rejection unhandled. */
    function startRefresh(): Promise<string> {
        lastAttemptFailed = false;
        const attempt = refreshToken();
        attempt.catch(() => {
            lastAttemptFailed = true;
        });
        return attempt;
    }

    tokenPromise = startRefresh();

    return async function () {
        if (lastAttemptFailed) {
            // The previous request failed and scheduled nothing; try again on demand.
            tokenPromise = startRefresh();
        }
        return {
            value: await tokenPromise
        }
    }
};

ERROR LOG

SASL OAUTHBEARER authentication failed: Authentication failed: 1 extensions are invalid! They are: logicalCluster: CLUSTER_ID_MISSING_OR_EMPTY
KafkaJSSASLAuthenticationError: SASL OAUTHBEARER authentication failed: Authentication failed: 1 extensions are invalid! They are: logicalCluster: CLUSTER_ID_MISSING_OR_EMPTY

Solution

  • After digging into the KafkaJS code I found out that KafkaJS does support SASL extensions. (see: https://github.com/tulios/kafkajs/blob/ff3b1117f316d527ae170b550bc0f772614338e9/src/protocol/sasl/oauthBearer/request.js#LL50C53-L50C53)

    To pass the SASL extensions use the following code (Based on KafkaJS oauthBearerProvider example https://kafka.js.org/docs/configuration#oauthbearer-example).

    import { AccessToken, ClientCredentials } from "simple-oauth2";
    
    /**
     * Settings consumed by `oauthBearerProvider`.
     */
    export interface OauthBearerProviderOptions {
      /** OAuth client id sent to the token endpoint. */
      clientId: string;

      /** OAuth client secret that belongs to `clientId`. */
      clientSecret: string;

      /** Base URL of the authorization server (used as `tokenHost`). */
      host: string;

      /** Path of the token endpoint on `host` (used as `tokenPath`). */
      path: string;

      /** How long before expiry, in milliseconds, the token is renewed. */
      refreshThresholdMs: number;

      /** Scope requested with the token, if any. */
      scope?: string;

      /** Key/value pairs returned as the `extensions` of the bearer payload. */
      saslExtension?: Record<string, string>;
    }
    
    /**
     * Creates a KafkaJS `oauthBearerProvider` callback that obtains its token
     * through the OAuth client-credentials flow and attaches SASL extensions.
     *
     * A token is requested immediately and renewed by a timer
     * `options.refreshThresholdMs` before it expires. If a request fails, the
     * next call of the returned function starts a new attempt instead of
     * replaying the old rejection.
     *
     * @param options Token endpoint, client credentials, timing and extensions.
     * @returns An async function resolving to `{ value, extensions }`, where
     *          `extensions` is `options.saslExtension` unchanged.
     */
    export const oauthBearerProvider = (options: OauthBearerProviderOptions) => {
      const client = new ClientCredentials({
        client: {
          id: options.clientId,
          secret: options.clientSecret
        },
        auth: {
          tokenHost: options.host,
          tokenPath: options.path
        },
        options: {
          authorizationMethod: 'body',
        },
      });

      // Lower bound for the refresh timer so that a threshold larger than the
      // token lifetime cannot turn into a tight request loop.
      const minRefreshDelayMs = 1000;

      let tokenPromise: Promise<string>;
      let accessToken: AccessToken | null = null;
      let lastAttemptFailed = false;

      /** Renders a caught value for logging without assuming its shape. */
      function describeError(error: unknown): string {
        const payload = (error as { data?: { payload?: unknown } } | null | undefined)?.data?.payload;
        if (payload == null) {
          return error instanceof Error ? error.message : String(error);
        }
        // Byte payloads stringify to their text; plain objects need JSON.
        return typeof payload === 'object' && !(payload instanceof Uint8Array)
          ? JSON.stringify(payload)
          : String(payload);
      }

      /** Obtains or renews the token and schedules the next renewal. */
      async function refreshToken(): Promise<string> {
        try {
          if (accessToken == null) {
            accessToken = await client.getToken({
              scope: options.scope,
            });
          } else if (accessToken.expired(options.refreshThresholdMs / 1000)) {
            // Only use the refresh grant when a refresh token was issued;
            // otherwise request a new token with the client credentials.
            accessToken = accessToken.token.refresh_token
              ? await accessToken.refresh()
              : await client.getToken({ scope: options.scope });
          }

          // `expires_in` is expressed in seconds; timers take milliseconds.
          const expiresInSec = Number(accessToken.token.expires_in);
          if (Number.isFinite(expiresInSec)) {
            const nextRefresh = Math.max(
              expiresInSec * 1000 - options.refreshThresholdMs,
              minRefreshDelayMs,
            );
            setTimeout(() => {
              tokenPromise = startRefresh();
            }, nextRefresh);
          }

          return accessToken.token.access_token as string;
        } catch (error: unknown) {
          // Drop the cached token so the next attempt starts from scratch.
          accessToken = null;
          console.error(describeError(error));
          throw error;
        }
      }

      /** Starts a refresh and records failure without leaving the rejection unhandled. */
      function startRefresh(): Promise<string> {
        lastAttemptFailed = false;
        const attempt = refreshToken();
        attempt.catch(() => {
          lastAttemptFailed = true;
        });
        return attempt;
      }

      tokenPromise = startRefresh();

      return async function () {
        if (lastAttemptFailed) {
          // The previous request failed and scheduled nothing; try again on demand.
          tokenPromise = startRefresh();
        }
        return {
          value: await tokenPromise,
          extensions: options.saslExtension // <--- pass sasl extension as key value
        }
      }
    };
    
    

    Example of the SASL extensions object (passed as `saslExtension`):

    {
       "logicalCluster": "<CLUSTER_ID>",
       "identityPoolId": "<IDENTITY_POOL_ID>"
    }