I'm on a Spark 2.3 cluster of 5 nodes, each with 12 GB of available memory, and am trying to work with a Parquet dataset of approx. 130 GB, on top of which I created a partitioned external Hive table.
Let's say I would like to know the number of records in the dataset. My initial guess was that Spark would read the data partition by partition, aggregate the current partition to get its record count, pass the result to the driver, and then drop that partition in order to read the next one. However, either that's not how it works (rather, Spark tries to read everything into memory first), or I'm coding it wrong.
The dumb approach, like sql("select count(*) from myhivetable"),
doesn't work: the job fails with a Java heap space error. Neither does sql("select * from myhivetable").count()
(I guess they are compiled to the same execution plan anyway).
I could potentially forget about the Hive table, use the filesystem API to get the list of files comprising that table, and count the records file by file, summing the counts into the result, like fileList.foldLeft(0L) { (recCount, file) => spark.read.parquet(file).count + recCount }
-- however, a) this method may not "scale" to other possible use cases, b) I'd still guess there should be a more elegant way to do it using just the Spark toolset. Am I failing to see it?
Assuming you have 8 cores per node, can you try using these parameters with spark-submit or spark-shell:
Total memory - 5 * 12GB = 60GB
Total Cores - 5 * 8 = 40
--driver-cores 1
--driver-memory 2G
--num-executors 9
--executor-cores 4
--executor-memory 6G
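Put together, the flags above might be passed like this. This is only a sketch: `--master yarn` is my assumption (the thread does not say which cluster manager is in use), and as far as I know `--num-executors` is only honored on YARN in Spark 2.3. The script just prints the command for review; remove the `echo` to actually launch the shell.

```shell
# Flags from the list above, collected in one variable.
# Assumption: the cluster manager is YARN (not stated in the question).
SPARK_ARGS="--master yarn \
  --driver-cores 1 \
  --driver-memory 2G \
  --num-executors 9 \
  --executor-cores 4 \
  --executor-memory 6G"

# Print the full command; drop "echo" to run it for real.
# $SPARK_ARGS is intentionally unquoted so it splits into separate flags.
echo spark-shell $SPARK_ARGS
```

The same flags should work with spark-submit, followed by your application jar and its arguments.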
If this does not work, can you change the numbers and try again? Please also post the error log and Spark UI screenshots.
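When changing the numbers, a rough sanity check of the memory request against the cluster total can help. The sketch below assumes YARN's default executor overhead of max(384 MB, 10% of executor memory), assumes all 12 GB per node is available to containers, and ignores the driver's own overhead; the variable names are just for illustration.

```shell
# Rough memory budget check for a set of spark-submit numbers.
# Assumption: default overhead per executor = max(384 MB, 10% of executor memory).
num_executors=9
executor_mem_mb=$((6 * 1024))       # --executor-memory 6G
driver_mem_mb=$((2 * 1024))         # --driver-memory 2G
cluster_mem_mb=$((5 * 12 * 1024))   # 5 nodes * 12 GB (assumed fully usable)

# Integer arithmetic: 10% of executor memory, with a floor of 384 MB.
overhead_mb=$((executor_mem_mb / 10))
if [ "$overhead_mb" -lt 384 ]; then overhead_mb=384; fi

# Total request: every executor plus its overhead, plus the driver heap.
requested_mb=$((num_executors * (executor_mem_mb + overhead_mb) + driver_mem_mb))
echo "requested=${requested_mb}MB cluster=${cluster_mem_mb}MB"

if [ "$requested_mb" -gt "$cluster_mem_mb" ]; then
  echo "request exceeds cluster memory; fewer executors may be granted"
fi
```

Under these assumptions the request comes out slightly above the 60 GB total, so the cluster may grant fewer than 9 executors; lowering --executor-memory or --num-executors a little is one thing to try.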
I am not sure whether it makes any difference to use SELECT COUNT(1) FROM table instead of SELECT COUNT(*) FROM table.