I have an AWS MSK (Kafka) broker that has been set up for public access with IAM authentication. I know the broker settings are correct: I have been using https://github.com/aws/aws-msk-iam-auth to connect to the broker successfully as both a consumer and a producer written in Java. However, I also have a requirement to write a C# consumer. Ideally, I would like to find a C# library equivalent to the Java aws-msk-iam-auth library (probably a NuGet package). I can't find any such library, or any other way to connect to an AWS MSK broker using IAM authentication from C#. So my question: how do I connect to an AWS MSK broker using IAM authentication from C#?
You can now do this using the AWS.MSK.Auth package.
Here is the C# code from that page:
using System;
using AWS.MSK.Auth;
using Confluent.Kafka;

var producerConfig = new ProducerConfig
{
    // Replace with your cluster's public IAM bootstrap server string.
    BootstrapServers = "<BOOTSTRAP-SERVER-HERE>",
    SecurityProtocol = SecurityProtocol.SaslSsl,
    SaslMechanism = SaslMechanism.OAuthBearer
};

AWSMSKAuthTokenGenerator mskAuthTokenGenerator = new AWSMSKAuthTokenGenerator();

// Callback to handle OAuth bearer token refresh. It fetches the OAuth token from the AWSMSKAuthTokenGenerator class.
void OauthCallback(IClient client, string cfg)
{
    try
    {
        // The refresh handler is synchronous, so block until the token is available.
        var (token, expiryMs) = mskAuthTokenGenerator
            .GenerateAuthTokenAsync(Amazon.RegionEndpoint.USEast1)
            .GetAwaiter()
            .GetResult();
        client.OAuthBearerSetToken(token, expiryMs, "DummyPrincipal");
    }
    catch (Exception e)
    {
        // Report the failure to the client so it can retry the refresh later.
        client.OAuthBearerSetTokenFailure(e.ToString());
    }
}

using var producer = new ProducerBuilder<string, string>(producerConfig)
    .SetOAuthBearerTokenRefreshHandler(OauthCallback)
    .Build();

try
{
    var deliveryReport = await producer.ProduceAsync("test-topic", new Message<string, string> { Value = "Hello from .NET" });
    Console.WriteLine($"Produced message to {deliveryReport.TopicPartitionOffset}");
}
catch (ProduceException<string, string> e)
{
    Console.WriteLine($"Failed to deliver message: {e.Message} [{e.Error.Code}]");
}
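Since the question asks for a consumer, here is a minimal sketch of the same approach with ConsumerBuilder. It has not been run against a real cluster. The group ID "example-consumer-group" and the topic "test-topic" are placeholders, the region is copied from the producer example, and blocking on GenerateAuthTokenAsync inside the handler is my assumption about the simplest way to call it from a synchronous callback.

```csharp
using System;
using System.Threading;
using Amazon;
using AWS.MSK.Auth;
using Confluent.Kafka;

var consumerConfig = new ConsumerConfig
{
    // Placeholder: replace with your cluster's public IAM bootstrap server string.
    BootstrapServers = "<BOOTSTRAP-SERVER-HERE>",
    // Placeholder group ID, not taken from the original answer.
    GroupId = "example-consumer-group",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    SecurityProtocol = SecurityProtocol.SaslSsl,
    SaslMechanism = SaslMechanism.OAuthBearer
};

var mskAuthTokenGenerator = new AWSMSKAuthTokenGenerator();

// Same token refresh callback as in the producer example.
void OauthCallback(IClient client, string cfg)
{
    try
    {
        // Assumption: block on the async call because the handler is synchronous.
        var (token, expiryMs) = mskAuthTokenGenerator
            .GenerateAuthTokenAsync(RegionEndpoint.USEast1)
            .GetAwaiter()
            .GetResult();
        client.OAuthBearerSetToken(token, expiryMs, "DummyPrincipal");
    }
    catch (Exception e)
    {
        client.OAuthBearerSetTokenFailure(e.ToString());
    }
}

using var consumer = new ConsumerBuilder<string, string>(consumerConfig)
    .SetOAuthBearerTokenRefreshHandler(OauthCallback)
    .Build();

consumer.Subscribe("test-topic");

// Stop cleanly on Ctrl+C.
using var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };

try
{
    while (true)
    {
        var result = consumer.Consume(cts.Token);
        Console.WriteLine($"Consumed '{result.Message.Value}' from {result.TopicPartitionOffset}");
    }
}
catch (OperationCanceledException)
{
    // Ctrl+C was pressed; fall through to Close().
}
finally
{
    // Leave the consumer group cleanly.
    consumer.Close();
}
```

The only differences from the producer are the ConsumerConfig (which needs a GroupId) and the Subscribe/Consume loop; the IAM-specific part, the token refresh handler, is unchanged.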
Here is an F# version:
#r "nuget: AWS.MSK.Auth, 1.0.0"
#r "nuget: Confluent.Kafka, 2.4.0"

open Amazon
open AWS.MSK.Auth
open Confluent.Kafka

let mskAuthTokenGenerator = new AWSMSKAuthTokenGenerator()

let oauthCallback (client : IClient) (_ : string) : unit =
    task {
        try
            let! token, expiryMs = mskAuthTokenGenerator.GenerateAuthTokenAsync(RegionEndpoint.USEast1)
            client.OAuthBearerSetToken(token, expiryMs, "DummyPrincipal")
        with exn ->
            client.OAuthBearerSetTokenFailure(string exn)
    }
    |> ignore

let adminClient =
    let config = ClientConfig()
    let adminConfig = AdminClientConfig(config)
    adminConfig.BootstrapServers <- "a.cluster.abcdef.a1.kafka.us-east-1.amazonaws.com:9098"
    adminConfig.SecurityProtocol <- SecurityProtocol.SaslSsl
    adminConfig.SaslMechanism <- SaslMechanism.OAuthBearer
    let builder = AdminClientBuilder(adminConfig)
    builder.SetOAuthBearerTokenRefreshHandler(oauthCallback) |> ignore
    builder.Build()

let topics = TopicCollection.OfTopicNames([ "my_topic" ])
let result = adminClient.DescribeTopicsAsync(topics).Result

for desc in result.TopicDescriptions do
    printfn $"%A{desc}"