Tags: apache-spark-sql, cassandra, cassandra-3.0, spark-cassandra-connector, apache-spark-1.6

CassandraSourceRelation not serializable when joining two dataframes


I have a DataFrame setup with spark-cassandra-connector 1.6.2 and am trying to perform some transformations against Cassandra. The DataStax Enterprise version is 5.0.5.

DataFrame df1 = sparkContext
        .read().format("org.apache.spark.sql.cassandra")
        .options(readOptions).load()
        .where("field2 ='XX'")
        .limit(limitVal)
        .repartition(partitions);

List<String> distinctKeys = df1.getColumn("field3").collect();  

values = some transformations to get IN query values;

String cassandraQuery = String.format("SELECT * FROM "
            + "table2 "
            + "WHERE field2 = 'XX' "
            + "AND field3 IN (%s)", values);
DataFrame df2 = sparkContext.cassandraSql(cassandraQuery);

String column1 = "field3";
String column2 = "field4";
List<String> columns = new ArrayList<>();
        columns.add(column1);
        columns.add(column2);
scala.collection.Seq<String> usingColumns = 
scala.collection.JavaConverters.
collectionAsScalaIterableConverter(columns).asScala().toSeq();
DataFrame joined = df1.join(df2, usingColumns, "left_outer");

List<Row> collected = joined.collectAsList(); // doesn't work
Long count = joined.count(); // works

This is the exception log. It looks like Spark is creating a Cassandra source relation, and it cannot be serialized.

java.io.NotSerializableException: java.util.ArrayList$Itr
Serialization stack:
- object not serializable (class: 
org.apache.spark.sql.cassandra.CassandraSourceRelation, value:  
org.apache.spark.sql.cassandra.CassandraSourceRelation@1c11a496)
- field (class: org.apache.spark.sql.execution.datasources.LogicalRelation, 
name: relation, type: class org.apache.spark.sql.sources.BaseRelation)
- object (class org.apache.spark.sql.execution.datasources.LogicalRelation, 
Relation[fields] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@1c11a496 
)
- field (class: org.apache.spark.sql.catalyst.plans.logical.Filter, name: 
child, type: class org.apache.spark.sql.catalyst.plans.logical.LogicalPlan)
- object (class org.apache.spark.sql.catalyst.plans.logical.Filter, Filter 
(field2#0 = XX)
+- Relation[fields] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@1c11a496
)
- field (class: org.apache.spark.sql.catalyst.plans.logical.Repartition, name: 
child, type: class org.apache.spark.sql.catalyst.plans.logical.LogicalPlan)
- object (class org.apache.spark.sql.catalyst.plans.logical.Repartition, 
Repartition 4, true
+- Filter (field2#0 = XX)
+- Relation[fields] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@1c11a496
)
- field (class: org.apache.spark.sql.catalyst.plans.logical.Join, name: left, 
type: class org.apache.spark.sql.catalyst.plans.logical.LogicalPlan)
- object (class org.apache.spark.sql.catalyst.plans.logical.Join, Join 
LeftOuter, Some(((field3#2 = field3#18) && (field4#3 = field4#20)))
:- Repartition 4, true
:  +- Filter (field2#0 = XX)
:     +- Relation[fields] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@1c11a496
+- Project [fields]
+- Filter ((field2#17 = YY) && field3#18 IN (IN array))
  +- Relation[fields] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@7172525e
)
- field (class: org.apache.spark.sql.catalyst.plans.logical.Project, name: 
child, type: class org.apache.spark.sql.catalyst.plans.logical.LogicalPlan)
- object (class org.apache.spark.sql.catalyst.plans.logical.Project, Project 
[fields]
+- Join LeftOuter, Some(((field3#2 = field3#18) && (field4#3 = field4#20)))
:- Repartition 4, true
:  +- Filter (field2#0 = XX)
:     +- Relation[fields] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@1c11a496
+- Project [fields]
  +- Filter ((field2#17 = XX) && field3#18 IN (IN array))
     +- Relation[fields] 
org.apache.spark.sql.cassandra.CassandraSourceRelation@7172525e
)
- field (class: org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4, name: 
$outer, type: class org.apache.spark.sql.catalyst.trees.TreeNode)
- object (class org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4, 
<function1>)
- field (class: 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4$$anonfun$apply$9, 
name: $outer, type: class 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4)
- object (class 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4$$anonfun$apply$9, 
<function1>)
- field (class: scala.collection.immutable.Stream$$anonfun$map$1, name: f$1, 
type: interface scala.Function1)
- object (class scala.collection.immutable.Stream$$anonfun$map$1, <function0>)
- writeObject data (class: scala.collection.immutable.$colon$colon)
- object (class scala.collection.immutable.$colon$colon, 
List(org.apache.spark.OneToOneDependency@17f43f4a))
- field (class: org.apache.spark.rdd.RDD, name: 
org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq)
- object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[32] at 
collectAsList at RevisionPushJob.java:308)
- field (class: org.apache.spark.rdd.RDD$$anonfun$collect$1, name: $outer, 
type: class org.apache.spark.rdd.RDD)
- object (class org.apache.spark.rdd.RDD$$anonfun$collect$1, <function0>)
- field (class: org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12, name: 
$outer, type: class org.apache.spark.rdd.RDD$$anonfun$collect$1)
- object (class org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12, 
<function1>)

Is it possible to make this serializable? Why does the count operation work while the collect operation does not?

UPDATE:

After getting back to it, it turned out that in Java you first have to convert the Java Iterable to a Scala buffer and create a Scala Iterable -> Seq out of it. Otherwise it doesn't work. Thanks Russel for bringing my attention to the cause of the problem.

String attrColumn1 = "column1";
            String attrColumn2 = "column2";
            String attrColumn3 = "column3";
            String attrColumn4 = "column4";
            List<String> attrColumns = new ArrayList<>();
            attrColumns.add(attrColumn1);
            attrColumns.add(attrColumn2);
            attrColumns.add(attrColumn3);
            attrColumns.add(attrColumn4);
            Seq<String> usingAttrColumns = 
JavaConverters.asScalaBufferConverter(attrColumns).asScala().toList();
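For completeness, the converted Seq is then passed to the join the same way as before; a minimal sketch, assuming the df1 and df2 DataFrames built earlier in the question:

// Join on the materialized Scala Seq (column names are illustrative).
DataFrame joined = df1.join(df2, usingAttrColumns, "left_outer");

// Both actions should now succeed, because the join columns travel as a
// fully materialized scala.collection.immutable.List instead of a lazy
// view that still holds a java.util.ArrayList$Itr.
List<Row> collected = joined.collectAsList();
long count = joined.count();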

Solution

  • See the error message pointing to java.util.ArrayList$Itr as your unserializable bit, which I think may be a reference to

     List<String> columns = new ArrayList<>();
     columns.add(column1);
     columns.add(column2);
    

    Which, in its implicit conversion, may require the serialization of the ArrayList iterator? That's the only ArrayList I see, so it may be the culprit. It may also be in the code you removed for "values".

    When you do the count, it can discard column information, so that probably saves you, but I can't be sure.

    So, TL;DR, my suggestion is to try removing things from the code and then building it back up again to find the unserializable bits.
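    To illustrate the suspected difference, here is a minimal sketch (the column names are placeholders, not from your schema) contrasting the conversion that can end up capturing the ArrayList iterator with one that materializes a plain, serializable Scala List first:

        import java.util.ArrayList;
        import java.util.List;

        import scala.collection.JavaConverters;
        import scala.collection.Seq;

        public class SeqConversionCheck {
            public static void main(String[] args) {
                List<String> columns = new ArrayList<>(); // placeholder column names
                columns.add("field3");
                columns.add("field4");

                // Conversion from the question: the resulting Seq can be a lazy
                // view (a Stream in Scala 2.10) whose evaluation still references
                // java.util.ArrayList$Itr, which is not serializable once it ends
                // up inside the logical plan.
                Seq<String> lazySeq = JavaConverters
                        .collectionAsScalaIterableConverter(columns)
                        .asScala()
                        .toSeq();

                // Conversion from the update: go through a Scala Buffer and force
                // a concrete immutable List, which is fully materialized and
                // serializable.
                Seq<String> materializedSeq = JavaConverters
                        .asScalaBufferConverter(columns)
                        .asScala()
                        .toList();

                System.out.println(lazySeq.getClass());         // lazy Stream-backed Seq
                System.out.println(materializedSeq.getClass()); // scala.collection.immutable.$colon$colon
            }
        }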