In my code I read data from an existing Cassandra table into a Spark DataFrame and transform it to build a set of new tables holding the reverse mappings of the original data (the end goal is to serve search queries that come in via a REST API).
Recently I added some tracing and discovered something I cannot explain. Below is a piece of Scala code that illustrates the issue.
```scala
import com.datastax.spark.connector._    // provides df.createCassandraTable
import org.apache.spark.sql.cassandra._  // provides cassandraFormat on DataFrameReader/Writer

// df: org.apache.spark.sql.DataFrame
//
// control point 1: before writing the data to Cassandra
val inputCount = df.count
// create the new C* table from the DataFrame schema and write the data to it
df.createCassandraTable(keyspaceName, tableName, <otherArgs>)
df.write.mode("append").cassandraFormat(tableName, keyspaceName).save()
// read the data back
val readbackDf = sqlContext.read.cassandraFormat(tableName, keyspaceName).load().cache
// control point 2: data written to the C* table
val outputCount = readbackDf.count
// these two numbers differ
println(s"Input count = ${inputCount}; output count = ${outputCount}")
```
If I calculate .count of the DataFrame before writing the data to the newly created table, it differs from the .count of the DataFrame I get by reading back from that table.
Therefore, I've got 2 questions:
1. Why do inputCount and outputCount differ?
2. What would be the correct approach to computing outputCount in the code above?

The problem was indeed related to Cassandra consistency settings. Many thanks to Anurag, who pointed it out.
It turned out that in my testing environment I used the defaults for both the read and the write consistency levels, which are LOCAL_ONE. That easily explains the divergence.
I ended up setting both to LOCAL_QUORUM:
```
spark.cassandra.input.consistency.level=LOCAL_QUORUM
spark.cassandra.output.consistency.level=LOCAL_QUORUM
```
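The same two properties can also be applied programmatically. This is a minimal sketch, assuming the job builds its own SparkConf; the object name, method name and app name are hypothetical, not taken from the original job. The properties could equally be passed as --conf options to spark-submit.

```scala
import org.apache.spark.SparkConf

// Hypothetical helper: applies LOCAL_QUORUM to both reads and writes
// performed through the Spark Cassandra Connector.
object ConsistencySettings {
  def withLocalQuorum(conf: SparkConf): SparkConf =
    conf
      .set("spark.cassandra.input.consistency.level", "LOCAL_QUORUM")
      .set("spark.cassandra.output.consistency.level", "LOCAL_QUORUM")

  def main(args: Array[String]): Unit = {
    // "reverse-mapping-etl" is a placeholder app name.
    val conf = withLocalQuorum(new SparkConf().setAppName("reverse-mapping-etl"))
    // Print the effective values to confirm they are set before a SparkContext is built from this conf.
    println(conf.get("spark.cassandra.input.consistency.level"))
    println(conf.get("spark.cassandra.output.consistency.level"))
  }
}
```

Because the connector reads these values from the Spark configuration, they should be in place before the SparkContext (or SparkSession) is created from that conf.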
Having said that, I should point out that I also tried setting only reads to LOCAL_QUORUM:
```
spark.cassandra.input.consistency.level=LOCAL_QUORUM
spark.cassandra.output.consistency.level=LOCAL_ONE
```
which almost eliminated the divergence.
Yet with these settings I could still sometimes observe a small divergence (roughly one run in 3-4) in some of my ETL jobs.
I don't see significant performance degradation with both read and write consistency set to LOCAL_QUORUM, so the issue no longer blocks me. Still, I'm curious why setting only reads to LOCAL_QUORUM doesn't fully cure the problem.
Could anyone suggest a "for-dummies" explanation of this?
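One common way to reason about this is the replica-overlap rule: a read is only guaranteed to see a completed write when the number of replicas the read consults (R) plus the number that acknowledged the write (W) exceeds the replication factor (RF). The sketch below assumes RF = 3 in the local data center (the post does not state the replication factor) and uses hypothetical helper names. It simply tabulates the three combinations tried above.

```scala
// Hypothetical helpers; RF = 3 is an assumption, not stated in the post.
object QuorumOverlap {
  // Number of local replicas that must respond for a given consistency level.
  def replicasFor(level: String, rf: Int): Int = level match {
    case "LOCAL_ONE"    => 1
    case "LOCAL_QUORUM" => rf / 2 + 1
    case "ALL"          => rf
    case other          => throw new IllegalArgumentException(s"unsupported level: $other")
  }

  // The read and write replica sets are forced to share at least one node
  // only when R + W > RF; otherwise a read may miss every acknowledging replica.
  def readSeesWrite(readLevel: String, writeLevel: String, rf: Int): Boolean =
    replicasFor(readLevel, rf) + replicasFor(writeLevel, rf) > rf

  def main(args: Array[String]): Unit = {
    val rf = 3
    val combos = Seq(
      ("LOCAL_ONE", "LOCAL_ONE"),
      ("LOCAL_QUORUM", "LOCAL_ONE"),
      ("LOCAL_QUORUM", "LOCAL_QUORUM")
    )
    for ((readLevel, writeLevel) <- combos) {
      val rCount = replicasFor(readLevel, rf)
      val wCount = replicasFor(writeLevel, rf)
      println(
        s"read=$readLevel ($rCount) write=$writeLevel ($wCount): " +
          s"R + W = ${rCount + wCount}, guaranteed overlap = ${readSeesWrite(readLevel, writeLevel, rf)}"
      )
    }
  }
}
```

Under that assumption, LOCAL_QUORUM reads with LOCAL_ONE writes give R + W = 2 + 1 = 3, which does not exceed RF = 3, so a read may consult exactly the two replicas that have not yet applied a given write. Cassandra still sends each write to all replicas and only waits for as many acknowledgements as the consistency level requires, so lagging replicas usually catch up quickly. That would fit a divergence that becomes rare rather than disappearing.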