I have written a custom LoadBalancingPolicy for the spark-cassandra-connector,
and now I want to verify that it really works.
I have a Cassandra cluster with 3 nodes and a keyspace with a replication factor of 2, so for any given record only two of the three nodes hold the data.
I want to make sure that the spark-cassandra-connector
(with my load-balancing policy) is still token-aware and chooses a node that holds the data as the coordinator for each "SELECT" statement.
My idea is to write a trigger on the SELECT statement on each node: if the node that receives the query does not hold the data, the trigger writes a log entry, and I know that the load-balancing policy does not work properly. How can we write a trigger on SELECT in Cassandra? Is there a better way to accomplish this?
I already checked the documentation on creating triggers, and they are too limited for this.
Following what Alex said, it can be done as shown below.
After creating the SparkSession, we create a connector:
import com.datastax.spark.connector.cql.CassandraConnector
val connector = CassandraConnector.apply(sparkSession.sparkContext.getConf)
Now we can prepare a statement and do the rest:
connector.withSessionDo(session => {
  val selectQuery = "select * from test where id=?"
  val prepareStatement = session.prepare(selectQuery)
  val protocolVersion = session.getCluster.getConfiguration.getProtocolOptions.getProtocolVersion
  // Every column of the partition key must be bound explicitly; otherwise the routing key will be null.
  val boundStatement = prepareStatement.bind(s"$id")
  val routingKey = boundStatement.getRoutingKey(protocolVersion, null)
  // All nodes that hold a replica of the row. Note that the first argument of getReplicas is the keyspace name.
  val replicas = session.getCluster.getMetadata.getReplicas("test", routingKey)
  val resultSet = session.execute(boundStatement)
  // The node that acted as coordinator for the query
  val host = resultSet.getExecutionInfo.getQueriedHost
  // Final step: check whether the coordinator is one of the replicas
  if (replicas.contains(host)) println("It works!")
})
The important thing is that all columns the partition key is based on must be bound explicitly as parameters (i.e. we cannot hard-code them in the SELECT statement); otherwise the routingKey will be null.
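One caveat about the check above: a single "It works!" does not prove much. With 3 nodes and a replication factor of 2, a coordinator picked at random is a replica about two times out of three, so it is safer to repeat the check for many different keys and look for any miss. Below is a minimal sketch of how the results could be tallied. `RoutingCheck`, `RoutingReport` and the sample data are hypothetical names made up for illustration (they are not part of the driver or the connector), and node names stand in for the `Host` objects:

```scala
// Hypothetical helper types for illustration; not part of the driver or connector API.
final case class RoutingCheck(key: String, coordinator: String, replicas: Set[String]) {
  // True when the coordinator that served the query holds a replica of the row.
  def hitReplica: Boolean = replicas.contains(coordinator)
}

object RoutingReport {
  // Checks whose coordinator was not a replica; any entry here means
  // that query was not routed in a token-aware way.
  def misses(checks: Seq[RoutingCheck]): Seq[RoutingCheck] =
    checks.filterNot(_.hitReplica)

  // Fraction of checks that were served by a replica (0.0 for empty input).
  def hitRatio(checks: Seq[RoutingCheck]): Double =
    if (checks.isEmpty) 0.0
    else checks.count(_.hitReplica).toDouble / checks.size
}

object RoutingReportExample {
  // Made-up sample data: keys and node names are placeholders.
  val sample: Seq[RoutingCheck] = Seq(
    RoutingCheck("1", "node1", Set("node1", "node2")),
    RoutingCheck("2", "node3", Set("node2", "node3")),
    RoutingCheck("3", "node2", Set("node1", "node3")) // coordinator is not a replica
  )
}
```

In the real check, each `RoutingCheck` would be built from the `host` and `replicas` values obtained inside `withSessionDo`. A token-aware policy should give an empty `misses` list, while a hit ratio near 2/3 over many keys suggests the coordinator is being chosen without regard to the token.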