c# amazon-iam aws-msk aws-msk-connect

How to connect to AWS MSK Cluster from C# using Identity and Access Management (IAM) authorization


I have an AWS MSK (Kafka) broker. It has been set up for public access with IAM authentication. I know the broker settings are correct, because I have been using https://github.com/aws/aws-msk-iam-auth to connect to the broker successfully as both a consumer and a producer written in Java. However, I also have a requirement to write a C# consumer. Ideally, I would like to find a C# library equivalent to the Java aws-msk-iam-auth library (probably a NuGet package). I can't find any such library, or any other way to connect to an AWS MSK broker using IAM authentication in C#. So my question: how do I connect to an AWS MSK broker using IAM authentication from C#?


Solution

  • You can now do this using the AWS.MSK.Auth package.

    Here is the C# producer code from the package's page:

       using System;
       using Amazon;
       using AWS.MSK.Auth;
       using Confluent.Kafka;

       var producerConfig = new ProducerConfig
       {
           // Replace with your cluster's IAM bootstrap endpoint.
           BootstrapServers = "<BOOTSTRAP-SERVER-HERE>",
           SecurityProtocol = SecurityProtocol.SaslSsl,
           SaslMechanism = SaslMechanism.OAuthBearer
       };

       AWSMSKAuthTokenGenerator mskAuthTokenGenerator = new AWSMSKAuthTokenGenerator();

       // Callback to handle OAuth bearer token refresh. It fetches the OAuth token from the AWSMSKAuthTokenGenerator class.
       // It must be declared async because it awaits the token generator.
       async void OauthCallback(IClient client, string cfg)
       {
           try
           {
               var (token, expiryMs) = await mskAuthTokenGenerator.GenerateAuthTokenAsync(Amazon.RegionEndpoint.USEast1);
               client.OAuthBearerSetToken(token, expiryMs, "DummyPrincipal");
           }
           catch (Exception e)
           {
               client.OAuthBearerSetTokenFailure(e.ToString());
           }
       }

       var producer = new ProducerBuilder<string, string>(producerConfig)
                           .SetOAuthBearerTokenRefreshHandler(OauthCallback).Build();
       try
       {
           var deliveryReport = await producer.ProduceAsync("test-topic", new Message<string, string> { Value = "Hello from .NET" });

           Console.WriteLine($"Produced message to {deliveryReport.TopicPartitionOffset}");
       }
       catch (ProduceException<string, string> e)
       {
           Console.WriteLine($"failed to deliver message: {e.Message} [{e.Error.Code}]");
       }
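
    Since the question asks for a consumer, here is a minimal sketch of the same token-refresh approach applied to a Confluent.Kafka consumer. It is an untested adaptation, not code from the package's page. The group id "example-consumer-group", the topic "test-topic", and the us-east-1 region are placeholder assumptions; replace them with your own values.

    ```csharp
    using System;
    using System.Threading;
    using Amazon;
    using AWS.MSK.Auth;
    using Confluent.Kafka;

    var consumerConfig = new ConsumerConfig
    {
        // Replace with your cluster's IAM bootstrap endpoint.
        BootstrapServers = "<BOOTSTRAP-SERVER-HERE>",
        GroupId = "example-consumer-group",          // hypothetical group id
        AutoOffsetReset = AutoOffsetReset.Earliest,  // start from the oldest message when no offset is stored
        SecurityProtocol = SecurityProtocol.SaslSsl,
        SaslMechanism = SaslMechanism.OAuthBearer
    };

    var mskAuthTokenGenerator = new AWSMSKAuthTokenGenerator();

    // Same refresh callback as for the producer: fetch an IAM-signed token and hand it to the client.
    async void OauthCallback(IClient client, string cfg)
    {
        try
        {
            var (token, expiryMs) = await mskAuthTokenGenerator.GenerateAuthTokenAsync(RegionEndpoint.USEast1);
            client.OAuthBearerSetToken(token, expiryMs, "DummyPrincipal");
        }
        catch (Exception e)
        {
            client.OAuthBearerSetTokenFailure(e.ToString());
        }
    }

    using var consumer = new ConsumerBuilder<string, string>(consumerConfig)
        .SetOAuthBearerTokenRefreshHandler(OauthCallback)
        .Build();

    consumer.Subscribe("test-topic");

    // Stop cleanly on Ctrl+C.
    using var cts = new CancellationTokenSource();
    Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };

    try
    {
        while (true)
        {
            var result = consumer.Consume(cts.Token);
            Console.WriteLine($"Consumed '{result.Message.Value}' from {result.TopicPartitionOffset}");
        }
    }
    catch (OperationCanceledException)
    {
        // Cancellation requested; fall through to Close().
    }
    finally
    {
        // Leave the consumer group and commit final offsets.
        consumer.Close();
    }
    ```

    The IAM principal behind the credentials also needs the relevant kafka-cluster permissions for reading from the topic and joining the consumer group, otherwise authentication succeeds but consumption fails with an authorization error.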
    

    Here is an F# version:

    #r "nuget: AWS.MSK.Auth, 1.0.0"
    #r "nuget: Confluent.Kafka, 2.4.0"
    
    open Amazon
    open AWS.MSK.Auth
    open Confluent.Kafka
    
    let mskAuthTokenGenerator = new AWSMSKAuthTokenGenerator()
    
    let oauthCallback (client : IClient) (_ : string) : unit =
      task {
        try
          let! token, expiryMs = mskAuthTokenGenerator.GenerateAuthTokenAsync(RegionEndpoint.USEast1)
    
          client.OAuthBearerSetToken(token, expiryMs, "DummyPrincipal")
        with exn ->
          client.OAuthBearerSetTokenFailure(string exn)
      }
      |> ignore
    
    let adminClient =
      let config = ClientConfig()
      let adminConfig = AdminClientConfig(config)
      adminConfig.BootstrapServers <- "a.cluster.abcdef.a1.kafka.us-east-1.amazonaws.com:9098"
      adminConfig.SecurityProtocol <- SecurityProtocol.SaslSsl
      adminConfig.SaslMechanism <- SaslMechanism.OAuthBearer
      let builder = AdminClientBuilder(adminConfig)
      builder.SetOAuthBearerTokenRefreshHandler(oauthCallback) |> ignore
      builder.Build()
    
    let topics = TopicCollection.OfTopicNames([ "my_topic" ])
    
    let result = adminClient.DescribeTopicsAsync(topics).Result
    
    for desc in result.TopicDescriptions do
      printfn $"%A{desc}"