I have some Spark experience but am just starting out with Cassandra. I am trying to do a very simple read and getting really bad performance, and I can't tell why. Here is the code I am using:
sc.cassandraTable("nt_live_october", "nt")
  .where("group_id = '254358'")
  .where("epoch >= 1443916800 and epoch <= 1444348800")
  .first
all 3 params are part of the key on the table:
PRIMARY KEY (group_id, epoch, group_name, auto_generated_uuid_field)
) WITH CLUSTERING ORDER BY (epoch ASC, group_name ASC, auto_generated_uuid_field ASC)
And the output I see from my driver is like this:
15/10/07 15:05:02 INFO CassandraConnector: Connected to Cassandra cluster: shakassandra
15/10/07 15:07:02 ERROR Session: Error creating pool to attila./198.xxx:9042
com.datastax.driver.core.ConnectionException: [attila./198.xxx:9042] Unexpected error during transport initialization (com.datastax.driver.core.OperationTimedOutException: [attila./198.xxx:9042] Operation timed out)
15/10/07 15:07:02 INFO SparkContext: Starting job: take at CassandraRDD.scala:121
15/10/07 15:07:03 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on osd09:39903 (size: 4.8 KB, free: 265.4 MB)
15/10/07 15:08:23 INFO TaskSetManager: Finished task 0.0 in stage 6.0 (TID 8) in 80153 ms on osd09 (1/1)
15/10/07 15:08:23 INFO DAGScheduler: ResultStage 6 (take at CassandraRDD.scala:121) finished in 80.958 s
15/10/07 15:08:23 INFO TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks have all completed, from pool
15/10/07 15:08:23 INFO DAGScheduler: Job 5 finished: take at CassandraRDD.scala:121, took 81.043413 s
I expect this query to be really fast, yet it's taking over a minute. A few things jump out at me: the connection error to the attila node takes two minutes to surface, and the single task itself runs for 80 seconds on one executor (osd09).
Any tips on how to debug this or where to look for potential problems would be much appreciated. I'm using Spark 1.4.1 with connector 1.4.0-M3, Cassandra ReleaseVersion 2.1.9, and all defaults on the tunable connector params.
I think the problem lies in the distribution of data between partitions. Your table has a single partition key, group_id; epoch is only a clustering column. Data is distributed across cluster nodes by group_id alone, so all the rows with group_id='254358' sit in one huge partition on one node. When you run your query, Cassandra reaches the partition for group_id='254358' very quickly, but then has to scan through its rows to find the records with epoch between 1443916800 and 1444348800. If there are a lot of rows, the query will be really slow. In effect this query is not distributed at all; it will always run on one node.
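As a quick check, you can gauge how many rows sit under that one partition key using the same connector API from the question; a minimal sketch (note that a plain RDD count pulls the rows through Spark, so if this is also slow, the partition really is huge):

import com.datastax.spark.connector._

// Sketch: count the rows in the single partition from the question.
// This pulls rows through Spark to count them, so a long runtime here
// points at a huge partition rather than at Spark itself.
val partitionSize = sc.cassandraTable("nt_live_october", "nt")
  .where("group_id = '254358'")
  .count()
println(s"rows under group_id=254358: $partitionSize")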
A better practice is to extract the date (or even the hour) and add it to the partition key; in your case something like:
PRIMARY KEY ((group_id, date), epoch, group_name, auto_generated_uuid_field)
WITH CLUSTERING ORDER BY (epoch ASC, group_name ASC, auto_generated_uuid_field ASC)
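With that layout, each (group_id, date) pair becomes its own partition, so a read has to name the day buckets it wants, but those buckets can then be fetched in parallel across the cluster. A minimal sketch of the Spark side under that assumed schema (the date column, its text format, and the listed days are all assumptions):

import com.datastax.spark.connector._

// Hypothetical: one (group_id, date) pair per day bucket in the epoch range.
// joinWithCassandraTable pushes each pair down as a separate single-partition
// lookup, so the days are read in parallel instead of from one giant partition.
val days = Seq("2015-10-04", "2015-10-05", "2015-10-06", "2015-10-07", "2015-10-08")

val rows = sc.parallelize(days.map(day => ("254358", day)))
  .joinWithCassandraTable("nt_live_october", "nt")
  .on(SomeColumns("group_id", "date"))
  .where("epoch >= 1443916800 and epoch <= 1444348800")

rows.map(_._2).first  // the CassandraRow half of each (key, row) pair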
To verify my hypothesis you can run your current query in cqlsh with tracing turned on (issue TRACING ON; before the SELECT) and see where the time goes. So the problem has nothing to do with Spark.
As for the error and how long it takes to appear: that is expected, since you only receive the error after the timeout has fired.
I also remember the spark-cassandra-connector recommendation to place Spark workers on the Cassandra nodes themselves, precisely so that queries can be distributed by partition key.
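If you do colocate them, the wiring on the Spark side is just the usual contact point setting; a minimal sketch (spark.cassandra.connection.host is a standard connector property, the address is a placeholder):

import org.apache.spark.{SparkConf, SparkContext}

// Sketch: point the connector at a Cassandra contact point. With executors
// running on the Cassandra machines, the connector's load balancing prefers
// the local node, which is what makes the colocation pay off.
val conf = new SparkConf()
  .setAppName("nt-read")
  .set("spark.cassandra.connection.host", "198.0.0.1")  // placeholder
val sc = new SparkContext(conf)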