I am selecting all records from cassandra nodes based on token range of my partition key.
Below is the code:
public static synchronized List<Object[]> getTokenRanges(
final Session session) {
if (cluster == null) {
cluster = session.getCluster();
}
Metadata metadata = cluster.getMetadata();
return unwrapTokenRanges(metadata.getTokenRanges());
}
private static List<Object[]> unwrapTokenRanges(Set<TokenRange> wrappedRanges) {
final int tokensSize = 2;
List<Object[]> tokenRanges = new ArrayList<>();
for (TokenRange tokenRange : wrappedRanges) {
List<TokenRange> unwrappedTokenRangeList = tokenRange.unwrap();
for (TokenRange unwrappedTokenRange : unwrappedTokenRangeList) {
Object[] objects = new Object[tokensSize];
objects[0] = unwrappedTokenRange.getStart().getValue();
objects[1] = unwrappedTokenRange.getEnd().getValue();
tokenRanges.add(objects);
}
}
return tokenRanges;
}
getTokenRanges
gives me all token range of vnodes across all nodes.
Then I am using these token range to query cassandra. object[0]
holds start token of vnode and object[1]
end token.
Which generates below query:
SELECT * FROM my_key_space.tablename WHERE token(id)><start token number> AND token(id)<= <end token number>;
In above id
column is partition key.
In Cassandra it is not recommended to perform range queries, So, will this query be performant?
From what I know, this query will call, only the individual partition/vnode and will not call multiple partitions and hence there should not be any performance issue? Is this correct?
Cassandra version: 3.x
Queries on the token ranges are performant, and Spark uses them for effective data fetching. But you need to need to keep in mind following - getTokenRanges
will give you all existing token ranges, but there are some edge cases - the last range will be from some positive number to negative number that represents first range, and as such, your query won't do anything. Basically you miss data between MIN_TOKEN
and first token, and between last token and MAX_TOKEN
. Spark Connector generates different CQL statements based on the token. Plus you need to route query to correct node - this could be done via setRoutingToken
.
Similar approach could be used in Java code (full code):
Metadata metadata = cluster.getMetadata();
Metadata metadata = cluster.getMetadata();
List<TokenRange> ranges = new ArrayList(metadata.getTokenRanges());
Collections.sort(ranges);
System.out.println("Processing " + (ranges.size()+1) + " token ranges...");
Token minToken = ranges.get(0).getStart();
String baseQuery = "SELECT id, col1 FROM test.range_scan WHERE ";
Map<String, Token> queries = new HashMap<>();
// generate queries for every range
for (int i = 0; i < ranges.size(); i++) {
TokenRange range = ranges.get(i);
Token rangeStart = range.getStart();
Token rangeEnd = range.getEnd();
if (i == 0) {
queries.put(baseQuery + "token(id) <= " + minToken, minToken);
queries.put(baseQuery + "token(id) > " + rangeStart + " AND token(id) <= " + rangeEnd, rangeEnd);
} else if (rangeEnd.equals(minToken)) {
queries.put(baseQuery + "token(id) > " + rangeStart, rangeEnd);
} else {
queries.put(baseQuery + "token(id) > " + rangeStart + " AND token(id) <= " + rangeEnd, rangeEnd);
}
}
// Note: It could be speedup by using async queries, but for illustration it's ok
long rowCount = 0;
for (Map.Entry<String, Token> entry: queries.entrySet()) {
SimpleStatement statement = new SimpleStatement(entry.getKey());
statement.setRoutingToken(entry.getValue());
ResultSet rs = session.execute(statement);
// .... process data
}