Tags: apache-spark, cassandra, spark-cassandra-connector

Cassandra Spark connector read performance


I have some Spark experience but just starting out with Cassandra. I am trying to do a very simple read and getting really bad performance -- can't tell why. Here is the code I am using:

sc.cassandraTable("nt_live_october","nt")
  .where("group_id='254358'")
  .where("epoch >=1443916800 and epoch<=1444348800")
  .first


All three parameters are part of the table's key:

PRIMARY KEY (group_id, epoch, group_name, auto_generated_uuid_field) ) WITH CLUSTERING ORDER BY (epoch ASC, group_name ASC, auto_generated_uuid_field ASC)

And the output I see from my driver is like this:

15/10/07 15:05:02 INFO CassandraConnector: Connected to Cassandra cluster: shakassandra

15/10/07 15:07:02 ERROR Session: Error creating pool to attila./198.xxx:9042 com.datastax.driver.core.ConnectionException: [attila./198.xxx:9042] Unexpected error during transport initialization (com.datastax.driver.core.OperationTimedOutException: [attila./198.xxx:9042] Operation timed out)

15/10/07 15:07:02 INFO SparkContext: Starting job: take at CassandraRDD.scala:121

15/10/07 15:07:03 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on osd09:39903 (size: 4.8 KB, free: 265.4 MB)

15/10/07 15:08:23 INFO TaskSetManager: Finished task 0.0 in stage 6.0 (TID 8) in 80153 ms on osd09 (1/1)

15/10/07 15:08:23 INFO DAGScheduler: ResultStage 6 (take at CassandraRDD.scala:121) finished in 80.958 s

15/10/07 15:08:23 INFO TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks have all completed, from pool

15/10/07 15:08:23 INFO DAGScheduler: Job 5 finished: take at CassandraRDD.scala:121, took 81.043413 s

I expect this query to be really fast, yet it's taking over a minute. A few things jump out at me:

  1. It takes almost two minutes to get the session error -- I pass the IPs of 3 nodes to Spark Cassandra connector -- is there a way to tell it to skip failed connections faster?
  2. The task gets sent to a Spark worker which is not a Cassandra node -- this seems pretty strange to me -- is there a way to get information as to why the scheduler chose to send the task to a remote node?
  3. Even if the task was sent to a remote node, the Input Size (Max) on that worker shows up as 334.0 B / 1, yet the executor time is 1.3 min (see picture). This seems really slow -- I would expect the time to be spent on deserialization, not compute...

Any tips on how to debug this, or where to look for potential problems, would be much appreciated. Using Spark 1.4.1 with connector 1.4.0-M3, Cassandra ReleaseVersion 2.1.9, and all defaults on tunable connector parameters.


Solution

  • I think the problem lies in how the data is distributed between partitions. Your table has a single partition key, group_id; epoch is only a clustering column. Data is distributed across cluster nodes by group_id alone, so all rows with group_id='254358' form one huge partition on one node of the cluster. When you run your query, Cassandra very quickly locates the partition for group_id='254358', but then has to scan its rows to find the records with epoch between 1443916800 and 1444348800. If there are a lot of rows, the query will be really slow. In fact, this query is not distributed at all -- it will always run on a single node.

    A better practice is to extract the date (or even the hour) from the epoch and add it to the partition key; in your case, something like:

    PRIMARY KEY ((group_id, date), epoch, group_name, auto_generated_uuid_field) 
    WITH CLUSTERING ORDER BY (epoch ASC, group_name ASC, auto_generated_uuid_field ASC)
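
    With the compound partition key above, the read can be restricted to specific partitions and spread across the nodes that own them. A sketch of what the query might look like (the `date` column and its day-bucket format are assumptions for illustration):

    ```cql
    -- Each (group_id, date) pair is now its own partition, so Cassandra
    -- scans only the day buckets that overlap the epoch range.
    SELECT * FROM nt_live_october.nt
    WHERE group_id = '254358'
      AND date IN ('2015-10-04', '2015-10-05', '2015-10-06',
                   '2015-10-07', '2015-10-08', '2015-10-09')
      AND epoch >= 1443916800 AND epoch <= 1444348800;
    ```

    The trade-off is that the client must compute the list of buckets covering the range, but each partition stays small and the work is distributed.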
    

    To verify my hypothesis, you can run your current query in cqlsh with tracing turned on (see the Cassandra documentation on request tracing). So the problem has nothing to do with Spark.
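
    For example, in cqlsh (the trace output then shows how many rows were scanned inside the partition and where the time went):

    ```cql
    -- Enable tracing for subsequent statements in this cqlsh session
    TRACING ON;

    SELECT * FROM nt_live_october.nt
    WHERE group_id = '254358'
      AND epoch >= 1443916800 AND epoch <= 1444348800
    LIMIT 1;
    ```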

    As for the error and how long it takes to appear: that part is working as intended -- the error is only reported after the connection attempt times out.
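
    If you want the connector to give up on an unreachable contact point faster, its connection timeouts are tunable through Spark properties. A sketch (property names are taken from the spark-cassandra-connector 1.4 configuration reference; verify them and the values against your connector version):

    ```scala
    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch: tighten connection timeouts so a dead contact point is
    // abandoned sooner instead of blocking session creation.
    val conf = new SparkConf()
      .setAppName("cassandra-read")
      .set("spark.cassandra.connection.host", "node1,node2,node3")
      // default is 5000 ms; lower it to fail over faster
      .set("spark.cassandra.connection.timeout_ms", "2000")

    val sc = new SparkContext(conf)
    ```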

    Also, I remember the spark-cassandra-connector recommendation to co-locate Spark workers with Cassandra nodes, precisely so that queries can be scheduled with data locality by partition key.