apache-kafka spring-kafka sasl

Kafka consumer cannot connect to the server


When using SASL with the SCRAM-SHA-256 mechanism, my Kafka consumer cannot connect to the server. The consumer config and debug output are below. I use spring-kafka 2.1.11.RELEASE, so the kafka-clients version is 1.0.2. I have already added a firewall rule for the other party's server IP, and the other party has added my IP to their whitelist.

The consumer config:

auto.commit.interval.ms = 1000
auto.offset.reset = earliest
bootstrap.servers = [114.xxx.xxx.xxx:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = xxx
heartbeat.interval.ms = 100
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 10
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = SCRAM-SHA-256
security.protocol = SASL_PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
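
For readers who want to reproduce the SASL-related part of this config, here is a minimal sketch of how such properties might be assembled in Java. The class name `ScramConsumerConfig`, the `build` method, and its parameters are hypothetical and not taken from the original application; only the property keys and values shown in the dump above are from the question. The content of `sasl.jaas.config` is hidden in the dump, so the `ScramLoginModule` line is an assumption based on the usual form for SCRAM clients.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds consumer properties for SASL_PLAINTEXT + SCRAM-SHA-256.
public class ScramConsumerConfig {

    public static Map<String, Object> build(String bootstrapServers, String groupId,
                                            String username, String password) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        // SASL settings matching the dump above.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");

        // Assumed JAAS line (the real one is hidden in the dump).
        // Note: quotes inside username or password are not escaped here.
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                        + "username=\"" + username + "\" password=\"" + password + "\";");
        return props;
    }
}
```

In a spring-kafka application, a map like this would typically be passed to a `DefaultKafkaConsumerFactory`. A correct config of this kind does not help if the broker side rejects the client's IP, which turned out to be the case here.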

Debug information:

2023-12-22 23:54:58.051 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 NetworkClient  [Consumer clientId=consumer-1, groupId=xxx] Give up sending metadata request since no node is available
2023-12-22 23:54:58.102 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 NetworkClient  [Consumer clientId=consumer-1, groupId=xxx] Initialize connection to node 114.xxx.xxx.xxx:9092 (id: -1 rack: null) for sending metadata request
2023-12-22 23:54:58.102 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 NetworkClient  [Consumer clientId=consumer-1, groupId=xxx] Initiating connection to node 114.xxx.xxx.xxx:9092 (id: -1 rack: null)
2023-12-22 23:54:58.102 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 SaslClientAuthenticator  Set SASL client state to SEND_APIVERSIONS_REQUEST
2023-12-22 23:54:58.102 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 SaslClientAuthenticator  Creating SaslClient: client=null;service=kafka;serviceHostname=kafka154;mechs=[SCRAM-SHA-256]
2023-12-22 23:54:58.103 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 ScramSaslClient  Setting SASL/SCRAM_SHA_256 client state to SEND_CLIENT_FIRST_MESSAGE
2023-12-22 23:54:58.109 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 Selector  [Consumer clientId=consumer-1, groupId=xxx] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
2023-12-22 23:54:58.109 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 SaslClientAuthenticator  Set SASL client state to RECEIVE_APIVERSIONS_RESPONSE
2023-12-22 23:54:58.109 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 NetworkClient  [Consumer clientId=consumer-1, groupId=xxx] Completed connection to node -1. Fetching API versions.
2023-12-22 23:55:02.115 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 Selector  [Consumer clientId=consumer-1, groupId=xxx] Connection with kafka154/114.xxx.xxx.xxx disconnected
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
        at sun.nio.ch.IOUtil.read(IOUtil.java:197)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
        at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:104)
        at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:122)
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:93)
        at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:306)
        at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveKafkaResponse(SaslClientAuthenticator.java:392)
        at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:180)
        at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:81)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:474)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:412)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:258)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:230)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:221)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:153)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:228)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:205)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:284)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:712)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.lang.Thread.run(Thread.java:748)
2023-12-22 23:55:02.116 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 NetworkClient  [Consumer clientId=consumer-1, groupId=xxx] Node -1 disconnected.
2023-12-22 23:55:02.116 - - [DEBUG] org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 NetworkClient  [Consumer clientId=consumer-1, groupId=xxx] Give up sending metadata request since no node is available

Any idea what could be causing this?


Solution

  • The issue has been identified: the egress IP of our server room was not on the other party's Kafka whitelist. I had asked the other party to whitelist an IP earlier, but I did not notice that this machine's egress IP was not the one they added. Because I could establish a TCP connection to the other party's broker, I did not suspect an IP restriction at first. In fact, the other party's whitelist does not work like a firewall (otherwise the TCP connection would never have been established): the connection is accepted and then reset, which matches the "Connection reset by peer" in the debug output. If a similar issue comes up in the future, check the network path first, including which egress IP the machine actually uses.
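
The point that a successful TCP connection does not prove the client's IP is accepted can be illustrated with a small probe. This is only a sketch under stated assumptions: `BrokerProbe`, `probe`, and the `Result` names are hypothetical, and the probe sends no Kafka request, so a remote side that resets the connection only after the first request (as the log above may suggest) could still be reported as `STILL_OPEN`.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;

// Hypothetical probe: separates "cannot connect" from "connected, then dropped by peer".
public class BrokerProbe {

    public enum Result { CONNECT_FAILED, CLOSED_BY_PEER, STILL_OPEN }

    // waitMs must be greater than 0; a value of 0 would make the read block forever.
    public static Result probe(String host, int port, int connectTimeoutMs, int waitMs) {
        try (Socket socket = new Socket()) {
            try {
                socket.connect(new InetSocketAddress(host, port), connectTimeoutMs);
            } catch (IOException e) {
                // Refused, timed out, or unreachable: a firewall-style block looks like this.
                return Result.CONNECT_FAILED;
            }
            socket.setSoTimeout(waitMs);
            try {
                InputStream in = socket.getInputStream();
                // -1 means the peer closed the connection cleanly.
                return in.read() == -1 ? Result.CLOSED_BY_PEER : Result.STILL_OPEN;
            } catch (SocketTimeoutException e) {
                // Nothing happened within waitMs: the connection is still open.
                return Result.STILL_OPEN;
            } catch (IOException e) {
                // For example "Connection reset by peer".
                return Result.CLOSED_BY_PEER;
            }
        } catch (IOException e) {
            // Reached only if configuring or closing the socket fails.
            return Result.CONNECT_FAILED;
        }
    }
}
```

A call such as `BrokerProbe.probe("114.xxx.xxx.xxx", 9092, 3000, 5000)` (with the real address filled in) returning `CLOSED_BY_PEER` would hint at an application-level restriction like the whitelist described above, while `CONNECT_FAILED` would point to a firewall or routing problem.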