Search code examples
scalaapache-sparkcassandraapache-spark-sqlspark-cassandra-connector

In Scala, What is correct way to filter Spark Cassandra RDD by a List[String]?


I have a list of ids in string format, this list can be roughly 20,000 ids long:

var timelineIds = source.map(a => a.timelineid);
timelineIds = timelineIds.distinct.cache; // disticnt list we need this for later
var timelineIdsString = timelineIds.map(a => a.asInstanceOf[String]).collect.toList;

When I use this list against one of my cassandra tables it works just fine, no matter the size of timelineIdsString:

var timelineHistorySource = sc.cassandraTable[Timeline]("acd", "timeline_history_bytimelineid")
        .select("ownerid", "userid", "timelineid", "timelinetype", "starttime", "endtime", "attributes", "states")
if (constrain)
    timelineHistorySource = timelineHistorySource.where("timelineid IN ?", timelineIdsString)

When I do it against another of my tables, it never completes when I have over 1000 ids in the List:

var dispositionSource = sc.cassandraTable[DispositionSource]("acd","dispositions_bytimelineid")
            .select("ownerid","dispositionid","month","timelineid","createddate","createduserid")
if(constrain)
    dispositionSource = dispositionSource.where("timelineid IN ?", timelineIdsString);

Both cassandra tables have the key as the timelineid so I know that its correct. This code works fine as long as timelineids is a small list.

Is there a better way to filter from cassandra RDD? Is it the size of the IN clause causing it to choke?


Solution

  • Instead of performing join on Spark level it's better to perform join using Cassandra itself - in this case you'll read from Cassandra only the necessary data (given that join key is partition or primary key). For RDDs this is could be done with .joinWithCassandraTable function (doc):

    import com.datastax.spark.connector._
    
    val toJoin = sc.parallelize(1 until 5).map(x => Tuple1(x.toInt))
    val joined = toJoin.joinWithCassandraTable("test", "jtest1")
      .on(SomeColumns("pk"))
    
    scala> joined.toDebugString
    res21: String =
    (8) CassandraJoinRDD[150] at RDD at CassandraRDD.scala:18 []
     |  ParallelCollectionRDD[147] at parallelize at <console>:33 []
    

    For Dataframes it's so called direct join that is available since SCC 2.5 (see announcement) - you need to pass some configs to enable it, see docs:

    import spark.implicits._
    import org.apache.spark.sql.cassandra._
    
    val cassdata = spark.read.cassandraFormat("jtest1", "test").load
    
    val toJoin = spark.range(1, 5).select($"id".cast("int").as("id"))
    val joined = toJoin.join(cassdata, cassdata("pk") === toJoin("id"))
    
    scala> joined.explain
    == Physical Plan ==
    Cassandra Direct Join [pk = id#2] test.jtest1 - Reading (pk, c1, c2, v) Pushed {}
    +- *(1) Project [cast(id#0L as int) AS id#2]
       +- *(1) Range (1, 5, step=1, splits=8)
    

    I have quite a long & detailed blog post about joins with Cassandra - check it for more details.