apache-spark apache-spark-sql rdd sparkcore

How to convert a JavaRDD<Integer> to a DataFrame or Dataset in the following code


public static void main(String[] args) {
        SparkSession sessn = SparkSession.builder().appName("RDD2DF").master("local").getOrCreate();
        List<Integer> lst = Arrays.asList(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20);
        Dataset<Integer> DF = sessn.createDataset(lst, Encoders.INT());
        System.out.println(DF.javaRDD().getNumPartitions());
        JavaRDD<Integer> mappartRdd = DF.repartition(3).javaRDD().mapPartitions(it->  Arrays.asList(JavaConversions.asScalaIterator(it).length()).iterator());

}

In the code above, I am unable to convert the JavaRDD (mappartRdd) to a DataFrame in Java Spark. I am using the call below to convert the JavaRDD to a DataFrame/Dataset.

sessn.createDataFrame(mappartRdd, beanClass);

I tried several of the overloaded createDataFrame methods, but I cannot get the conversion to a DataFrame to work. What bean class do I need to provide for the code to work?

Unlike Scala, Java has no function like toDF() to convert an RDD to a DataFrame. Can someone help me convert it as required?

Note: I am able to create a Dataset directly by modifying the above code as below.

Dataset<Integer> mappartDS = DF.repartition(3).mapPartitions(it->  Arrays.asList(JavaConversions.asScalaIterator(it).length()).iterator(), Encoders.INT());

But I want to know why my JavaRDD is not converted to a DataFrame/Dataset when I use createDataFrame. Any help will be greatly appreciated.


Solution

  • This seems to be a follow-up to this SO question.

    I think you are still learning Spark. I would suggest getting familiar with the Java APIs it provides: https://spark.apache.org/docs/latest/api/java/index.html

    Regarding your question: if you check the createDataFrame API, the overload that applies here is declared (in Scala) as follows:

     def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = {
    ...
    }
    

    As you can see, it takes a JavaRDD[Row] and a matching StructType schema as arguments. Hence, to create a DataFrame (which is the same as Dataset<Row>), use the snippet below:

    JavaRDD<Integer> mappartRdd = DF.repartition(3).javaRDD().mapPartitions(it->  Arrays.asList(JavaConversions.asScalaIterator(it).length()).iterator());
    
    StructType schema = new StructType()
            .add(new StructField("value", DataTypes.IntegerType, true, Metadata.empty()));
    // Wrap each Integer in a Row, then apply the schema (sessn is the SparkSession from the question)
    Dataset<Row> df = sessn.createDataFrame(mappartRdd.map(RowFactory::create), schema);
    df.show(false);
    df.printSchema();
    
            /**
             * +-----+
             * |value|
             * +-----+
             * |6    |
             * |8    |
             * |6    |
             * +-----+
             *
             * root
             *  |-- value: integer (nullable = true)
             */
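
On the "which bean class" part of the question: as far as I understand, the createDataFrame(JavaRDD<?>, Class<?>) overload derives its columns from JavaBean getter properties, and Integer exposes none, so Integer.class cannot describe a column. The sketch below uses only the JDK's java.beans.Introspector to illustrate that difference. It is not Spark code, and CountBean and readableProperties are hypothetical names introduced for this example.

```java
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class BeanPropertyCheck {

    // Hypothetical wrapper bean (not from the original post): a single int property named "value".
    public static class CountBean implements Serializable {
        private int value;

        public CountBean() {}

        public CountBean(int value) {
            this.value = value;
        }

        public int getValue() {
            return value;
        }

        public void setValue(int value) {
            this.value = value;
        }
    }

    // Lists the readable JavaBean properties of a class, skipping the
    // "class" property that every object inherits through getClass().
    public static List<String> readableProperties(Class<?> type) {
        List<String> names = new ArrayList<>();
        try {
            for (PropertyDescriptor pd : Introspector.getBeanInfo(type).getPropertyDescriptors()) {
                if (pd.getReadMethod() != null && !"class".equals(pd.getName())) {
                    names.add(pd.getName());
                }
            }
        } catch (IntrospectionException e) {
            throw new IllegalStateException("Could not introspect " + type, e);
        }
        return names;
    }

    public static void main(String[] args) {
        // Integer has no getX()/isX() instance methods, so no bean properties are found.
        System.out.println("Integer:   " + readableProperties(Integer.class));
        // CountBean exposes getValue(), which is the kind of property a bean class needs.
        System.out.println("CountBean: " + readableProperties(CountBean.class));
    }
}
```

Assuming a bean like this, the bean-based call from the question would presumably look like sessn.createDataFrame(mappartRdd.map(CountBean::new), CountBean.class), which should give a single integer column named value. The Row plus StructType approach above avoids the extra class.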