Tags: cassandra, datastax, datastax-enterprise, datastax-java-driver, spark-cassandra-connector

Cassandra : timeout during SIMPLE write query at consistency LOCAL_QUORUM


I'm trying to ingest data (one partition = 1 MB BLOB) from Spark into Cassandra with these configuration parameters:

spark.sql.catalog.cassandra.spark.cassandra.output.batch.size.rows 1
spark.sql.catalog.cassandra.spark.cassandra.output.concurrent.writes 100
spark.sql.catalog.cassandra.spark.cassandra.output.batch.grouping.key none
spark.sql.catalog.cassandra.spark.cassandra.output.throughputMBPerSec 1
spark.sql.catalog.cassandra.spark.cassandra.output.consistency.level LOCAL_QUORUM
spark.sql.catalog.cassandra.spark.cassandra.output.metrics false
spark.sql.catalog.cassandra.spark.cassandra.connection.timeoutMS 90000
spark.sql.catalog.cassandra.spark.cassandra.query.retry.count 10
spark.sql.catalog.cassandra com.datastax.spark.connector.datasource.CassandraCatalog
spark.sql.extensions com.datastax.spark.connector.CassandraSparkExtensions

I started with a Spark job using 16 cores in total, then went down to a job using just 1 core.

In every case, after some time, I get the following response and the driver goes into the failed state:

21/09/19 19:03:50 ERROR QueryExecutor: Failed to execute: com.datastax.spark.connector.writer.RichBoundStatementWrapper@532adef2
com.datastax.oss.driver.api.core.servererrors.WriteTimeoutException: Cassandra timeout during SIMPLE write query at consistency LOCAL_QUORUM (2 replica were required but only 0 acknowledged the write)

It may be related to some nodes being overloaded, but how can I debug this? Which configuration settings should I adjust?

Thanks


Solution

  • Problem solved!

    The problem was MY DATA, and NOT Cassandra.

    Indeed, a few partitions (2,000 out of 60,000,000) were about 50 MB each, instead of the 1 MB I expected.

    I just added a filter to exclude large partitions while ingesting with Spark:

    import org.apache.spark.sql.functions.{col, length}
    ...
    spark.read.parquet("...")
      // EXCLUDE LARGE PARTITIONS (length() on a binary column returns its size in bytes)
      .withColumn("bytes_count", length(col("blob")))
      .filter("bytes_count < " + argSkipPartitionLargerThan)
      // PROJECT
      .select("data_key", "blob")
      // COMMIT
      .writeTo(DS + "." + argTargetKS + "." + argTargetTable).append()
    

    Ingestion with Spark now completes in just 10 minutes (500 GB of data).
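
For anyone hitting the same timeout, one way to check whether oversized blobs are involved is to look at the size distribution before writing anything to Cassandra. The following is only a minimal sketch, not the code used in the answer. It assumes the same `data_key` and `blob` columns, that `blob` is a binary column, and that the input is non-empty. The object name `BlobSizeCheck` and its two command-line arguments (input path and size threshold in bytes) are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, length}

object BlobSizeCheck {
  def main(args: Array[String]): Unit = {
    // Hypothetical arguments: input Parquet path and size threshold in bytes.
    val inputPath = args(0)
    val maxBytes = args(1).toLong

    val spark = SparkSession.builder().appName("blob-size-check").getOrCreate()

    // length() on a binary column returns its size in bytes.
    val sizes = spark.read.parquet(inputPath)
      .select(col("data_key"), length(col("blob")).as("bytes_count"))
      .cache()

    // Approximate median, 99th percentile and largest blob size
    // (assumes at least one row, otherwise the returned array is empty).
    val Array(p50, p99, largest) =
      sizes.stat.approxQuantile("bytes_count", Array(0.5, 0.99, 1.0), 0.001)
    println(s"median=$p50 p99=$p99 largest=$largest")

    // Rows that the filter in the answer would exclude.
    val oversized = sizes.filter(col("bytes_count") >= maxBytes)
    println(s"rows at or above $maxBytes bytes: ${oversized.count()}")
    oversized.orderBy(col("bytes_count").desc).show(20, truncate = false)

    spark.stop()
  }
}
```

If the largest value is far above the median, as with the 50 MB blobs described above, the outliers are a likely cause of the write timeouts, and the listed keys show which rows to filter out or handle separately.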