I have a large data set of 1B records and want to run analytics on it with Apache Spark because of the scaling it provides, but I am seeing what looks like an anti-pattern: the more nodes I add to the Spark cluster, the longer completion time gets. The data store is Cassandra, and queries are run from Zeppelin. I have tried many different queries, but even a simple dataframe.count() behaves like this.
Here is the Zeppelin notebook; the temp table has 18M records:
val df = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "temp", "keyspace" -> "mykeyspace"))
  .load()
  .cache()
df.registerTempTable("table")
%sql
SELECT first(devid), date, count(1) FROM table GROUP BY date, rtu ORDER BY date
When tested against different numbers of Spark worker nodes, these were the results:
+-------------+---------------+
| Spark Nodes | Time |
+-------------+---------------+
| 1 node | 17 min 59 sec |
| 2 nodes | 12 min 51 sec |
| 3 nodes | 15 min 49 sec |
| 4 nodes | 22 min 58 sec |
+-------------+---------------+
Increasing the number of nodes decreases performance, which should not happen; it defeats the purpose of using Spark.
If you want me to run any query or provide further info about the setup, please ask. Any clues as to why this is happening would be very helpful; I have been stuck on this for two days now. Thank you for your time.
Versions
Zeppelin: 0.7.1, Spark: 2.1.0, Cassandra: 2.2.9, Connector: datastax:spark-cassandra-connector:2.0.1-s_2.11
Spark cluster specs
6 vCPUs, 32 GB memory per node
Cassandra + Zeppelin server specs
8 vCPUs, 52 GB memory
One thing to consider is that, at a certain point, you may be overwhelming the Cassandra cluster with requests. Without also scaling the Cassandra side of the equation, you could easily see diminishing returns as C* ends up spending too much time rejecting requests.
This is basically the man-month fallacy: just because you can throw more workers at a problem does not necessarily mean the job gets done faster.
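If C* is being overwhelmed, one place to look is how the connector sizes its reads. As a minimal sketch, assuming the standard spark-cassandra-connector read settings (the exact keys and sensible values should be verified against the docs for connector 2.0.1), you can pass them in the read options so that adding executors does not simply mean proportionally more pressure on Cassandra:
val df = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map(
    "table" -> "temp",
    "keyspace" -> "mykeyspace",
    // assumed tuning keys -- check the connector reference for your version
    "spark.cassandra.input.split.size_in_mb" -> "128",   // fewer, larger Spark partitions => fewer concurrent scans
    "spark.cassandra.input.fetch.size_in_rows" -> "500"  // smaller CQL pages per round trip
  ))
  .load()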
It would be extremely beneficial for you to benchmark the different parts of your query separately. Currently, as you have it set up, the entire data set is cached on the read, which adds additional slowness if you are benchmarking a single request. You should benchmark each part in isolation; then you can figure out where your bottlenecks are and scale appropriately.
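For example, a rough way to split the timing in the same notebook is to time the raw Cassandra scan, the cache materialization, and the aggregation as separate steps (only a sketch; wall-clock timing is crude but enough to see which stage dominates):
// 1. Raw read from Cassandra, no cache: measures the scan / connector throughput.
val raw = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "temp", "keyspace" -> "mykeyspace"))
  .load()

var start = System.nanoTime()
raw.count()                                 // full scan from C*
println(s"raw scan:    ${(System.nanoTime() - start) / 1e9} s")

// 2. Cache materialization: measures the read plus filling the in-memory cache on the workers.
val cached = raw.cache()
start = System.nanoTime()
cached.count()                              // forces the cache to be populated
println(s"cache fill:  ${(System.nanoTime() - start) / 1e9} s")

// 3. Aggregation over the cached data: pure Spark compute, Cassandra no longer involved.
cached.registerTempTable("table")
start = System.nanoTime()
sqlContext.sql(
  "SELECT first(devid), date, count(1) FROM table GROUP BY date, rtu ORDER BY date"
).collect()
println(s"aggregation: ${(System.nanoTime() - start) / 1e9} s")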