Tags: dataframe, apache-spark, apache-spark-sql, cassandra, spark-cassandra-connector

Spark Dataframe.cache() behavior for changing source


My use case:

  1. Create a dataframe from a Cassandra table.
  2. Create an output dataframe by filtering on a column and modifying that column's value.
  3. Write the output dataframe to Cassandra with a TTL set, so all the modified records are deleted after a short period (2s).
  4. Return the output dataframe to a caller that writes it to the filesystem after some time. I can only return a dataframe to the caller and have no further control. Also, I can't increase the TTL.

By the time step 4 is executed, the output dataframe is empty. This is because Spark re-evaluates the dataframe on each action, and due to lineage the Cassandra query is run again, which now yields no records.
To avoid this, I added a step after step 2:

2a) outputDataframe.cache()

This ensures that during step 4, Cassandra is not queried, and I get the desired output records in my file as well. I have the following questions about this approach:

  1. Is it possible that, in cases where Spark doesn't find the cached data (cache lookup fails), it will go up the lineage and run the Cassandra query? If yes, what is the way to avoid that in all cases?
  2. I have seen another way of doing the caching: df.rdd.cache(). Is this any different from calling cache() on the dataframe?

For reference, my current code looks as follows:

//1
val dfOrig = spark
      .read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "myks", "table" -> "mytable", "pushdown" -> "true"))
      .load()
//2
val df = dfOrig.filter("del_flag = 'N'").withColumn("del_flag", lit("Y"))
//3
df.write.format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "myks", "table" -> "mytable", "spark.cassandra.output.ttl" -> "120"))
      .mode("append")
      .save()
//4
// <After quite some processing, mostly after the TTL, and in the calling code>
df.write.format("csv").save("some.csv") 

Solution

  • Is it possible that, in cases where Spark doesn't find the cached data (cache lookup fails), it will go up the lineage and run the Cassandra query?

    Yes, it is possible. Cached data can be removed by the cache cleaner (primarily in MEMORY_ONLY mode), and can be lost when the corresponding node is decommissioned (crashed, preempted, or released by dynamic allocation). Additionally, other features, such as speculative execution, can affect cache behavior.

    Finally, the data might not be fully cached in the first place.

    If yes, what is the way to avoid that in all cases?

    Don't use cache / persist if you require strong consistency guarantees - it wasn't designed with use cases like this one in mind. Instead, export the data to persistent, reliable storage (like HDFS) and read it from there.
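    A minimal sketch of this approach, continuing the question's code (the HDFS path is illustrative, not prescribed):

    ```scala
    // Materialize the filtered dataframe to reliable storage before the
    // TTL can expire, then hand the caller a dataframe backed by that copy.
    val snapshotPath = "hdfs:///tmp/mytable_snapshot"  // illustrative path

    df.write
      .mode("overwrite")
      .parquet(snapshotPath)

    // This dataframe's lineage starts at the Parquet files, not Cassandra,
    // so later actions never re-run the Cassandra query.
    val dfStable = spark.read.parquet(snapshotPath)
    ```

    Returning `dfStable` instead of `df` in step 4 makes the result independent of the TTL.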

    You could also use checkpoint with an HDFS checkpointDir.
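    For example (the checkpoint directory below is an assumption; any reliable HDFS location works):

    ```scala
    // Reliable (non-local) checkpointing writes the dataframe's contents
    // to the checkpoint directory and truncates its lineage.
    spark.sparkContext.setCheckpointDir("hdfs:///tmp/spark-checkpoints")

    // Dataset.checkpoint() is eager by default: it materializes the data
    // immediately and returns a dataframe whose plan reads from the
    // checkpoint files rather than from Cassandra.
    val dfStable = df.checkpoint()
    ```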

    You might be tempted to use a more reliable caching mode like MEMORY_AND_DISK_2 - this reduces the probability of recomputing the data, at the cost of extra memory and disk usage plus the overhead of maintaining a second replica, and it still cannot rule out recomputation entirely (for example, if both replicas are lost).
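    If you still prefer caching, a replicated storage level can be requested like this (a sketch; the count is just one way to force materialization):

    ```scala
    import org.apache.spark.storage.StorageLevel

    // Keep two replicas of each cached partition (memory first, spilling
    // to disk). Recomputation is only triggered if both replicas are lost.
    df.persist(StorageLevel.MEMORY_AND_DISK_2)

    // persist() is lazy - run an action so the cache is populated before
    // the TTL write in step 3 starts the clock.
    df.count()
    ```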

    I have seen another way of doing the caching: df.rdd.cache(). Is this any different from calling cache() on the dataframe?

    It is different (the primary difference is the serialization strategy), but not when it comes to the properties that are of interest in the scope of this question.

    Important:

    Please note that caching behavior might not be the biggest issue in your code. Reading from and appending to a single table can result in all kinds of undesired or undefined behavior in complex pipelines, unless additional steps are taken to ensure that the reader doesn't pick up newly written records.
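    One way to sidestep both problems at once, continuing the question's code (a sketch; the snapshot path is illustrative), is to pin down the read result on neutral storage before appending back to the same table:

    ```scala
    // 1. Snapshot the filtered rows first, so the read is finished and
    //    frozen before any writes touch mytable.
    val snapshotPath = "hdfs:///tmp/mytable_snapshot"  // illustrative path
    df.write.mode("overwrite").parquet(snapshotPath)
    val dfStable = spark.read.parquet(snapshotPath)

    // 2. Only then write the modified rows (with TTL) back to Cassandra.
    dfStable.write.format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "myks", "table" -> "mytable",
                   "spark.cassandra.output.ttl" -> "120"))
      .mode("append")
      .save()

    // 3. Return dfStable to the caller; its lineage never touches Cassandra,
    //    so the TTL expiry cannot empty it.
    ```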