Currently, I am using the PlainLoginModule to authenticate users. However, I have now built a jar from the code listed here and want to use that instead of PlainLoginModule: https://cwiki.apache.org/confluence/display/KAFKA/KIP-86%3A+Configurable+SASL+callback+handlers#KIP-86:ConfigurableSASLcallbackhandlers-sample_plainSampleCallbackHandlerforSASL/PLAIN.
I placed the jar file in the ~/libs folder, added
listener.name.sasl_ssl.plain.sasl.server.callback.handler.class=com.synopsys.demo.DemoApplication
to my server.properties, and changed my kafka_server_jaas.conf to:
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret";
};
When I start the server, I get the following error:
Part 1:
14:36:41.924 [main] DEBUG org.apache.kafka.common.network.Selector - [KafkaServer id=1] Successfully authenticated with swe-analyticsdb-prod2/10.15.164.233
14:36:41.924 [Controller-1-to-broker-1-send-thread] DEBUG org.apache.kafka.common.network.Selector - [Controller id=1, targetBrokerId=1] Successfully authenticated with swe-analyticsdb-prod2/10.15.164.233
14:36:41.924 [Controller-1-to-broker-1-send-thread] INFO kafka.controller.RequestSendThread - [RequestSendThread controllerId=1] Controller 1 connected to swe-analyticsdb-prod2:9093 (id: 1 rack: null) for sending state change requests
14:36:41.925 [data-plane-kafka-network-thread-1-ListenerName(SASL_SSL)-SASL_SSL-1] DEBUG org.apache.kafka.common.network.Selector - [SocketServer brokerId=1] Connection with swe-analyticsdb-prod2.internal.synopsys.com/10.15.164.233 disconnected
java.io.EOFException: null
    at org.apache.kafka.common.network.SslTransportLayer.read(SslTransportLayer.java:573)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:94)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
    at kafka.network.Processor.poll(SocketServer.scala:830)
    at kafka.network.Processor.run(SocketServer.scala:730)
    at java.lang.Thread.run(Thread.java:748)
14:36:41.925 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name connections-closed:
14:36:41.925 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name connections-created:
14:36:41.925 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name successful-authentication:
14:36:41.925 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name successful-reauthentication:
14:36:41.925 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name successful-authentication-no-reauth:
14:36:41.926 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name failed-authentication:
14:36:41.926 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name failed-reauthentication:
14:36:41.926 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name reauthentication-latency:
14:36:41.926 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-sent-received:
14:36:41.927 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-sent:
14:36:41.927 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-received:
14:36:41.927 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name select-time:
14:36:41.927 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name io-time:
14:36:41.928 [main] WARN kafka.utils.CoreUtils$ - org.apache.kafka.common.requests.ControlledShutdownRequest$Builder.<init>(IJS)V
java.lang.NoSuchMethodError: org.apache.kafka.common.requests.ControlledShutdownRequest$Builder.<init>(IJS)V
    at kafka.server.KafkaServer.doControlledShutdown$1(KafkaServer.scala:520)
    at kafka.server.KafkaServer.controlledShutdown(KafkaServer.scala:563)
    at kafka.server.KafkaServer.$anonfun$shutdown$2(KafkaServer.scala:585)
    at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:86)
    at kafka.server.KafkaServer.shutdown(KafkaServer.scala:585)
    at kafka.server.KafkaServer.startup(KafkaServer.scala:342)
    at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)
    at kafka.Kafka$.main(Kafka.scala:75)
    at kafka.Kafka.main(Kafka.scala)
14:36:41.929 [main] INFO kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread - [/config/changes-event-process-thread]: Shutting down
14:36:41.929 [/config/changes-event-process-thread] INFO kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread - [/config/changes-event-process-thread]: Stopped
14:36:41.929 [main] INFO kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread - [/config/changes-event-process-thread]: Shutdown completed
14:36:41.930 [main] INFO kafka.network.SocketServer - [SocketServer brokerId=1] Stopping socket server request processors
14:36:41.931 [data-plane-kafka-socket-acceptor-ListenerName(SASL_SSL)-SASL_SSL-9093] DEBUG kafka.network.Acceptor - Closing server socket and selector.
14:36:41.933 [data-plane-kafka-network-thread-1-ListenerName(SASL_SSL)-SASL_SSL-0] DEBUG kafka.network.Processor - Closing selector - processor 0
14:36:41.934 [data-plane-kafka-network-thread-1-ListenerName(SASL_SSL)-SASL_SSL-0] DEBUG kafka.network.Processor - Closing selector connection 10.15.164.233:9093-10.15.164.233:44774-0
14:36:41.935 [Controller-1-to-broker-1-send-thread] DEBUG org.apache.kafka.common.network.Selector - [Controller id=1, targetBrokerId=1] Connection with swe-analyticsdb-prod2/10.15.164.233 disconnected
Part 2:
07:22:21.223 [main] DEBUG kafka.utils.KafkaScheduler - Shutting down task scheduler.
07:22:21.223 [main] INFO kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper - [ExpirationReaper-1-Heartbeat]: Shutting down
07:22:21.254 [Controller-1-to-broker-1-send-thread] DEBUG org.apache.kafka.clients.NetworkClient - [Controller id=1, targetBrokerId=1] Initiating connection to node swe-analyticsdb-prod2:9093 (id: 1 rack: null) using address swe-analyticsdb-prod2/10.15.164.233
07:22:21.254 [Controller-1-to-broker-1-send-thread] DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator - Set SASL client state to SEND_APIVERSIONS_REQUEST
07:22:21.254 [Controller-1-to-broker-1-send-thread] DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator - Creating SaslClient: client=null;service=kafka;serviceHostname=swe-analyticsdb-prod2;mechs=[PLAIN]
07:22:21.255 [Controller-1-to-broker-1-send-thread] DEBUG org.apache.kafka.common.network.Selector - [Controller id=1, targetBrokerId=1] Connection with swe-analyticsdb-prod2/10.15.164.233 disconnected
java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.kafka.common.network.SslTransportLayer.finishConnect(SslTransportLayer.java:119)
    at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:216)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:531)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539)
    at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:74)
    at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:282)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:236)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
07:22:21.255 [Controller-1-to-broker-1-send-thread] DEBUG org.apache.kafka.clients.NetworkClient - [Controller id=1, targetBrokerId=1] Node 1 disconnected.
07:22:21.255 [Controller-1-to-broker-1-send-thread] WARN org.apache.kafka.clients.NetworkClient - [Controller id=1, targetBrokerId=1] Connection to node 1 (swe-analyticsdb-prod2/10.15.164.233:9093) could not be established. Broker may not be available.
07:22:21.256 [Controller-1-to-broker-1-send-thread] WARN kafka.controller.RequestSendThread - [RequestSendThread controllerId=1] Controller 1's connection to broker swe-analyticsdb-prod2:9093 (id: 1 rack: null) was unsuccessful
java.io.IOException: Connection to swe-analyticsdb-prod2:9093 (id: 1 rack: null) failed.
    at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
    at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:282)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:236)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
07:22:21.356 [Controller-1-to-broker-1-send-thread] DEBUG org.apache.kafka.clients.NetworkClient - [Controller id=1, targetBrokerId=1] Initiating connection to node swe-analyticsdb-prod2:9093 (id: 1 rack: null) using address swe-analyticsdb-prod2/10.15.164.233
07:22:21.356 [Controller-1-to-broker-1-send-thread] DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator - Set SASL client state to SEND_APIVERSIONS_REQUEST
07:22:21.356 [Controller-1-to-broker-1-send-thread] DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator - Creating SaslClient: client=null;service=kafka;serviceHostname=swe-analyticsdb-prod2;mechs=[PLAIN]
07:22:21.357 [Controller-1-to-broker-1-send-thread] DEBUG org.apache.kafka.common.network.Selector - [Controller id=1, targetBrokerId=1] Connection with swe-analyticsdb-prod2/10.15.164.233 disconnected
java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.kafka.common.network.SslTransportLayer.finishConnect(SslTransportLayer.java:119)
    at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:216)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:531)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539)
    at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:74)
    at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:282)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:236)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
07:22:21.357 [Controller-1-to-broker-1-send-thread] DEBUG org.apache.kafka.clients.NetworkClient - [Controller id=1, targetBrokerId=1] Node 1 disconnected.
07:22:21.357 [Controller-1-to-broker-1-send-thread] WARN org.apache.kafka.clients.NetworkClient - [Controller id=1, targetBrokerId=1] Connection to node 1 (swe-analyticsdb-prod2/10.15.164.233:9093) could not be established. Broker may not be available.
UPDATE: I notice this behavior even when I don't use the jar/class I made but just leave it inside the "../libs" directory. The error above then always occurs, whether I use the built-in or a custom AuthenticateCallbackHandler class.
Am I missing a step or steps? I know I have to add the jar to Kafka so that it can recognize and use it, but I can't find any tutorial or documentation that explains how to use a custom callback handler with PLAIN. Does anyone know how to do this?
I am using Kafka 2.2.
My custom class code:
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.plain.PlainAuthenticateCallback;
import kafka.common.KafkaException;

// The JNDI AuthenticationException is the one thrown on a failed LDAP bind,
// so it is imported here instead of Kafka's class of the same name.
import javax.naming.AuthenticationException;
import javax.naming.AuthenticationNotSupportedException;
import javax.naming.Context;
import javax.naming.NamingException;
import javax.naming.directory.DirContext;
import javax.naming.directory.InitialDirContext;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import java.io.IOException;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;

public class CustomCallback implements AuthenticateCallbackHandler {

    @Override
    public void configure(Map<String, ?> configs, String mechanism, List<AppConfigurationEntry> jaasConfigEntries) {
    }

    @Override
    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        String username = null;
        for (Callback callback : callbacks) {
            if (callback instanceof NameCallback)
                username = ((NameCallback) callback).getDefaultName();
            else if (callback instanceof PlainAuthenticateCallback) {
                PlainAuthenticateCallback plainCallback = (PlainAuthenticateCallback) callback;
                boolean authenticated = authenticate(username, plainCallback.password());
                plainCallback.authenticated(authenticated);
            } else
                throw new UnsupportedCallbackException(callback);
        }
    }

    protected boolean authenticate(String username, char[] password) throws IOException {
        if (username == null)
            return false;
        else {
            // Return true if an LDAP simple bind with these credentials succeeds
            Hashtable<String, String> environment = new Hashtable<String, String>();
            System.out.println("Custom class is being called");
            environment.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory");
            environment.put(Context.PROVIDER_URL, "ldap://adldap.internal.synopsys.com:389");
            environment.put(Context.SECURITY_AUTHENTICATION, "simple");
            environment.put(Context.SECURITY_PRINCIPAL, "CN=" + username + ",CN=Users,DC=internal,DC=synopsys,DC=com");
            environment.put(Context.SECURITY_CREDENTIALS, new String(password));
            try {
                DirContext context = new InitialDirContext(environment);
                context.getEnvironment();
                context.close();
                return true;
            } catch (AuthenticationNotSupportedException exception) {
                System.out.println("The authentication is not supported by the server");
                return false;
            } catch (AuthenticationException exception) {
                System.out.println("Incorrect password or username");
                return false;
            } catch (NamingException exception) {
                System.out.println("Error when trying to create the context");
                return false;
            }
        }
    }

    @Override
    public void close() throws KafkaException {
    }

    public static void main(String[] args) throws IOException {
        char[] pass = new char[]{'P', '0', 'm', 'e', 'l', '0', '2', '0', '1', '9', '!'};
        CustomCallback test = new CustomCallback();
        System.out.println(test.authenticate("<username>", pass));
        System.out.println(test.getClass().getName());
        //SpringApplication.run(DemoApplication.class, args);
    }
}
server.properties contents:
advertised.listeners=SASL_SSL://<machine name>:9093
ssl.endpoint.identification.algorithm=HTTPS
ssl.client.auth=required
ssl.truststore.location=/remote/sde108/kafka/kafka/SSL2/client/server.truststore.jks
ssl.truststore.password=password
ssl.keystore.location=/remote/sde108/kafka/kafka/SSL2/client/server.keystore.jks
ssl.keystore.password=password
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
zookeeper.set.acl=false
listeners=SASL_SSL://<machine name>:9093
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
offsets.retention.minutes=1
#listener.name.sasl_sasl.plain.sasl.server.callback.handler.class=<package name>.CustomCallbackApplication
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>synopsys</groupId>
    <artifactId>synopsys</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>6</source>
                    <target>6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
    </dependencies>
</project>
The META-INF was created using Project Structure and Build Artifacts.
I think I replied to your email two days ago. I customized the SASL/PLAIN authentication mechanism to store usernames and passwords in MySQL instead of a file. I also find KIP-86 very confusing, because it offers several ways to do the same thing without explaining the differences between them.
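To illustrate the kind of check involved, here is a minimal sketch of a username/password lookup that a callback handler could delegate to from its handle method. It is only an illustration, not the code used here and not part of Kafka: the class name PlainCredentialCheck, the addUser method, and the in-memory map (standing in for a database table) are all assumptions made for the example, and the Kafka wiring itself is left out.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Kafka. A handler's handle() method could
// call authenticate() once it has read the username and password callbacks.
class PlainCredentialCheck {

    // Stand-in for a database lookup. A real store would normally hold
    // password hashes rather than plain-text passwords.
    private final Map<String, String> expectedPasswords = new HashMap<>();

    void addUser(String username, String password) {
        expectedPasswords.put(username, password);
    }

    // Returns true only if the user is known and the supplied password matches.
    boolean authenticate(String username, char[] password) {
        if (username == null || password == null) {
            return false;
        }
        String expected = expectedPasswords.get(username);
        if (expected == null) {
            return false;
        }
        byte[] supplied = new String(password).getBytes(StandardCharsets.UTF_8);
        byte[] stored = expected.getBytes(StandardCharsets.UTF_8);
        // Byte-wise comparison via the JDK helper instead of String.equals.
        return MessageDigest.isEqual(supplied, stored);
    }

    public static void main(String[] args) {
        PlainCredentialCheck check = new PlainCredentialCheck();
        check.addUser("admin", "admin-secret");
        System.out.println(check.authenticate("admin", "admin-secret".toCharArray()));
        System.out.println(check.authenticate("admin", "wrong".toCharArray()));
    }
}
```

Keeping the check in a plain class like this means it can be exercised from a main method or a unit test without starting a broker.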
This is what I did, and it works:
The interface I implemented is AuthenticateCallbackHandler.
The generated jar should not be placed under ~/libs. Put it in the lib subdirectory of the directory where Kafka is installed.
I did not modify kafka_server_jaas.conf.
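On the jar placement: a NoSuchMethodError like the one in the question typically appears when code runs against a different version of a class than the one it was compiled against, for example when two versions of the same class end up on the classpath. It may therefore be worth checking whether the custom jar bundles its own copy of the Kafka classes. The following is a small diagnostic sketch using only the JDK; the class name JarClassLister and the org/apache/kafka/ prefix are assumptions chosen for the example.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

// Hypothetical diagnostic, not part of Kafka: lists the .class entries in a
// jar whose path starts with a given prefix, e.g. "org/apache/kafka/".
class JarClassLister {

    static List<String> classesUnder(String jarPath, String prefix) throws IOException {
        List<String> matches = new ArrayList<>();
        try (JarFile jar = new JarFile(jarPath)) {
            Enumeration<JarEntry> entries = jar.entries();
            while (entries.hasMoreElements()) {
                String name = entries.nextElement().getName();
                if (name.startsWith(prefix) && name.endsWith(".class")) {
                    matches.add(name);
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) throws IOException {
        // Usage: java JarClassLister <path to the custom jar>
        for (String name : classesUnder(args[0], "org/apache/kafka/")) {
            System.out.println(name);
        }
    }
}
```

If this prints any entries for the custom jar, one option is to rebuild the jar without the Kafka dependency packaged inside it, so that only the broker's own Kafka jars are on the classpath.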