My use case:
By the time step 4 is executed, the output dataframe is empty. This is because Spark re-evaluates the dataframe on the action, and due to lineage the Cassandra query is run again, which now yields no records.
To avoid this, I added a step after step 2:
2a) outputDataframe.cache()
This ensures that during step 4, Cassandra is not queried, and I get the desired output records in my file as well. I have the below queries on this approach:
1. Is it possible that, in cases where Spark doesn't find the cached data (cache lookup fails), it will go up the lineage and run the Cassandra query? If yes, what is the way to avoid that in all cases?
2. df.rdd.cache(). Is this any different than calling cache() on the dataframe?
For reference, my current code looks as follows:
import org.apache.spark.sql.functions.lit

//1
val dfOrig = spark
.read
.format("org.apache.spark.sql.cassandra")
.options(Map("keyspace" -> "myks", "table" -> "mytable", "pushdown" -> "true"))
.load()
//2
val df = dfOrig.filter("del_flag = 'N'").withColumn("del_flag", lit("Y"))
//3
df.write.format("org.apache.spark.sql.cassandra")
.options(Map("keyspace" -> "myks", "table" -> "mytable", "spark.cassandra.output.ttl" -> "120"))
.mode("append")
.save()
//4
// <After quite some processing, mostly after the TTL, and in the calling code>
df.write.format("csv").save("some.csv")
Is it possible that, in cases where Spark doesn't find the cached data (cache lookup fails), it will go up the lineage and run the Cassandra query?
Yes, it is possible. Cached data can be removed by the cache cleaner (primarily in MEMORY_ONLY mode), it can be lost when the corresponding node is decommissioned (crashed, preempted, or released by dynamic allocation), and other features, like speculative execution, can affect cache behavior. Finally, the data might not be fully cached in the first place.
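Related to that last point: caching is lazy, so nothing is stored until an action runs, and even then individual partitions can be evicted later. A minimal sketch, using only the public Dataset API, of forcing materialization and inspecting the persistence level:

// Mark the dataframe for caching - this is lazy, nothing is stored yet.
df.cache()

// Run a full-scan action so that every partition is actually materialized.
df.count()

// storageLevel reports the level the dataframe was persisted with
// (StorageLevel.NONE if it was never persisted).
println(df.storageLevel)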
If yes, what is the way to avoid that in all cases?
Don't use cache/persist if you require strong consistency guarantees - it wasn't designed with use cases like this one in mind. Instead, export the data to persistent, reliable storage (like HDFS) and read it from there. You could also use checkpoint with an HDFS checkpointDir, as in the sketch below.
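A minimal sketch of the checkpoint approach, assuming a SparkSession named spark; the checkpoint directory is a placeholder, any reliable HDFS path will do:

// Checkpoint files are written here and survive executor loss.
spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoints")

// Dataset.checkpoint() is eager by default: it materializes the data
// and returns a dataframe whose plan no longer depends on the
// original Cassandra read.
val stableDf = df.checkpoint()

// Later actions read the checkpointed files instead of re-running the query.
stableDf.write.format("csv").save("some.csv")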
You might be tempted to use a more reliable caching mode like MEMORY_AND_DISK_2 - this might reduce the probability of recomputing the data, at the cost of keeping two replicas of each partition, and it still cannot give you a hard guarantee.
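For completeness, setting that storage level looks as follows (a sketch; df is the dataframe from the question):

import org.apache.spark.storage.StorageLevel

// Replicates each cached partition on two nodes and spills to disk
// when memory runs short - fewer recomputations, but still no guarantee.
df.persist(StorageLevel.MEMORY_AND_DISK_2)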
df.rdd.cache(). Is this any different than calling cache() on the dataframe?
It is different (the primary difference is the serialization strategy), but not when it comes to the properties which are of interest in the scope of this question.
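To illustrate the distinction (storage levels are the Spark defaults; df is the dataframe from the question):

// Dataset.cache() uses the MEMORY_AND_DISK level and stores the data
// in Spark SQL's compressed, columnar in-memory format.
df.cache()

// df.rdd materializes a separate RDD[Row]; caching it uses the RDD
// default MEMORY_ONLY level and plain Java objects. The dataframe's
// own query plans do not reuse this cache.
df.rdd.cache()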
Important:
Please note that caching behavior might not be the biggest issue in your code. Reading from and appending to a single table can result in all kinds of undesired or undefined behaviors in complex pipelines, unless additional steps are taken to ensure that the reader doesn't pick up newly written records.
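One way to address both concerns at once - a sketch only, with the HDFS staging path as a placeholder - is to snapshot the records to reliable storage before mutating the table, and to drive every later step from that snapshot:

// Replaces step 2: snapshot the records to be updated *before*
// writing anything back to Cassandra.
val stagingPath = "hdfs:///staging/mytable_snapshot" // placeholder path
dfOrig.filter("del_flag = 'N'")
  .write.mode("overwrite").parquet(stagingPath)

// Read the snapshot back; its lineage now starts at HDFS, not Cassandra.
val df = spark.read.parquet(stagingPath).withColumn("del_flag", lit("Y"))

// Steps 3 and 4 stay as they are, but both now operate on a dataframe
// that can be recomputed from the HDFS snapshot at any time, so the
// later CSV export no longer re-queries the mutated Cassandra table.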