Search code examples
apache-sparkcassandraspark-cassandra-connector

NoHostAvailableException (no host was tried) using Spark Cassandra Connector


I am having a problem with the DataStax Spark Connector for Cassandra. My application contains a Spark operation that performs a number of single-record queries on the Cassandra database; a number of these queries will succeed, but at some point one of the queries will fail with a NoHostAvailableException with the message All host(s) tried for query failed (no host was tried).

Stack trace

2018-06-26 12:32:09 ERROR Executor:91 - Exception in task 0.3 in stage 0.0 (TID 6)
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried)
    at com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:84)
    at com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:37)
    at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
    at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:68)
    at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:40)
    at com.sun.proxy.$Proxy15.execute(Unknown Source)
    at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:40)
    at com.sun.proxy.$Proxy16.execute(Unknown Source)
    at [line that contains the session.execute() call]
    [...]
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried)
    at com.datastax.driver.core.RequestHandler.reportNoMoreHosts(RequestHandler.java:211)
    at com.datastax.driver.core.RequestHandler.access$1000(RequestHandler.java:46)
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.findNextHostAndQuery(RequestHandler.java:275)
    at com.datastax.driver.core.RequestHandler.startNewExecution(RequestHandler.java:115)
    at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:95)
    at com.datastax.driver.core.SessionManager.executeAsync(SessionManager.java:132)
    ... 32 more

In an attempt to analyse this problem I have succeeded in reproducing it in a simple environment:

  • A single machine running Cassandra, the Spark master, and the spark worker
  • A simple table containing only 100 records (10 partitions with 10 records each)

Below is the minimal code with which I can reproduce the issue.

Code

val pkColumn1Value = 1L
val pkColumn2Values: Dataset[Long] = sparkSession.createDataset(1L to 19 by 2)
val connector: CassandraConnector = [...]

val results: Dataset[SimpleValue] = pkColumn2Values.mapPartitions { iterator =>
    connector.withSessionDo { session =>
        val clusteringKeyValues = Seq(...)

        val preparedStatement = session.prepare("select * from simple_values where pk_column_1_value = ? and pk_column_2_value = ? and clustering_key_value = ?")

        iterator.flatMap { pkColumn2Value =>
            val boundStatements = clusteringKeyValues.iterator.map(clusteringKeyValue =>
                preparedStatement.bind(
                    pkColumn1Value.asInstanceOf[AnyRef]
                    , pkColumn2Value.asInstanceOf[AnyRef]
                    , clusteringKeyValue.asInstanceOf[AnyRef]
                )
            )

            boundStatements.map { boundStatement =>
                val record = try {
                    session.execute(boundStatement).one()
                } catch {
                    case noHostAvailableException: NoHostAvailableException =>
                        log.error(s"Encountered NHAE, getErrors: ${noHostAvailableException.getErrors}")
                        throw noHostAvailableException
                    case exception =>
                        throw exception
                }

                log.error(s"Retrieved record $record")
                // Sleep to simulate an operation being performed on the value.
                Thread.sleep(100)

                record
            }
        }
    }
}

log.error(s"Perfunctory log statement that triggers an action: ${results.collect().last}")

Some interesting things I have noticed

  • I am using Dataset#mapPartitions() to be able to prepare the select statement only once per partition. The problem disappears when I swallow my pride and use Dataset#map() or Dataset#flatMap() instead, but I would like to use Dataset#mapPartitions() for the (ostensible) performance benefits of preparing the query only once per Dataset partition.
  • The NoHostAvailableException seems to occur a fixed amount of time after the first query is executed. Some investigation confirmed that this amount of time is equal to the value of connector property spark.cassandra.connection.keep_alive_ms. Setting this property to a ridiculously high value would ostensibly resolve the problem, but this seems like a dirty work-around instead of a sensible solution.

In this GitHub issue for the connector, commenter pkolaczk mentions a potential issue that could cause the connector to succeed in its initial connection to Cassandra and to fail when trying to later establish additional connections. This sounds promising because it matches with the above points (which suggest that the problem will only occur once the original connections have been closed, which would never happen if the connection is re-established for every element in the Dataset individually); however, I have been unable to find any indication that I have misconfigured an IP address or any other plausible cause for this phenomenon (or even confirmation that this phenomenon is in fact causing the issue in the first place).

Some things I have checked and/or tried

  • Multiple online sources suggest that NoHostAvailableExceptions are always preceded by other errors. I have checked my logs on multiple occasions, but cannot find any other error messages or stack traces.
  • An answer to another StackOverflow question suggested calling NoHostAvailableException#getErrors to obtain a more detailed explanation of the issue, but this method always returns an empty map for me.
  • The problem remains when I use RDDs instead of Datasets (including the fact that it occurs only when using mapPartitions and not when using map).
  • The connector property spark.cassandra.connection.local_dc was originally unset. Setting this property to the appropriate data center name had no noticeable effect on the issue.
  • I tried setting the connector properties spark.cassandra.connection.timeout_ms and spark.cassandra.read.timeout_ms to ridiculously high values; this had no noticeable effect on the issue.

Some version numbers

  • Spark: Reproduced the issue with both 2.1.1 and 2.3.0
  • Cassandra: 3.11
  • Connector: Reproduced the issue with both 2.0.3 and 2.3.0
  • Scala: 2.11

Any indication of what is causing these errors or an idea of how to fix the issue would be greatly appreciated.


Solution

  • I cross-posted this question to the connector's Google User Group (https://groups.google.com/a/lists.datastax.com/d/msg/spark-connector-user/oWrP7qeHJ7k/pmgnF_kbBwAJ), where it was confirmed by one of its contributors that there is no reason not to have a high value for spark.cassandra.connection.keep_alive_ms. I have bumped up this value to a point where I could be reasonably certain that no operations would pass it, and have had no problems since.