apache-kafka, kafka-producer-api, aws-msk, kafka-transactions-api

AWS MSK Transactions Support


I'm trying to create a Kafka Producer inside a Lambda function, with exactly-once delivery enabled, to push messages to MSK.

Edit: MSK IAM Auth is used as the security protocol between Kafka and the clients.
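The question does not show how IAM auth is wired into the Producer. One common setup with confluent-kafka for Python is `SASL_SSL` with the `OAUTHBEARER` mechanism and an `oauth_cb` that returns a signed token. The sketch below assumes AWS's `aws-msk-iam-sasl-signer-python` package, whose `MSKAuthTokenProvider.generate_auth_token(region)` returns a token and an expiry in milliseconds; the helper names, broker address and region are hypothetical placeholders, not taken from the question.

```python
from typing import Callable, Dict, Tuple

# A token generator takes a region and returns (token, expiry_in_ms).
# Assumption: in practice this is MSKAuthTokenProvider.generate_auth_token
# from the aws-msk-iam-sasl-signer-python package.
TokenGenerator = Callable[[str], Tuple[str, int]]


def make_oauth_cb(generate_token: TokenGenerator, region: str):
    """Build the oauth_cb that confluent-kafka calls to (re)fetch the SASL token."""
    def oauth_cb(oauth_config):
        token, expiry_ms = generate_token(region)
        # confluent-kafka expects the expiry as seconds since the epoch.
        return token, expiry_ms / 1000
    return oauth_cb


def iam_producer_config(bootstrap_servers: str, region: str,
                        generate_token: TokenGenerator) -> Dict[str, object]:
    """Hypothetical helper: base producer settings for MSK IAM auth."""
    return {
        # MSK's IAM listener; typically port 9098 for private access.
        "bootstrap.servers": bootstrap_servers,
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "OAUTHBEARER",
        "oauth_cb": make_oauth_cb(generate_token, region),
    }
```

In a Lambda this would be called as, for example, `iam_producer_config("<bootstrap-brokers>:9098", "<region>", MSKAuthTokenProvider.generate_auth_token)`, and the transactional settings from the question merged into the resulting dict. Injecting the token generator keeps the callback testable without AWS credentials.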

However, even though (I think) I've set up all the configurations correctly, the Producer still can't write messages to MSK.

The Producer hangs on calling init_transactions() and outputs the following debug messages in a loop:

%7|1708348719.300|TXNCOORD|lambda#producer-1| [thrd:main]: Unable to query for transaction coordinator: Coordinator query timer: No brokers available for Transactions (3 broker(s) known)

2024-02-19T14:18:39.338+01:00   %7|1708348719.338|CONNECT|lambda#producer-1| [thrd:TxnCoordinator]: TxnCoordinator: broker in state TRY_CONNECT connecting

2024-02-19T14:18:39.338+01:00   %7|1708348719.338|CONNECT|lambda#producer-1| [thrd:TxnCoordinator]: TxnCoordinator: broker has no address yet: postponing connect

2024-02-19T14:18:39.800+01:00   %7|1708348719.800|CONNECT|lambda#producer-1| [thrd:main]: Cluster connection already in progress: acquire ProducerID

2024-02-19T14:18:39.800+01:00   %7|1708348719.800|PIDBROKER|lambda#producer-1| [thrd:main]: No brokers available for Transactions (3 broker(s) known)

I have tried changing the number of brokers (from 2 to 4; this did not work) and playing around with the values of transaction.state.log.replication.factor, transaction.state.log.min.isr and offsets.topic.replication.factor (even setting them all to 1 did not help). The advice from this thread did not help either: Problems with Amazon MSK default configuration and publishing with transactions

I have the following configs and settings for AWS MSK Cluster:

  • 4 Brokers across 2 availability zones
  • Kafka 2.8.1
  • Broker instance type: kafka.t3.small (I've checked, and the same occurs on kafka.m5.large)

Cluster Configs:

transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
offsets.topic.replication.factor=3
min.insync.replicas=2
default.replication.factor=3
auto.create.topics.enable=true
num.io.threads=8
num.network.threads=2
num.partitions=1
num.replica.fetchers=2
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
zookeeper.session.timeout.ms=18000
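The transaction coordinator runs on the leaders of the internal `__transaction_state` topic, and the broker only creates that topic when enough brokers are alive to satisfy `transaction.state.log.replication.factor`. A quick sanity check of these settings against the broker count might look like this; `parse_properties` and `check_transaction_settings` are hypothetical helpers, not part of Kafka or MSK, and the fallback values are Kafka's broker defaults.

```python
from typing import Dict, List


def parse_properties(text: str) -> Dict[str, str]:
    """Parse simple key=value lines such as the cluster configuration above."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if line and not line.startswith("#") and "=" in line:
            key, value = line.split("=", 1)
            props[key.strip()] = value.strip()
    return props


def check_transaction_settings(props: Dict[str, str], broker_count: int) -> List[str]:
    """Return human-readable problems with the transaction-related settings."""
    problems = []
    # Fallbacks are Kafka's broker defaults (3 and 2).
    rf = int(props.get("transaction.state.log.replication.factor", "3"))
    min_isr = int(props.get("transaction.state.log.min.isr", "2"))
    if rf > broker_count:
        problems.append(
            f"transaction.state.log.replication.factor={rf} exceeds {broker_count} brokers"
        )
    if min_isr > rf:
        problems.append(
            f"transaction.state.log.min.isr={min_isr} exceeds replication factor {rf}"
        )
    return problems
```

For the four-broker cluster and the settings listed above, the check reports no problems, so the replication settings alone do not explain the hang.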

Producer Configs:

  • Using Confluent Kafka for Python (confluent-kafka) to create the Producer:
{
  "client.id": "some_id",
  "acks": "all",
  "enable.idempotence": "true",
  "transactional.id": "123",
}
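With these settings, the usual confluent-kafka transactional sequence is `init_transactions()`, `begin_transaction()`, `produce()`, `commit_transaction()`. Passing a timeout to `init_transactions()` makes a missing coordinator surface as an exception instead of blocking until the Lambda itself times out. In this sketch the producer is passed in (in practice `confluent_kafka.Producer(conf)` with the config above); the function names, topic and 10-second timeout are illustrative assumptions.

```python
def init_producer(producer, timeout: float = 10.0):
    """Initialise transactions once per producer instance.

    The 10 s timeout is an assumption: keep it well below the Lambda timeout.
    On timeout confluent-kafka raises KafkaException instead of blocking.
    """
    producer.init_transactions(timeout)
    return producer


def send_transactionally(producer, topic: str, messages):
    """Produce all messages in a single transaction, aborting on failure."""
    producer.begin_transaction()
    try:
        for value in messages:
            producer.produce(topic, value=value)
        # commit_transaction() flushes outstanding messages before committing.
        producer.commit_transaction()
    except Exception:
        producer.abort_transaction()
        raise
```

Splitting initialisation from sending matters in Lambda: `init_transactions()` should run once per producer, so a producer reused across warm invocations would be created and initialised at module level and only `send_transactionally()` called per invocation.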

P.S. If I don't set acks, enable.idempotence and transactional.id, the Producer works fine, but that defeats the purpose of raising this issue in the first place.

UPD: After digging through the MSK broker logs, it seems that the connection is never established. I have no idea why, especially because the auth method is the same for the transactional and the non-transactional Producer, and for the non-transactional Producer the connection is established just fine.


Solution

  • AWS IAM Auth for MSK somehow interferes with calling init_transactions(): without it, using PLAINTEXT instead, everything works fine. I'm not sure exactly why this does not work with IAM Auth; perhaps someone else can advise. Currently, there is no way to use AWS IAM Auth for this use case.