I know that partitioning and bucketing are used to avoid data shuffles, and that bucketing also solves the problem of partitioning creating too many directories. I also know that a DataFrame's repartition method partitions the data in memory.

Apart from the fact that partitioning and bucketing are physically stored on disk while the DataFrame's repartition method partitions in memory, do partitioning, bucketing, and the repartition method all partition the data in the same way?
For example:
dataFrame.repartition(col("colName"))
and
dataFrame.write...partitionBy("colName")...
Are these the same?
dataFrame.repartition(10, col("colName"))
and
dataFrame.write...bucketBy(10, "colName")...
And are these the same?
In order to understand this, you just need to know that the partitionBy method does not trigger any shuffle. If a task is processing events for X days, the partitionBy method will result in that task writing X files to HDFS.
Let's do a simple scenario check. df is a DataFrame with the column eventTimestamp, and we are writing the DataFrame back to HDFS after adding three columns that we pass to partitionBy:
df.withColumn("year", year(col("eventTimestamp")))
.withColumn("month", month(col("eventTimestamp")))
.withColumn("day", dayofmonth(col("eventTimestamp")))
.repartition(col("year"), col("month"), col("day"))
.write
.partitionBy("year", "month", "day")
.save(output)
Scenario 1: Input data: events stored in 200 blocks in HDFS, each block 128 MB. The events are just from the past week.
Output: each task will produce 7 files in HDFS (1 per day), which leads to 7 × 200 = 1,400 files produced by the job in total.
Scenario 2: Input data: events stored in 200 blocks in HDFS, each block 128 MB. The events span the past 365 days (so the same amount of data, but the events are from the last year, not just one week).
Output: each task will produce 365 files (again, 1 per day; that's how partitionBy works), which leads to 365 × 200 = 73,000 files. 73 thousand!
This will hurt you! The reasons are: HDFS is optimized for large files and batch processing rather than for handling many tiny files, and the metadata for these files will take up a lot of space in NameNode memory. With slightly bigger numbers you could even kill your cluster!
How would you fix the excessive number of files? In some scenarios it's very easy: all you need to do is repartition your DataFrame.
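Using the same example, that means adding a repartition on the same columns you pass to partitionBy, right before the write:

df.withColumn("year", year(col("eventTimestamp")))
  .withColumn("month", month(col("eventTimestamp")))
  .withColumn("day", dayofmonth(col("eventTimestamp")))
  .repartition(col("year"), col("month"), col("day")) // shuffle so that each day lands in exactly one partition
  .write
  .partitionBy("year", "month", "day")
  .save(output)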
The repartition call will cause Spark to shuffle the data. The shuffle mechanism uses hashing to decide which bucket (partition) a specific record will go to, and the rule is that one day's data always belongs to the same bucket. So the tasks in stage 2 will pull all buckets with number X to the same place and merge them together, which means that all of a day's data is processed in the same task. The end result is that you end up with a number of files equal to the number of days in your dataset (remember, you use the partitionBy method and pass the day columns as parameters).
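If you want to verify that behaviour, here is a minimal sanity check. It is only a sketch: it assumes the df and eventTimestamp column from the snippets above, and spark_partition_id is used purely for inspection. After repartitioning on the day columns, every day should map to exactly one Spark partition.

import org.apache.spark.sql.functions._

// Rebuild the repartitioned DataFrame from the example above.
val repartitioned = df
  .withColumn("year", year(col("eventTimestamp")))
  .withColumn("month", month(col("eventTimestamp")))
  .withColumn("day", dayofmonth(col("eventTimestamp")))
  .repartition(col("year"), col("month"), col("day"))

// Count how many Spark partitions each day is spread across.
// Because the hash shuffle keeps a whole day together, this is always 1,
// so each day directory is written by a single task (one file per day).
repartitioned
  .withColumn("sparkPartition", spark_partition_id())
  .groupBy("year", "month", "day")
  .agg(countDistinct("sparkPartition").as("partitionsPerDay"))
  .show()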
Please refer to the link below for more details:
http://tantusdata.com/spark-shuffle-case-1-partition-by-and-repartition/