Tags: apache-spark, apache-spark-sql, cassandra, spark-cassandra-connector

Why is there different number of elements in Spark DataFrames before and after writing to a new Cassandra table?


In my code I read data from an existing Cassandra table into a Spark DataFrame and transform it to build a set of new tables containing the reverse mappings of the original data (the end goal is to serve search queries that come in via the REST API).

Recently I added some tracing and discovered something I cannot explain. Below is a piece of Scala code that illustrates the matter.

// implicits needed for createCassandraTable and cassandraFormat below
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._

// df: org.apache.spark.sql.DataFrame
//
// control point 1: before writing the data to Cassandra
val inputCount = df.count
// write data to new C* table
df.createCassandraTable(keyspaceName, tableName, <otherArgs>)
df.write.mode("append").cassandraFormat(tableName, keyspaceName).save()

// read data back
val readbackDf = sqlContext.read.cassandraFormat(tableName, keyspaceName).load().cache
// control point 2: data written to C* table
val outputCount = readbackDf.count

// Produces different numbers
println(s"Input count = ${inputCount}; output count = ${outputCount}")

The .count I take on the DataFrame before writing to the newly created table differs from the .count of the DataFrame I read back from that same table.

Therefore, I've got 2 questions:

  1. Why do I observe different values for inputCount and outputCount?
  2. If the way I calculate outputCount in the code above is wrong, what would be the correct approach? (A possible alternative is sketched below.)
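
Regarding the second question, one alternative I can think of is counting on the Cassandra side instead of on the cached read-back DataFrame. This is only a sketch (it assumes the connector's RDD API and its server-side cassandraCount; sc here stands for the SparkContext, e.g. sqlContext.sparkContext):

import com.datastax.spark.connector._

// Sketch only: push the row count down to the Cassandra nodes via the
// connector's RDD API, bypassing the DataFrame read-back entirely.
val serverSideCount = sc.cassandraTable(keyspaceName, tableName).cassandraCount()
println(s"Server-side count = ${serverSideCount}")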

Solution

  • The problem was indeed related to Cassandra consistency settings. Many thanks to Anurag, who pointed this out.

    It turned out that in my testing environment I was using the default consistency level for both reads and writes, which is LOCAL_ONE. That easily explains the divergence.

    I ended up setting them both to LOCAL_QUORUM:

    spark.cassandra.input.consistency.level=LOCAL_QUORUM
    spark.cassandra.output.consistency.level=LOCAL_QUORUM
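
    For reference, these can be passed via --conf to spark-submit or set programmatically. A minimal sketch of the latter (the SparkConf setup around the two keys is only illustrative; the spark.cassandra.* keys themselves are the settings I actually changed):

    import org.apache.spark.SparkConf

    // Sketch: the same consistency-level keys set on the SparkConf
    // before the SparkContext / SQLContext is created.
    val conf = new SparkConf()
      .set("spark.cassandra.input.consistency.level", "LOCAL_QUORUM")
      .set("spark.cassandra.output.consistency.level", "LOCAL_QUORUM")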
    

    Having said that, I'd like to point out that I also tried setting only reads to LOCAL_QUORUM:

    spark.cassandra.input.consistency.level=LOCAL_QUORUM
    spark.cassandra.output.consistency.level=LOCAL_ONE
    

    which almost eliminated the divergence.

    Yet I was still occasionally able to observe a small divergence with these settings (roughly one in 3-4 runs) in some of my ETL jobs.

    Since I don't see significant performance degradation with both read and write consistency set to LOCAL_QUORUM, the issue no longer blocks me. Still, I'm curious why setting only reads to LOCAL_QUORUM doesn't fully cure the problem.

    Could anyone suggest a "for-dummies" explanation of this?