java, apache-spark, apache-kafka, spark-streaming

Why does Kafka throw "Received -1 when reading from channel, socket has likely been closed" when Spark Streaming reads from secured Kafka?


I'm getting this error when trying to stream from Spark (using Java) to secured Kafka (with the SASL PLAINTEXT mechanism).

The more detailed error message:

17/07/07 14:38:43 INFO SimpleConsumer: Reconnect due to socket error: java.io.EOFException: Received -1 when reading from a channel, the socket has likely been closed.
Exception in thread "main" org.apache.spark.SparkException: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at scala.util.Either.fold(Either.scala:98)
at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
at SparkStreaming.main(SparkStreaming.java:41)

Are there specific parameters in kafkaParams, or anything else, needed to authenticate Spark Streaming with Kafka?

Earlier, I added the SASL PLAINTEXT security parameters to the Kafka broker's server.properties:

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
listeners=SASL_PLAINTEXT://:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
super.users=User:admin

Here is my kafka_jaas_server.conf:

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin1!"
    user_admin="admin1!"
    user_aldys="admin1!";
};

And this is my kafka_jaas_client.conf:

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="aldys"
    password="admin1!";
};
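
Note that a client JAAS file like this only takes effect if the Spark JVMs are started with java.security.auth.login.config set. A sketch of passing it through spark-submit (the jar name and program arguments are hypothetical):

spark-submit \
  --driver-java-options "-Djava.security.auth.login.config=/etc/kafka/kafka_jaas_client.conf" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/etc/kafka/kafka_jaas_client.conf" \
  --class SparkStreaming \
  spark-streaming-app.jar localhost:9092 test

With --master local[2] everything runs in the driver JVM, so the driver option alone is enough; on a real cluster the executor option (and shipping the file to the executors) is needed as well.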

I also include my JAAS server config when starting the Kafka broker, by editing the last line of kafka-server-start.sh to:

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/etc/kafka/kafka_jaas_server.conf kafka.Kafka "$@"
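
An equivalent way to do this without editing the script, since a standard Kafka distribution's kafka-run-class.sh picks up KAFKA_OPTS (a sketch, assuming default script locations):

export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_jaas_server.conf"
bin/kafka-server-start.sh config/server.properties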

With this setup, I can produce to and consume from the topic on which I previously set ACLs.
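
For example, the console clients can be pointed at the secured listener like this (a sketch: the topic name test and the client-sasl.properties file are placeholders; that file would contain security.protocol=SASL_PLAINTEXT and sasl.mechanism=PLAIN):

export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_jaas_client.conf"
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test \
    --producer.config client-sasl.properties
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test \
    --consumer.config client-sasl.properties --from-beginning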

This is my Java code:

import java.util.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import kafka.serializer.StringDecoder;

public class SparkStreaming {

    public static void main(String args[]) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: SparkStreaming <brokers> <topics>\n" +
                "  <brokers> is a list of one or more Kafka brokers\n" +
                "  <topics> is a list of one or more kafka topics to consume from\n\n");
            System.exit(1);
        }

        String brokers = args[0];
        String topics = args[1];

        Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));

        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("group.id", "group1");
        kafkaParams.put("auto.offset.reset", "smallest");
        kafkaParams.put("security.protocol", "SASL_PLAINTEXT");

        SparkConf sparkConf = new SparkConf()
                            .setAppName("SparkStreaming")
                            .setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

        JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
            jssc,
            String.class,
            String.class,
            StringDecoder.class,
            StringDecoder.class,
            kafkaParams,
            topicsSet
        );

        messages.print();

        jssc.start();
        jssc.awaitTermination();
    }
}

Here are the dependencies I'm using in my pom.xml:

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.11</artifactId>
        <version>1.6.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.10.2.1</version>
    </dependency>
</dependencies>

Solution

I resolved my problem by following the guide at https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html.

I replaced spark-streaming-kafka_2.11 in my pom.xml with spark-streaming-kafka-0-10_2.11 (version 2.1.1), as shown below.
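
A sketch of the replacement dependency (the 2.1.1 version is an assumption, chosen to match the Spark 2.1.1 artifacts already in the pom):

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.1.1</version>
</dependency>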

Looking at the error log in the question, I noticed that the error was thrown by SimpleConsumer, which belongs to the old consumer API. So I replaced my pom dependency as described above and changed my code to follow the Spark Streaming integration guide linked above. Now I can stream from SASL PLAIN secured Kafka.
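
A minimal sketch of the rewritten consumer along the lines of that guide; the topic name test is a placeholder, the other settings mirror the question, and sasl.mechanism is set explicitly:

import java.util.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class SparkStreaming {

    public static void main(String[] args) throws Exception {
        Set<String> topicsSet = new HashSet<>(Arrays.asList("test")); // placeholder topic

        // The 0-10 integration uses the new Kafka consumer, which understands
        // security.protocol and sasl.mechanism; params are Map<String, Object>
        // and deserializer classes replace the old decoders.
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "group1");
        kafkaParams.put("auto.offset.reset", "earliest"); // new-consumer name for "smallest"
        kafkaParams.put("enable.auto.commit", false);
        kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
        kafkaParams.put("sasl.mechanism", "PLAIN");

        SparkConf sparkConf = new SparkConf()
                            .setAppName("SparkStreaming")
                            .setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

        JavaInputDStream<ConsumerRecord<String, String>> messages =
            KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaParams));

        // ConsumerRecord is not serializable, so extract the value before printing.
        messages.map(ConsumerRecord::value).print();

        jssc.start();
        jssc.awaitTermination();
    }
}

The driver JVM still needs -Djava.security.auth.login.config pointing at the client JAAS file, for example via the spark-submit options sketched earlier in the question.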