When using SASL/SCRAM-SHA-256, my Kafka consumer cannot connect to the broker. I am using spring-kafka 2.1.11.RELEASE, so the kafka-clients version is 1.0.2. I have already opened my firewall for the other party's server IP, and they have added my IP to their whitelist. The consumer config and debug output are below.

The consumer config:
auto.commit.interval.ms = 1000
auto.offset.reset = earliest
bootstrap.servers = [114.xxx.xxx.xxx:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = xxx
heartbeat.interval.ms = 100
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 10
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = SCRAM-SHA-256
security.protocol = SASL_PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
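The sasl.jaas.config value is redacted as [hidden] in the dump above. For reference, I build the SASL-related properties roughly as in the sketch below (a minimal example; "myuser" and "mypassword" are placeholders, not my real credentials):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;

public class ScramConsumerProps {

    // Consumer properties handed to the Spring Kafka consumer factory.
    public static Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "114.xxx.xxx.xxx:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "xxx");
        // SCRAM-SHA-256 over an unencrypted channel, matching the dump above.
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
        // Placeholder credentials; the real values are what appears as [hidden].
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                        + "username=\"myuser\" password=\"mypassword\";");
        return props;
    }
}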
The debug output:
2023-12-22 23:54:58.051 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 NetworkClient [Consumer clientId=consumer-1, groupId=xxx] Give up sending metadata request since no node is available
2023-12-22 23:54:58.102 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 NetworkClient [Consumer clientId=consumer-1, groupId=xxx] Initialize connection to node 114.xxx.xxx.xxx:9092 (id: -1 rack: null) for sending metadata request
2023-12-22 23:54:58.102 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 NetworkClient [Consumer clientId=consumer-1, groupId=xxx] Initiating connection to node 114.xxx.xxx.xxx:9092 (id: -1 rack: null)
2023-12-22 23:54:58.102 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 SaslClientAuthenticator Set SASL client state to SEND_APIVERSIONS_REQUEST
2023-12-22 23:54:58.102 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 SaslClientAuthenticator Creating SaslClient: client=null;service=kafka;serviceHostname=kafka154;mechs=[SCRAM-SHA-256]
2023-12-22 23:54:58.103 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 ScramSaslClient Setting SASL/SCRAM_SHA_256 client state to SEND_CLIENT_FIRST_MESSAGE
2023-12-22 23:54:58.109 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 Selector [Consumer clientId=consumer-1, groupId=xxx] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
2023-12-22 23:54:58.109 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 SaslClientAuthenticator Set SASL client state to RECEIVE_APIVERSIONS_RESPONSE
2023-12-22 23:54:58.109 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 NetworkClient [Consumer clientId=consumer-1, groupId=xxx] Completed connection to node -1. Fetching API versions.
2023-12-22 23:55:02.115 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 Selector [Consumer clientId=consumer-1, groupId=xxx] Connection with kafka154/114.xxx.xxx.xxx disconnected
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:104)
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:122)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:93)
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:306)
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveKafkaResponse(SaslClientAuthenticator.java:392)
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:180)
at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:81)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:474)
at org.apache.kafka.common.network.Selector.poll(Selector.java:412)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:258)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:230)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:221)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:153)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:228)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:205)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:284)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:712)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
2023-12-22 23:55:02.116 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 NetworkClient [Consumer clientId=consumer-1, groupId=xxx] Node -1 disconnected.
2023-12-22 23:55:02.116 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 NetworkClient [Consumer clientId=consumer-1, groupId=xxx] Give up sending metadata request since no node is available
Any idea what could be causing this? The TCP connection to the broker completes ("Completed connection to node -1"), but the broker resets the connection while the client is waiting for the ApiVersions response at the start of the SASL handshake.
The issue has been identified: the egress IP of our server room was not in the whitelist on the other party's Kafka. I had previously asked them to whitelist our IPs, but I did not notice that this machine's egress IP was missing from that list. Because I could establish a TCP connection to their broker, I ruled out IP restrictions at first. In reality, their whitelist is not a firewall-style whitelist (otherwise the TCP connection should never have been established); it is enforced above the network layer, which is why the broker accepted the connection and then reset it during authentication. If a similar issue comes up in the future, it is advisable to rule out network and whitelist problems first.
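If anyone runs into something similar: before digging into the Spring consumer itself, it can help to test the broker with a small standalone client that reuses the same SASL settings, so that network/whitelist problems are separated from application configuration. A minimal sketch using the AdminClient from the same kafka-clients version (the broker address and credentials are placeholders):

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.config.SaslConfigs;

public class BrokerAuthCheck {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "114.xxx.xxx.xxx:9092");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                        + "username=\"myuser\" password=\"mypassword\";");
        // listTopics() forces a full connect plus SASL handshake with the broker.
        // If the broker-side whitelist rejects this host, the future fails with a
        // timeout/disconnect instead of returning the topic names.
        try (AdminClient admin = AdminClient.create(props)) {
            System.out.println(admin.listTopics().names().get());
        }
    }
}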