I have a list of ids in string format; the list can be roughly 20,000 ids long:
var timelineIds = source.map(a => a.timelineid);
timelineIds = timelineIds.distinct.cache; // distinct list; we need this later
var timelineIdsString = timelineIds.map(a => a.asInstanceOf[String]).collect.toList;
When I use this list against one of my Cassandra tables, it works just fine, no matter the size of timelineIdsString:
var timelineHistorySource = sc.cassandraTable[Timeline]("acd", "timeline_history_bytimelineid")
  .select("ownerid", "userid", "timelineid", "timelinetype", "starttime", "endtime", "attributes", "states")
if (constrain)
  timelineHistorySource = timelineHistorySource.where("timelineid IN ?", timelineIdsString)
When I run the same filter against another of my tables, it never completes once the list has more than 1,000 ids:
var dispositionSource = sc.cassandraTable[DispositionSource]("acd", "dispositions_bytimelineid")
  .select("ownerid", "dispositionid", "month", "timelineid", "createddate", "createduserid")
if (constrain)
  dispositionSource = dispositionSource.where("timelineid IN ?", timelineIdsString)
Both Cassandra tables use timelineid as the key, so I know that it's correct. This code works fine as long as timelineIds is a small list.
Is there a better way to filter a Cassandra RDD? Is the size of the IN clause causing it to choke?
Instead of performing the join at the Spark level, it's better to perform the join in Cassandra itself. In this case you'll read only the necessary data from Cassandra (given that the join key is the partition key or primary key). For RDDs this can be done with the .joinWithCassandraTable function (doc):
import com.datastax.spark.connector._
val toJoin = sc.parallelize(1 until 5).map(x => Tuple1(x.toInt))
val joined = toJoin.joinWithCassandraTable("test", "jtest1")
.on(SomeColumns("pk"))
scala> joined.toDebugString
res21: String =
(8) CassandraJoinRDD[150] at RDD at CassandraRDD.scala:18 []
| ParallelCollectionRDD[147] at parallelize at <console>:33 []
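Applied to the tables from the question, the same pattern might look like the sketch below. It assumes that `timelineIds` is the distinct RDD of ids built in the question, that `timelineid` is the partition key of `dispositions_bytimelineid`, and that the `DispositionSource` case class maps onto the selected columns. None of this is verified against the actual schema, and the names `idsToJoin`, `dispositionJoined`, and `dispositionRows` are made up for illustration.

```scala
import com.datastax.spark.connector._

// Wrap each id in a Tuple1 so the connector can bind it to the join column.
// Assumes timelineIds is the distinct RDD of ids from the question.
val idsToJoin = timelineIds.map(id => Tuple1(id.asInstanceOf[String]))

// The connector looks up each key individually instead of sending
// one very large IN clause to Cassandra.
val dispositionJoined = idsToJoin
  .joinWithCassandraTable[DispositionSource]("acd", "dispositions_bytimelineid")
  .on(SomeColumns("timelineid"))
  .select("ownerid", "dispositionid", "month", "timelineid", "createddate", "createduserid")

// The result is an RDD of (key, row) pairs; keep only the Cassandra rows.
val dispositionRows = dispositionJoined.map { case (_, row) => row }
```

With this approach the 20,000 ids never need to be collected to the driver, so `timelineIdsString` is not required for this table.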
For DataFrames there is the so-called direct join, available since SCC 2.5 (see announcement). You need to pass some configs to enable it (see docs):
import spark.implicits._
import org.apache.spark.sql.cassandra._
val cassdata = spark.read.cassandraFormat("jtest1", "test").load
val toJoin = spark.range(1, 5).select($"id".cast("int").as("id"))
val joined = toJoin.join(cassdata, cassdata("pk") === toJoin("id"))
scala> joined.explain
== Physical Plan ==
Cassandra Direct Join [pk = id#2] test.jtest1 - Reading (pk, c1, c2, v) Pushed {}
+- *(1) Project [cast(id#0L as int) AS id#2]
   +- *(1) Range (1, 5, step=1, splits=8)
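As a rough sketch of the configuration involved: to my knowledge, the direct join relies on the connector's Catalyst extensions being registered through `spark.sql.extensions`, and the `directJoinSetting` read option controls whether it is applied. The application name below is hypothetical, and the Cassandra connection settings are assumed to be supplied elsewhere; check the connector reference for the exact option names in your version.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.cassandra._

// Register the connector's Catalyst extensions; without them the
// direct join optimization is not applied to the query plan.
val spark = SparkSession.builder()
  .appName("direct-join-sketch") // hypothetical application name
  .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
  .getOrCreate()

// directJoinSetting accepts "on", "off", or "auto" (decided by relative size).
val cassdata = spark.read
  .cassandraFormat("jtest1", "test")
  .option("directJoinSetting", "on")
  .load
```

In spark-shell, where the session already exists, the same extension setting would instead be passed with `--conf` at startup.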
I have quite a long and detailed blog post about joins with Cassandra; check it for more details.