Tags: java, apache-spark, cassandra, spark-cassandra-connector

Cannot union two CassandraJavaRDD<CassandraRow> in Spark


Because there is a limit on querying the data from Cassandra in one go, I'm reading it batch by batch with Spark and storing each batch in an RDD.

I then combine all the RDDs using the union function.

Here is my code:

private void getDataFromCassandra(JavaSparkContext sc) {
    CassandraJavaRDD<CassandraRow> cassandraRDD = null;
    CassandraJavaRDD<CassandraRow> cassandraRDD2 = null;

    while (someCondition) { // placeholder for the real loop condition
        // sb (built elsewhere) holds the pid values for the current batch
        cassandraRDD = CassandraJavaUtil
                .javaFunctions(sc).cassandraTable("dmp", "table").select("abc", "xyz")
                .where("pid IN ('" + sb + "')");

        if (cassandraRDD2 == null) {
            // first batch: start the accumulated RDD with it
            cassandraRDD2 = cassandraRDD;
        } else {
            // compile error on this line (see below)
            cassandraRDD2 = cassandraRDD2.union(cassandraRDD);
        }
    }
}

But on the union call I get the following compile error:

Type mismatch: cannot convert from JavaRDD to CassandraJavaRDD

This happens even though both RDDs have the same type.

So should I:

1) apply a cast, as in

 cassandraRDD2 = (CassandraJavaRDD<CassandraRow>) cassandraRDD2.union(cassandraRDD);

2) or change the type of one of the RDDs to JavaRDD?


Solution

  • The problem is the return type of union(). According to the docs:

    Method: union(JavaRDD<T> other): Return the union of this RDD and another one.

    Return value: JavaRDD<T>

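    So cassandraRDD2.union(cassandraRDD) returns a JavaRDD<CassandraRow>, which cannot be assigned to a CassandraJavaRDD<CassandraRow> variable; hence the mismatch. A cast (option 1) would compile, but the object union() returns is a plain JavaRDD rather than a CassandraJavaRDD, so the cast would fail at runtime with a ClassCastException.

    The class itself is declared as: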

    public class CassandraJavaRDD<R> extends JavaRDD<R> {
    ...
    }
    

    Since CassandraJavaRDD extends JavaRDD, you can declare both variables as JavaRDD (option 2):

    JavaRDD<CassandraRow> cassandraRDD = null;
    JavaRDD<CassandraRow> cassandraRDD2 = null;
    

    The CassandraJavaRDD returned by cassandraTable(...) can still be assigned to these variables, and the return value of union() now matches their type.
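To see the type relationship outside Spark, here is a minimal, self-contained sketch. FakeJavaRDD and FakeCassandraJavaRDD are hypothetical stand-ins, not the real Spark or connector classes: they only mimic the subclassing and the union() return type described above. The sketch shows the base-type accumulator (option 2) working and the downcast (option 1) failing at runtime.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for Spark's JavaRDD<T>; it just wraps a list of rows.
class FakeJavaRDD<T> {
    final List<T> rows;

    FakeJavaRDD(List<T> rows) {
        this.rows = rows;
    }

    // Mimics JavaRDD.union(): the declared return type is the base class,
    // and the returned object is a plain base-class instance.
    FakeJavaRDD<T> union(FakeJavaRDD<T> other) {
        List<T> all = new ArrayList<>(rows);
        all.addAll(other.rows);
        return new FakeJavaRDD<>(all);
    }
}

// Hypothetical stand-in for CassandraJavaRDD<R>, which extends JavaRDD<R>.
class FakeCassandraJavaRDD<R> extends FakeJavaRDD<R> {
    FakeCassandraJavaRDD(List<R> rows) {
        super(rows);
    }
}

class UnionDemo {
    // Option 2: the accumulator is declared with the base type, so union() fits.
    // Returns null when there are no batches, like the loop in the question.
    static FakeJavaRDD<String> unionAll(List<FakeCassandraJavaRDD<String>> batches) {
        FakeJavaRDD<String> acc = null;
        for (FakeCassandraJavaRDD<String> batch : batches) {
            if (acc == null) {
                acc = batch; // a subclass instance fits a base-class variable
            } else {
                acc = acc.union(batch);
            }
        }
        return acc;
    }

    // Option 1: the downcast compiles, but throws ClassCastException at runtime
    // because union() returned a plain FakeJavaRDD.
    static boolean castFails(FakeCassandraJavaRDD<String> a, FakeCassandraJavaRDD<String> b) {
        try {
            FakeCassandraJavaRDD<String> merged = (FakeCassandraJavaRDD<String>) a.union(b);
            return merged == null; // not reached: the cast above throws
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<FakeCassandraJavaRDD<String>> batches = new ArrayList<>();
        batches.add(new FakeCassandraJavaRDD<>(Arrays.asList("a", "b")));
        batches.add(new FakeCassandraJavaRDD<>(Arrays.asList("c")));

        System.out.println(unionAll(batches).rows);                    // [a, b, c]
        System.out.println(castFails(batches.get(0), batches.get(1))); // true
    }
}
```

Running main prints [a, b, c] and then true. With the real classes the fix is the same idea: only the two variable declarations change, and the loop body stays as it is.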