Tags: cassandra, datastax-enterprise, cassandra-3.0

Performance of token range based queries on partition keys?


I am selecting all records from my Cassandra nodes based on the token ranges of the partition key.

Below is the code:

public static synchronized List<Object[]> getTokenRanges(
      final Session session) {

    if (cluster == null) {
      cluster = session.getCluster();
    }

    Metadata metadata = cluster.getMetadata();

    return unwrapTokenRanges(metadata.getTokenRanges());
  }

  private static List<Object[]> unwrapTokenRanges(Set<TokenRange> wrappedRanges) {

    final int tokensSize = 2;
    List<Object[]> tokenRanges = new ArrayList<>();
    for (TokenRange tokenRange : wrappedRanges) {
      // unwrap() splits a range that wraps around the end of the ring into two contiguous ranges
      List<TokenRange> unwrappedTokenRangeList = tokenRange.unwrap();
      for (TokenRange unwrappedTokenRange : unwrappedTokenRangeList) {
        Object[] objects = new Object[tokensSize];
        objects[0] = unwrappedTokenRange.getStart().getValue();
        objects[1] = unwrappedTokenRange.getEnd().getValue();
        tokenRanges.add(objects);
      }
    }
    return tokenRanges;
  }

getTokenRanges gives me all token ranges of the vnodes across all nodes.

Then I am using these token ranges to query Cassandra: object[0] holds the start token of a vnode and object[1] its end token.

This generates the query below:

SELECT * FROM my_key_space.tablename WHERE token(id) > <start token number> AND token(id) <= <end token number>;

In the query above, the id column is the partition key.
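
Below is a minimal sketch of how such a statement could be built and executed from these token pairs (an illustration only; it assumes the default Murmur3Partitioner, whose token values are Longs, and uses a prepared statement with bind markers instead of inlining the numbers):

// Sketch: bind each start/end token value returned by getTokenRanges
PreparedStatement ps = session.prepare(
    "SELECT * FROM my_key_space.tablename WHERE token(id) > ? AND token(id) <= ?");

for (Object[] range : getTokenRanges(session)) {
  // range[0] = start token of the vnode, range[1] = end token (Longs for Murmur3)
  ResultSet rs = session.execute(ps.bind(range[0], range[1]));
  for (Row row : rs) {
    // ... process row
  }
}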

In Cassandra, range queries are generally not recommended, so will this query be performant?

From what I know, this query will hit only the individual partition/vnode range and will not span multiple partitions, so there should not be any performance issue. Is this correct?

Cassandra version: 3.x


Solution

  • Queries on token ranges are performant, and Spark uses them for efficient data fetching. But you need to keep the following in mind: getTokenRanges will give you all existing token ranges, but there are edge cases. The last range goes from some positive number to the negative number that represents the first range, so the query generated for it won't return anything, because a condition of the form token(id) > <positive start> AND token(id) <= <negative end> can never be true. You basically miss the data between MIN_TOKEN and the first token, and between the last token and MAX_TOKEN. The Spark Connector generates different CQL statements depending on the token. Plus you need to route each query to the correct node; this can be done via setRoutingToken.

    A similar approach can be used in Java code (full code):

        Metadata metadata = cluster.getMetadata();
        List<TokenRange> ranges = new ArrayList<>(metadata.getTokenRanges());
        Collections.sort(ranges);
        System.out.println("Processing " + (ranges.size()+1) + " token ranges...");
    
        Token minToken = ranges.get(0).getStart();
        String baseQuery = "SELECT id, col1 FROM test.range_scan WHERE ";
        Map<String, Token> queries = new HashMap<>();
        // generate queries for every range
        for (int i = 0; i < ranges.size(); i++) {
            TokenRange range = ranges.get(i);
            Token rangeStart = range.getStart();
            Token rangeEnd = range.getEnd();
            if (i == 0) {
                queries.put(baseQuery + "token(id) <= " + minToken, minToken);
                queries.put(baseQuery + "token(id) > " + rangeStart + " AND token(id) <= " + rangeEnd, rangeEnd);
            } else if (rangeEnd.equals(minToken)) {
                queries.put(baseQuery + "token(id) > " + rangeStart, rangeEnd);
            } else {
                queries.put(baseQuery + "token(id) > " + rangeStart + " AND token(id) <= " + rangeEnd, rangeEnd);
            }
        }
    
        // Note: It could be speedup by using async queries, but for illustration it's ok
        long rowCount = 0;
        for (Map.Entry<String, Token> entry: queries.entrySet()) {
            SimpleStatement statement = new SimpleStatement(entry.getKey());
            statement.setRoutingToken(entry.getValue());
            ResultSet rs = session.execute(statement);
            // .... process data
        }
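
    The async variant mentioned in the comment above could look roughly like this (a sketch only; executeAsync returns a ResultSetFuture in driver 3.x, and in real code you would bound the number of in-flight queries rather than submitting them all at once):

        // rough sketch: submit all statements without blocking, then collect the results
        List<ResultSetFuture> futures = new ArrayList<>();
        for (Map.Entry<String, Token> entry : queries.entrySet()) {
            SimpleStatement statement = new SimpleStatement(entry.getKey());
            statement.setRoutingToken(entry.getValue());
            futures.add(session.executeAsync(statement));
        }
        for (ResultSetFuture future : futures) {
            ResultSet rs = future.getUninterruptibly(); // wait for this query to finish
            // .... process data
        }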