Tags: cassandra, database-trigger, spark-cassandra-connector

Is there any way to find out which node was used by a SELECT statement in Cassandra?


I have written a custom LoadBalancingPolicy for the spark-cassandra-connector, and now I want to make sure that it really works.

I have a Cassandra cluster with 3 nodes and a keyspace with a replication factor of 2, so each record is stored on only two of the three nodes.

In particular, I want to make sure that the spark-cassandra-connector (with my load-balancing policy) is still token-aware, i.e. that it chooses a node that actually holds the data as the coordinator for each SELECT statement.
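To make "the right node" concrete, here is a toy model of SimpleStrategy-style replica placement on a 3-node ring with a replication factor of 2. It is a sketch under simplifying assumptions, not Cassandra's code: the node names and tokens are hypothetical, there is one token per node (no vnodes), and rack awareness (NetworkTopologyStrategy) is ignored. A partition lives on the node whose token range contains the partition's token, plus the next node clockwise; any other node chosen as coordinator is not a replica.

```scala
// Toy model of SimpleStrategy-style placement. Node names and tokens are
// hypothetical; real clusters use Murmur3 tokens and usually vnodes.
object ReplicaPlacementSketch {
  // A node owns the token range (previous node's token, this node's token].
  final case class Node(name: String, token: Long)

  // The first node clockwise whose token is >= the partition token (wrapping
  // around to the lowest token), followed by the next rf - 1 nodes on the ring.
  def replicas(ring: Seq[Node], partitionToken: Long, rf: Int): Seq[Node] = {
    val sorted = ring.sortBy(_.token)
    val startIdx = sorted.indexWhere(_.token >= partitionToken) match {
      case -1 => 0 // token is past the highest node token: wrap around
      case i  => i
    }
    (0 until math.min(rf, sorted.size)).map(k => sorted((startIdx + k) % sorted.size))
  }

  // True if the given coordinator holds a replica of the partition.
  def isReplica(ring: Seq[Node], partitionToken: Long, rf: Int, coordinator: String): Boolean =
    replicas(ring, partitionToken, rf).exists(_.name == coordinator)

  def main(args: Array[String]): Unit = {
    val ring = Seq(Node("node1", -100L), Node("node2", 0L), Node("node3", 100L))
    println(replicas(ring, 42L, 2).map(_.name).mkString(", ")) // node3, node1
    println(isReplica(ring, 42L, 2, "node2"))                  // false
  }
}
```

With this sample ring, a partition whose token is 42 is stored on node3 and node1, so a token-aware policy should never pick node2 as the coordinator for it.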

My idea is to write a trigger on SELECT statements on each node: if a node coordinates a query for data it does not hold, the trigger writes a log entry, which would tell me that the load-balancing policy is not working properly. How can I write an ON SELECT trigger in Cassandra? Or is there a better way to accomplish this?

I have already checked the documentation on creating triggers, but triggers are too limited for this (they only fire on writes):

Official documentation

Documentation at DataStax

Example implementation in official repo


Solution

  • Following Alex's suggestion, it can be done as follows:

    After creating the SparkSession, create a CassandraConnector from its configuration:

    import com.datastax.spark.connector.cql.CassandraConnector
    val connector = CassandraConnector.apply(sparkSession.sparkContext.getConf)
    

    Then prepare the statement, look up the replicas for its partition key, and compare them with the host that actually served the query:

    connector.withSessionDo(session => {

        // Placeholders: replace with your keyspace and a real partition key value
        // ("id" is assumed here to be a text column).
        val keyspace = "my_keyspace"
        val id = "some-id"

        // Qualify the table with its keyspace, since the session may not be bound to one.
        val selectQuery = s"select * from $keyspace.test where id = ?"
        val preparedStatement = session.prepare(selectQuery)
        val configuration = session.getCluster.getConfiguration
        val protocolVersion = configuration.getProtocolOptions.getProtocolVersion
        // Every partition key column must be bound; otherwise the routing key will be null.
        val boundStatement = preparedStatement.bind(id)
        val routingKey = boundStatement.getRoutingKey(protocolVersion, configuration.getCodecRegistry)
        // All nodes that hold a replica of this partition (the first argument is the keyspace name).
        val replicas = session.getCluster.getMetadata.getReplicas(keyspace, routingKey)
        val resultSet = session.execute(boundStatement)

        // The node that coordinated the query and returned the rows
        val host = resultSet.getExecutionInfo.getQueriedHost

        // Final step: check whether the coordinator is one of the replicas
        if (replicas.contains(host)) println(s"It works! $host is a replica")
        else println(s"$host is not a replica of this partition")
      })
    

    Importantly, every column of the partition key must be bound as a parameter (it cannot be hard-coded as a literal in the SELECT statement); otherwise the routing key will be null.
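The binding rule follows from how the routing key is built: the driver derives it from the serialized values of the bound partition-key columns, not from literals in the query text. The sketch below models that assembly in plain Scala. It is an illustration, not the driver's API: `RoutingKeySketch`, `utf8` and `routingKey` are hypothetical names. A single-column key is used as-is; a composite key is written component by component as a 2-byte length, the value bytes, and a 0x00 end-of-component byte. A missing (unbound) component yields no routing key at all, which corresponds to getRoutingKey returning null.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical model of routing-key assembly; not the DataStax driver API.
object RoutingKeySketch {
  // Serialize a text value as UTF-8 bytes (how a text column is encoded).
  def utf8(s: String): ByteBuffer = ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8))

  // Each element is one partition-key column; None models an unbound column.
  def routingKey(components: Seq[Option[ByteBuffer]]): Option[ByteBuffer] =
    if (components.isEmpty || components.exists(_.isEmpty)) None // cannot route
    else {
      val values = components.flatten
      if (values.size == 1) Some(values.head.duplicate()) // single column: raw value
      else {
        // Composite key: [2-byte length][value bytes][0x00] per component.
        val out = ByteBuffer.allocate(values.map(v => 2 + v.remaining() + 1).sum)
        values.foreach { v =>
          out.putShort(v.remaining().toShort)
          out.put(v.duplicate()) // duplicate() leaves the caller's buffer untouched
          out.put(0.toByte)
        }
        out.flip()
        Some(out)
      }
    }

  def main(args: Array[String]): Unit = {
    println(routingKey(Seq(Some(utf8("abc")))).map(_.remaining()))                   // Some(3)
    println(routingKey(Seq(Some(utf8("abc")), Some(utf8("x")))).map(_.remaining())) // Some(10)
    println(routingKey(Seq(Some(utf8("abc")), None)))                                // None
  }
}
```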