Search code examples
Tags: java, apache-spark, spark-streaming, dstream, java-pair-rdd

How to generate JavaPairInputDStream from JavaStreamingContext?


I am learning Apache Spark Streaming and am trying to create a `JavaPairInputDStream` from a `JavaStreamingContext`. Below is my code:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
.......    
.......

// Question snippet (fragment): builds two pair RDDs and tries to feed them to
// JavaStreamingContext.queueStream to get a JavaPairInputDStream.
SparkConf sc = new SparkConf().setAppName("SparkStreamTest").setMaster("local[*]");;
JavaSparkContext jsc = new JavaSparkContext(sc);
// Streaming context built on the existing JavaSparkContext, 3-second batch duration.
JavaStreamingContext jssc = new JavaStreamingContext(jsc, Durations.seconds(3));

// First data set: String keys with String values.
List<Tuple2<String, String>> data1 = new ArrayList<Tuple2<String, String>>();
data1.add(new Tuple2<String, String>("K1", "ABC"));
data1.add(new Tuple2<String, String>("K2", "DE"));
data1.add(new Tuple2<String, String>("K1", "F"));
data1.add(new Tuple2<String, String>("K3", "GHI"));

JavaPairRDD<String, String> pairs1 = jssc.sparkContext().parallelizePairs(data1);

// Second data set: String keys with Integer values. It is never used below.
List<Tuple2<String, Integer>> data2 = new ArrayList<Tuple2<String, Integer>>();
data2.add(new Tuple2<String, Integer>("K1", 123));
data2.add(new Tuple2<String, Integer>("K2", 456));
data2.add(new Tuple2<String, Integer>("K7", 0));

// NOTE(review): this parallelizes data1 again, not data2, so pairs2 duplicates pairs1.
// It looks like a copy-paste slip. data2 could not be used here as-is anyway,
// because its values are Integer rather than String.
JavaPairRDD<String, String> pairs2 = jssc.sparkContext().parallelizePairs(data1);

Queue<JavaPairRDD<String, String>> inputQueue = new LinkedList<>(Arrays.asList(pairs1, pairs2));

// Does not compile: queueStream only accepts a Queue<JavaRDD<T>> (see the error
// message below), so a Queue<JavaPairRDD<String, String>> is rejected.
JavaPairInputDStream<String, String> lines = jssc.queueStream(inputQueue, true);

But the last line of my application fails to compile with this error:

The method queueStream(Queue<JavaRDD<T>>, boolean) in the type JavaStreamingContext is not applicable for the arguments (Queue<JavaPairRDD<String,String>>, boolean)

I don't know how to create a `JavaPairInputDStream` from a `JavaStreamingContext`.


Solution

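  • If you check the API for the `queueStream` method of the `JavaStreamingContext` class, it accepts a `java.util.Queue<JavaRDD<T>>` as the queue parameter; there is no variant that takes a queue of `JavaPairRDD`s. I modified your program to build a `Queue<JavaRDD<Tuple2<String, String>>>` instead. Because one queue must hold RDDs of a single element type, the values from your second list are stored as Strings. The `queueStream` method returns a `JavaInputDStream<T>`, and below is how you can transform it into a `JavaPairDStream<String, String>` with `mapToPair`. The result is a `JavaPairDStream` rather than a `JavaPairInputDStream`. The `JavaPairDStream` class is the superclass of the `JavaPairInputDStream` class, so you can use it wherever you need pair-stream operations. Hope this helps.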

    import java.util.ArrayList;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Locale;
    import java.util.Queue;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaInputDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    
    import scala.Tuple2;
    
    /**
     * Feeds two in-memory batches of key/value tuples through
     * {@link JavaStreamingContext#queueStream} and prints each batch as a pair stream.
     *
     * <p>{@code queueStream} only accepts a {@code Queue<JavaRDD<T>>}. The pairs are therefore
     * queued as {@code JavaRDD<Tuple2<String, String>>}. The resulting
     * {@code JavaInputDStream} is then turned into a {@link JavaPairDStream} with
     * {@code mapToPair}.
     */
    public class SparkStreamTest {
        public static void main(String[] args) throws Exception {
            SparkConf sc = new SparkConf().setAppName("SparkStreamTest").setMaster("local[*]");
            JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(5));

            // First batch: the same String/String pairs as in the question.
            List<Tuple2<String, String>> data1 = new ArrayList<>();
            data1.add(new Tuple2<>("K1", "ABC"));
            data1.add(new Tuple2<>("K2", "DE"));
            data1.add(new Tuple2<>("K1", "F"));
            data1.add(new Tuple2<>("K3", "GHI"));
            JavaRDD<Tuple2<String, String>> javaRDD1 = jssc.sparkContext().parallelize(data1);

            // Second batch: the question's Integer values carried as Strings. Both RDDs must
            // share one element type to fit in a single Queue<JavaRDD<T>>.
            List<Tuple2<String, String>> data2 = new ArrayList<>();
            data2.add(new Tuple2<>("K1", "123"));
            data2.add(new Tuple2<>("K2", "456"));
            data2.add(new Tuple2<>("K7", "0"));
            JavaRDD<Tuple2<String, String>> javaRDD2 = jssc.sparkContext().parallelize(data2);

            Queue<JavaRDD<Tuple2<String, String>>> inputQueue = new LinkedList<>();
            inputQueue.add(javaRDD1);
            inputQueue.add(javaRDD2);

            // oneAtATime = true: consume one queued RDD per batch interval.
            JavaInputDStream<Tuple2<String, String>> javaDStream = jssc.queueStream(inputQueue, true);

            // The typed Tuple2<> (instead of a raw Tuple2) avoids an unchecked conversion.
            // Locale.ROOT makes the key lower-casing independent of the JVM default locale.
            JavaPairDStream<String, String> javaPairDStream = javaDStream.mapToPair(
                    tuple -> new Tuple2<>(tuple._1().toLowerCase(Locale.ROOT), tuple._2()));

            javaPairDStream.print();

            jssc.start();
            // Blocks until the streaming context is stopped or fails.
            jssc.awaitTermination();
        }
    }