I have two clusters:
1. Cloudera Hadoop - Spark jobs run here
2. Cloud - Cassandra cluster, multiple DCs
While writing a dataframe from my Spark job to the Cassandra cluster, I am doing a repartition (repartitionCount=10) in Spark before writing. See below:
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.cassandra._
import com.datastax.spark.connector.cql.CassandraConnectorConf

records.repartition(repartitionCount).write.cassandraFormat(table, keySpace)
  .mode(SaveMode.Append)
  .options(options)
  .option(CassandraConnectorConf.LocalDCParam.name, cassandraDC.name)
  .option(CassandraConnectorConf.ConnectionHostParam.name, cassandraDC.hosts)
  .save()
In my multi-tenant Spark cluster, for a Spark batch load of 20M records with the configs below, I see a lot of task failures, resource preemption, and on-the-fly failures.
spark.cassandra.output.batch.grouping.buffer.size=1000
spark.cassandra.output.batch.grouping.key=partition
spark.cassandra.output.concurrent.writes=20
spark.cassandra.connection.compression=LZ4
How should I tune this? Is the repartition to blame?
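For reference, a minimal sketch of how these settings are supplied, assuming they are set on the SparkSession builder (they can equally be passed with --conf at spark-submit time); the application name is just a placeholder:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("cassandra-batch-load")  // placeholder name
  .config("spark.cassandra.output.batch.grouping.buffer.size", "1000")
  .config("spark.cassandra.output.batch.grouping.key", "partition")
  .config("spark.cassandra.output.concurrent.writes", "20")
  .config("spark.cassandra.connection.compression", "LZ4")
  .getOrCreate()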
PS: My understanding in the beginning was: for a load of 20M rows, the "repartition" should distribute the load evenly over the executors (partitions of 2M rows each), and batching would be done at that partition level (on 2M rows). But now I am doubting whether this is causing an unnecessary shuffle, if the spark-cassandra-connector does its batching at the whole-dataframe level anyway (all 20M rows).
UPDATE: Removing the "repartition" brought the performance down a lot on my Cloudera Spark cluster (the default at the Spark level is spark.sql.shuffle.partitions: 200), so I dug a bit deeper and found that my initial understanding was correct. Please note that my Spark and Cassandra clusters are different. The DataStax spark-cassandra-connector opens one connection per partition with a Cassandra coordinator node, so I have decided to leave it as it is. As Alex suggested, I have reduced the concurrent writes, which I believe should help; see the sketch below.
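A sketch of that adjustment, assuming the write parallelism is dropped back to the connector default of 5 at session level (the rest of the write, including the repartition, stays unchanged):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  // other connector settings stay as before; only the write parallelism changes
  .config("spark.cassandra.output.concurrent.writes", "5")  // reduced from 20
  .getOrCreate()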
You don't need to do repartition in Spark - just write the data from Spark to Cassandra, and don't try to change the Spark Cassandra Connector defaults - they work fine in most situations. You need to look at what kind of stage failures are happening - most probably you're simply overloading Cassandra because of spark.cassandra.output.concurrent.writes=20 (use the default value of 5) - sometimes having fewer writers helps to write data faster, because you don't overload Cassandra and jobs aren't restarted.
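In code, that means the write can stay as simple as possible - no repartition, no overridden output settings. A minimal sketch, reusing the variables from the question:

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.cassandra._
import com.datastax.spark.connector.cql.CassandraConnectorConf

// No repartition and no overridden spark.cassandra.output.* settings - let the
// connector defaults (including concurrent.writes = 5) drive the write.
records.write
  .cassandraFormat(table, keySpace)
  .mode(SaveMode.Append)
  .option(CassandraConnectorConf.LocalDCParam.name, cassandraDC.name)
  .option(CassandraConnectorConf.ConnectionHostParam.name, cassandraDC.hosts)
  .save()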
P.S. The partition in spark.cassandra.output.batch.grouping.key is not a Spark partition - it's a Cassandra partition, which depends on the value of the partition key column.
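To illustrate with a hypothetical table (the schema and column names below are made up, not from the question): if the table's partition key is sensor_id, rows sharing the same sensor_id value belong to the same Cassandra partition and are grouped into the same batch, regardless of how the DataFrame is partitioned on the Spark side.

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.cassandra._

// Hypothetical table:
//   CREATE TABLE ks.readings (sensor_id text, ts text, value double,
//                             PRIMARY KEY (sensor_id, ts))
val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val readings = Seq(
  ("sensor-1", "2020-01-01T00:00:00", 1.0), // Cassandra partition "sensor-1"
  ("sensor-1", "2020-01-01T00:01:00", 2.0), // same Cassandra partition -> same batch
  ("sensor-2", "2020-01-01T00:00:00", 3.0)  // different Cassandra partition -> different batch
).toDF("sensor_id", "ts", "value")

readings.write
  .cassandraFormat("readings", "ks")
  .mode(SaveMode.Append)
  .save()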