Tags: cassandra, datastax, datastax-enterprise

Cassandra: nodes become unavailable while ingesting with Spark


After a few successful ingestions into Cassandra with Spark, an error is now returned every time I try to ingest data with Spark (either after a few minutes or instantly):

Caused by: com.datastax.oss.driver.api.core.AllNodesFailedException: Could not reach any contact point, make sure you've provided valid addresses

I checked with plain cqlsh (without Spark), and a similar error is returned there too (for 2 of the 4 nodes):

Connection error: ('Unable to connect to any servers', {'1.2.3.4': error(111, "Tried connecting to [('1.2.3.4', 9042)]. Last error: Connection refused")})

So basically, when I ingest into Cassandra with Spark, some nodes go down at some point, and I have to reboot a node before I can reach it again through cqlsh (and Spark).

What is strange is that nodetool status still reports the node as "UP", while cqlsh gets "Connection refused" for that same node.

I tried to investigate the logs, but I have a big problem: there is nothing in them, not a single exception raised server-side.

What should I do in this case? Why does a node go down or become unresponsive? How can I prevent it?

Thanks


!!! edit !!!


Some of the details asked for, below:

Cassandra infrastructure :

  • network : 10 gbps
  • two datacenters : datacenter1 and datacenter2
  • 4 nodes in each datacenter
  • 2 replicas per datacenter :
    CREATE KEYSPACE my_keyspace WITH replication = {'class': 'NetworkTopologyStrategy', 'datacenter1': '2', 'datacenter2': '2'} AND durable_writes = true;
  • consistency used for input and output : LOCAL_QUORUM
  • total physical memory per node : 128GB.
  • memory allocation per node : 64 GB dedicated to the Cassandra instance and 64 GB dedicated to the Spark worker (colocated on each Cassandra node)
  • storage : 4 TB NVMe for each node

Spark application config :

  • total executors cores : 24 cores (4 instances * 6 cores each)
  • total executors ram : 48 GB (4 instances * 8 GB each)
  • cassandra config on spark :
spark.sql.catalog.cassandra.spark.cassandra.output.batch.size.rows 1
spark.sql.catalog.cassandra.spark.cassandra.output.concurrent.writes 100
spark.sql.catalog.cassandra.spark.cassandra.output.batch.grouping.key none
spark.sql.catalog.cassandra.spark.cassandra.output.throughputMBPerSec 80
spark.sql.catalog.cassandra.spark.cassandra.output.consistency.level LOCAL_QUORUM
spark.sql.catalog.cassandra.spark.cassandra.output.metrics false
spark.sql.catalog.cassandra.spark.cassandra.connection.timeoutMS 90000
spark.sql.catalog.cassandra.spark.cassandra.query.retry.count 10
spark.sql.catalog.cassandra com.datastax.spark.connector.datasource.CassandraCatalog
spark.sql.extensions com.datastax.spark.connector.CassandraSparkExtensions

Solution

  • "(2 nodes of 4)"

    Just curious, but what is the replication factor (RF) of the keyspace, and what consistency level is being used for the write operation?

    I'll echo Alex, and say that usually this happens because Spark is writing faster than Cassandra can process. That leaves you with two options:

    • Increase the size of the cluster to handle the write load.
    • Throttle-back the write throughput of the Spark job.
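To get a feel for how much load the job in the question may be allowed to generate, here is a rough sketch. It assumes, based on my reading of the Spark Cassandra Connector documentation, that `output.throughputMBPerSec` is a per-core limit and `output.concurrent.writes` is a per-task limit (one task per core). The helper names and the throttled values (5 MB/s, 16 concurrent writes) are purely illustrative, not recommendations.

```python
# Rough upper bounds on write pressure, using the numbers from the question.
# Assumption: throughputMBPerSec is applied per executor core and
# concurrent.writes per Spark task (one task per core).

PREFIX = "spark.sql.catalog.cassandra.spark.cassandra.output."


def write_ceiling(total_cores, mb_per_sec_per_core, concurrent_writes_per_task):
    """Return (aggregate MB/s ceiling, max in-flight writes) for the whole job."""
    return (total_cores * mb_per_sec_per_core,
            total_cores * concurrent_writes_per_task)


def to_conf_args(settings):
    """Render output settings as spark-submit --conf arguments."""
    args = []
    for key, value in settings.items():
        args += ["--conf", f"{PREFIX}{key}={value}"]
    return args


# Settings from the question: 24 cores, 80 MB/s, 100 concurrent writes.
current = write_ceiling(24, 80, 100)

# Hypothetical throttled values, to be tuned against real measurements.
throttled_settings = {"throughputMBPerSec": 5, "concurrent.writes": 16}
throttled = write_ceiling(24, 5, 16)

print("current ceiling:", current)
print("throttled ceiling:", throttled)
print(" ".join(to_conf_args(throttled_settings)))
```

If the per-core assumption holds, the question's settings allow up to 24 * 80 = 1920 MB/s across the job, which is more than a 10 Gbps link (about 1250 MB/s) can carry, so the throughput setting is probably not limiting anything at the moment.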

    One thing worth calling out:

    "2 replicas per datacenter"

    "consistency used for input and output : LOCAL_QUORUM"

    So you'll probably get more throughput by dropping the write consistency to LOCAL_ONE.

    Remember, quorum == floor(RF / 2) + 1, so with RF = 2 in each datacenter, LOCAL_QUORUM requires both local replicas to acknowledge every write.

    So I do recommend dropping to LOCAL_ONE, because right now Spark is effectively writing at ALL consistency within the local datacenter.
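The quorum arithmetic above can be checked with a few lines of Python. This is only a sketch of the formula; the function name `quorum` is made up for illustration and is not part of any Cassandra API.

```python
def quorum(replication_factor):
    """Replicas that must acknowledge for QUORUM: floor(RF / 2) + 1."""
    return replication_factor // 2 + 1


for rf in (1, 2, 3, 5):
    tolerated = rf - quorum(rf)
    print(f"RF={rf}: quorum={quorum(rf)}, replicas that may be down={tolerated}")
```

With RF = 2 the quorum is 2, so no local replica may be slow or down. With RF = 3 the quorum is still 2, which leaves room for one unavailable replica.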

    "Which JMX indicators I need to care about ?"

    I can't remember the exact name, but if you can find the metric for disk IOPS or throughput, I wonder whether it's hitting a threshold and plateauing.