I am trying to optimize my spark job by avoiding shuffling as much as possible.
I am using cassandraTable to create the RDD.
The column family's column names are dynamic, thus it is defined as follows:
CREATE TABLE "Profile" (
key text,
column1 text,
value blob,
PRIMARY KEY (key, column1)
) WITH COMPACT STORAGE AND
bloom_filter_fp_chance=0.010000 AND
caching='ALL' AND
...
This definition results in CassandraRow RDD elements in the following format:
CassandraRow <key, column1, value>
So if I have RK='profile1', with columns name='George' and age='34', the resulting RDD will be:
CassandraRow<key=profile1, column1=name, value=George>
CassandraRow<key=profile1, column1=age, value=34>
Then I need to group elements that share the same key together to get a PairRdd:
PairRdd<String, Iterable<CassandraRow>>
Important to say, that all the elements I need to group are in the same Cassandra node (share the same row key), so I expect the connector to keep the locality of the data.
The problem is that using groupBy or groupByKey causes shuffling. I rather group them locally, because all the data is on the same node:
JavaPairRDD<String, Iterable<CassandraRow>> rdd = javaFunctions(context)
.cassandraTable(ks, "Profile")
.groupBy(new Function<ColumnFamilyModel, String>() {
@Override
public String call(ColumnFamilyModel arg0) throws Exception {
return arg0.getKey();
}
})
My questions are:
Thanks,
Shai
I think you are looking for spanByKey
, a cassandra-connector specific operation that takes advantage of the ordering provided by cassandra to allow grouping of elements without incurring in a shuffle stage.
In your case, it should look like:
sc.cassandraTable("keyspace", "Profile")
.keyBy(row => (row.getString("key")))
.spanByKey
Read more in the docs:
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md#grouping-rows-by-partition-key