I'm getting this error when trying to stream from Spark (using Java) to a secured Kafka cluster (SASL PLAINTEXT mechanism).
Here is the more detailed error message:
17/07/07 14:38:43 INFO SimpleConsumer: Reconnect due to socket error: java.io.EOFException: Received -1 when reading from a channel, the socket has likely been closed.
Exception in thread "main" org.apache.spark.SparkException: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at scala.util.Either.fold(Either.scala:98)
at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
at SparkStreaming.main(SparkStreaming.java:41)
Are there specific parameters in kafkaParams, or anything else, that I need to set to get Spark Streaming authenticated to Kafka?
Earlier, I added the SASL PLAINTEXT security parameters to the Kafka broker's server.properties:
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
listeners=SASL_PLAINTEXT://:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
super.users=User:admin
Here is also my kafka_jaas_server.conf:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin1!"
user_admin="admin1!"
user_aldys="admin1!";
};
And this is my kafka_jaas_client.conf:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="aldys"
password="admin1!";
};
I also include the JAAS server config when starting the Kafka broker, by editing the last line of kafka-server-start.sh to:
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/etc/kafka/kafka_jaas_server.conf kafka.Kafka "$@"
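Equivalently, the same property can be exported through KAFKA_OPTS without editing the script, since kafka-run-class.sh picks that variable up:

export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_jaas_server.conf"
bin/kafka-server-start.sh config/server.properties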
With this setup, I can produce to and consume from the topic I previously set ACLs on.
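The ACLs were added with kafka-acls.sh, along these lines (the topic name here is a placeholder for my actual topic):

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
  --add --allow-principal User:aldys \
  --operation Read --operation Write --topic mytopic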
This is my Java code:
import java.util.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import kafka.serializer.StringDecoder;
import scala.Tuple2;
public class SparkStreaming {
public static void main(String args[]) throws Exception {
if (args.length < 2) {
System.err.println("Usage: SparkStreaming <brokers> <topics>\n" +
" <brokers> is a list of one or more Kafka brokers\n" +
" <topics> is a list of one or more kafka topics to consume from\n\n");
System.exit(1);
}
String brokers = args[0];
String topics = args[1];
Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("group.id", "group1");
kafkaParams.put("auto.offset.reset", "smallest");
kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
SparkConf sparkConf = new SparkConf()
.setAppName("SparkStreaming")
.setMaster("local[2]");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
messages.print();
jssc.start();
jssc.awaitTermination();
}
}
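For completeness, my understanding is that the client JAAS file also has to reach the Spark JVMs somehow, e.g. via standard spark-submit options like these (the jar name and topic here are placeholders, the JAAS path is from my setup above):

spark-submit \
  --driver-java-options "-Djava.security.auth.login.config=/etc/kafka/kafka_jaas_client.conf" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/etc/kafka/kafka_jaas_client.conf" \
  --class SparkStreaming target/sparkstreaming.jar localhost:9092 mytopic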
Here are also the dependencies I'm using in my pom.xml:
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.11</artifactId>
<version>1.6.3</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.10.2.1</version>
</dependency>
</dependencies>
I resolved my problem by following the guide at https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html.
I replaced spark-streaming-kafka_2.11 in my pom.xml with spark-streaming-kafka-0-10_2.11, version 2.1.1.
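In pom.xml terms, the change is just swapping the artifact (the artifact version tracks the Spark version):

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.1.1</version>
</dependency>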
Looking at the error logs in the question above, I noticed the error was thrown by SimpleConsumer, and SimpleConsumer belongs to the old consumer API, which does not support SASL. So I replaced my pom dependency as described above and rewrote my code to follow the Spark Streaming integration guide linked above. Now I can stream from the secured SASL PLAIN Kafka cluster.
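For reference, here is roughly what my consumer looks like after the switch (a sketch following the 0-10 integration guide; the topic name is a placeholder, and the System.setProperty line works because I run in local mode where driver and executors share one JVM; on a real cluster the JAAS path would go through spark.driver.extraJavaOptions / spark.executor.extraJavaOptions instead):

import java.util.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class SparkStreamingSasl {
    public static void main(String[] args) throws Exception {
        // Point the JVM at the client JAAS config. This works in local mode
        // because driver and executors share one JVM; on a cluster, pass it
        // through extraJavaOptions as noted above.
        System.setProperty("java.security.auth.login.config",
                "/etc/kafka/kafka_jaas_client.conf");

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "group1");
        // new-consumer equivalent of the old "smallest"
        kafkaParams.put("auto.offset.reset", "earliest");
        kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
        kafkaParams.put("sasl.mechanism", "PLAIN");

        // placeholder topic name
        Set<String> topicsSet = new HashSet<>(Arrays.asList("mytopic"));

        SparkConf sparkConf = new SparkConf()
                .setAppName("SparkStreaming")
                .setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

        // 0-10 direct stream: records arrive as ConsumerRecord<K, V>
        JavaInputDStream<ConsumerRecord<String, String>> messages =
                KafkaUtils.createDirectStream(
                        jssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaParams));

        messages.map(record -> record.value()).print();

        jssc.start();
        jssc.awaitTermination();
    }
}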